2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
In diesem Artikel wird erläutert, wie Sie mit Flask und gRPC in Python eine einfache Beispielanwendung erstellen und verwenden requests
Bibliothek zum Testen.
Stellen Sie zunächst sicher, dass Python installiert ist. Erstellen Sie dann eine virtuelle Umgebung, um Ihre Abhängigkeiten zu verwalten.
python -m venv myenv
source myenv/bin/activate # Windows 使用 `myenvScriptsactivate`
Installieren Sie die erforderlichen Pakete:
pip install Flask grpcio grpcio-tools requests
Erstellen .proto
Datei zum Definieren des gRPC-Dienstes.Dateinamen speichernservice.proto
:
syntax = "proto3";
package demo;
service DemoService {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
verwenden grpc_tools
Python-Code generieren:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. service.proto
Dadurch werden zwei Dateien generiert:service_pb2.py
Undservice_pb2_grpc.py
。
Erstellen Sie eine Datei mit dem Namen grpc_server.py
dokumentieren:
from concurrent import futures
import grpc
import service_pb2
import service_pb2_grpc
class DemoService(service_pb2_grpc.DemoServiceServicer):
def SayHello(self, request, context):
return service_pb2.HelloResponse(message=f'Hello, {request.name}!')
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
service_pb2_grpc.add_DemoServiceServicer_to_server(DemoService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("gRPC server started on port 50051")
server.wait_for_termination()
if __name__ == '__main__':
serve()
Erstellen Sie eine Datei mit dem Namen flask_server.py
dokumentieren:
from flask import Flask, request, jsonify
import grpc
import service_pb2
import service_pb2_grpc
app = Flask(__name__)
def get_grpc_client():
channel = grpc.insecure_channel('localhost:50051')
stub = service_pb2_grpc.DemoServiceStub(channel)
return stub
@app.route('/hello', methods=['POST'])
def say_hello():
data = request.json
name = data.get('name')
stub = get_grpc_client()
response = stub.SayHello(service_pb2.HelloRequest(name=name))
return jsonify({'message': response.message})
if __name__ == '__main__':
app.run(port=5000, debug=True)
Öffnen Sie zwei Terminalfenster oder Registerkarten.
Starten Sie den gRPC-Server im ersten Terminal:
python grpc_server.py
Starten Sie den Flask-Server in einem zweiten Terminal:
python flask_server.py
requests
Tests durchführenErstellen Sie eine Datei mit dem Namen test_flask_server.py
Datei und fügen Sie den folgenden Code hinzu:
import requests
def test_say_hello():
url = 'http://localhost:5000/hello'
data = {'name': 'World'}
response = requests.post(url, json=data)
if response.status_code == 200:
print('Success!')
print('Response JSON:', response.json())
else:
print('Failed!')
print('Status Code:', response.status_code)
print('Response Text:', response.text)
if __name__ == '__main__':
test_say_hello()
stellen Sie sicher grpc_server.py
Undflask_server.py
läuft.
Führen Sie es in einem anderen Terminal aus test_flask_server.py
:
python test_flask_server.py
Sie sollten eine Ausgabe wie diese sehen:
Success!
Response JSON: {'message': 'Hello, World!'}
Dieses Testskript sendet eine Nachricht mit name
für"World"
POST-Anfrage des JSON-Objekts anhttp://localhost:5000/hello
, und drucken Sie dann die vom Server zurückgegebene Antwort aus. Wenn der Server ordnungsgemäß funktioniert, wird eine Erfolgsmeldung angezeigt und die JSON-Daten werden zurückgegeben.