내 연락처 정보
우편메소피아@프로톤메일.com
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
이 기사에서는 Python에서 Flask와 gRPC를 사용하여 간단한 샘플 애플리케이션을 만들고 사용하는 방법을 소개합니다. requests
테스트용 라이브러리.
먼저 Python이 설치되어 있는지 확인하십시오. 그런 다음 종속성을 관리할 가상 환경을 만듭니다.
python -m venv myenv
source myenv/bin/activate # Windows 使用 `myenvScriptsactivate`
필요한 패키지를 설치합니다:
pip install Flask grpcio grpcio-tools requests
만들다 .proto
gRPC 서비스를 정의하는 파일입니다.파일 이름 저장service.proto
:
syntax = "proto3";
package demo;
service DemoService {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
사용 grpc_tools
Python 코드 생성:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. service.proto
이렇게 하면 두 개의 파일이 생성됩니다.service_pb2.py
그리고service_pb2_grpc.py
。
라는 파일을 만듭니다. grpc_server.py
문서:
from concurrent import futures
import grpc
import service_pb2
import service_pb2_grpc
class DemoService(service_pb2_grpc.DemoServiceServicer):
def SayHello(self, request, context):
return service_pb2.HelloResponse(message=f'Hello, {request.name}!')
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
service_pb2_grpc.add_DemoServiceServicer_to_server(DemoService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("gRPC server started on port 50051")
server.wait_for_termination()
if __name__ == '__main__':
serve()
라는 파일을 만듭니다. flask_server.py
문서:
from flask import Flask, request, jsonify
import grpc
import service_pb2
import service_pb2_grpc
app = Flask(__name__)
def get_grpc_client():
channel = grpc.insecure_channel('localhost:50051')
stub = service_pb2_grpc.DemoServiceStub(channel)
return stub
@app.route('/hello', methods=['POST'])
def say_hello():
data = request.json
name = data.get('name')
stub = get_grpc_client()
response = stub.SayHello(service_pb2.HelloRequest(name=name))
return jsonify({'message': response.message})
if __name__ == '__main__':
app.run(port=5000, debug=True)
두 개의 터미널 창이나 탭을 엽니다.
첫 번째 터미널에서 gRPC 서버를 시작합니다.
python grpc_server.py
두 번째 터미널에서 Flask 서버를 시작합니다.
python flask_server.py
requests
테스트를 수행하다라는 파일을 만듭니다. test_flask_server.py
파일을 작성하고 다음 코드를 추가합니다.
import requests
def test_say_hello():
url = 'http://localhost:5000/hello'
data = {'name': 'World'}
response = requests.post(url, json=data)
if response.status_code == 200:
print('Success!')
print('Response JSON:', response.json())
else:
print('Failed!')
print('Status Code:', response.status_code)
print('Response Text:', response.text)
if __name__ == '__main__':
test_say_hello()
확실하게 하다 grpc_server.py
그리고flask_server.py
달리기.
다른 터미널에서 실행 test_flask_server.py
:
python test_flask_server.py
다음과 같은 출력이 표시됩니다.
Success!
Response JSON: {'message': 'Hello, World!'}
이 테스트 스크립트는 다음을 포함하는 메시지를 보냅니다. name
~을 위한"World"
JSON 객체의 POST 요청http://localhost:5000/hello
를 클릭한 다음 서버에서 반환한 응답을 인쇄합니다. 서버가 제대로 작동하면 성공 메시지와 JSON 데이터가 반환됩니다.