Technologieaustausch

[Python] Einfaches Python Flask- und gRPC-Projekt

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Python Flask- und gRPC-Beispielprojekte

In diesem Artikel wird erläutert, wie Sie mit Flask und gRPC in Python eine einfache Beispielanwendung erstellen und verwenden requests Bibliothek zum Testen.

Umgebungseinstellungen

Stellen Sie zunächst sicher, dass Python installiert ist. Erstellen Sie dann eine virtuelle Umgebung, um Ihre Abhängigkeiten zu verwalten.

python -m venv myenv
source myenv/bin/activate  # Windows 使用 `myenvScriptsactivate`
  • 1
  • 2

Installieren Sie die erforderlichen Pakete:

pip install Flask grpcio grpcio-tools requests
  • 1

Definieren Sie den gRPC-Dienst

Erstellen .proto Datei zum Definieren des gRPC-Dienstes.Dateinamen speichernservice.proto

syntax = "proto3";

package demo;

service DemoService {
  rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Generieren Sie Python-Code aus Proto-Dateien

verwenden grpc_tools Python-Code generieren:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. service.proto
  • 1

Dadurch werden zwei Dateien generiert:service_pb2.py Undservice_pb2_grpc.py

Implementieren Sie den gRPC-Server

Erstellen Sie eine Datei mit dem Namen grpc_server.py dokumentieren:

from concurrent import futures
import grpc
import service_pb2
import service_pb2_grpc

class DemoService(service_pb2_grpc.DemoServiceServicer):
    def SayHello(self, request, context):
        return service_pb2.HelloResponse(message=f'Hello, {request.name}!')

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    service_pb2_grpc.add_DemoServiceServicer_to_server(DemoService(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("gRPC server started on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Implementieren Sie den Flask-Server

Erstellen Sie eine Datei mit dem Namen flask_server.py dokumentieren:

from flask import Flask, request, jsonify
import grpc
import service_pb2
import service_pb2_grpc

app = Flask(__name__)

def get_grpc_client():
    channel = grpc.insecure_channel('localhost:50051')
    stub = service_pb2_grpc.DemoServiceStub(channel)
    return stub

@app.route('/hello', methods=['POST'])
def say_hello():
    data = request.json
    name = data.get('name')
    stub = get_grpc_client()
    response = stub.SayHello(service_pb2.HelloRequest(name=name))
    return jsonify({'message': response.message})

if __name__ == '__main__':
    app.run(port=5000, debug=True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Führen Sie den Server aus

Öffnen Sie zwei Terminalfenster oder Registerkarten.

  1. Starten Sie den gRPC-Server im ersten Terminal:

    python grpc_server.py
    
    • 1
  2. Starten Sie den Flask-Server in einem zweiten Terminal:

    python flask_server.py
    
    • 1

verwenden requests Tests durchführen

Erstellen Sie eine Datei mit dem Namen test_flask_server.py Datei und fügen Sie den folgenden Code hinzu:

import requests

def test_say_hello():
    url = 'http://localhost:5000/hello'
    data = {'name': 'World'}
    response = requests.post(url, json=data)
    
    if response.status_code == 200:
        print('Success!')
        print('Response JSON:', response.json())
    else:
        print('Failed!')
        print('Status Code:', response.status_code)
        print('Response Text:', response.text)

if __name__ == '__main__':
    test_say_hello()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Führen Sie Tests durch

  1. stellen Sie sicher grpc_server.py Undflask_server.py läuft.

  2. Führen Sie es in einem anderen Terminal aus test_flask_server.py

    python test_flask_server.py
    
    • 1

Sie sollten eine Ausgabe wie diese sehen:

Success!
Response JSON: {'message': 'Hello, World!'}
  • 1
  • 2

Screenshot ausführen

Fügen Sie hier eine Bildbeschreibung ein

Dieses Testskript sendet eine Nachricht mit name für"World" POST-Anfrage des JSON-Objekts anhttp://localhost:5000/hello , und drucken Sie dann die vom Server zurückgegebene Antwort aus. Wenn der Server ordnungsgemäß funktioniert, wird eine Erfolgsmeldung angezeigt und die JSON-Daten werden zurückgegeben.