Κοινή χρήση τεχνολογίας

[Python] Python Flask και gRPC απλό έργο

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Δείγματα έργων Python Flask και gRPC

Αυτό το άρθρο θα εισαγάγει τον τρόπο δημιουργίας και χρήσης ενός απλού δείγματος εφαρμογής χρησιμοποιώντας το Flask και το gRPC στην Python requests βιβλιοθήκη για δοκιμή.

Ρυθμίσεις περιβάλλοντος

Πρώτα, βεβαιωθείτε ότι έχετε εγκαταστήσει την Python. Στη συνέχεια, δημιουργήστε ένα εικονικό περιβάλλον για να διαχειριστείτε τις εξαρτήσεις σας.

python -m venv myenv
source myenv/bin/activate  # Windows 使用 `myenvScriptsactivate`
  • 1
  • 2

Εγκαταστήστε τα απαραίτητα πακέτα:

pip install Flask grpcio grpcio-tools requests
  • 1

Ορίστε την υπηρεσία gRPC

Δημιουργώ .proto αρχείο για να ορίσετε την υπηρεσία gRPC.Αποθήκευση ονόματος αρχείουservice.proto

syntax = "proto3";

package demo;

service DemoService {
  rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Δημιουργήστε κώδικα Python από αρχεία Proto

χρήση grpc_tools Δημιουργία κώδικα Python:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. service.proto
  • 1

Αυτό θα δημιουργήσει δύο αρχεία:service_pb2.py καιservice_pb2_grpc.py

Εφαρμογή διακομιστή gRPC

Δημιουργήστε ένα αρχείο που ονομάζεται grpc_server.py έγγραφο:

from concurrent import futures
import grpc
import service_pb2
import service_pb2_grpc

class DemoService(service_pb2_grpc.DemoServiceServicer):
    def SayHello(self, request, context):
        return service_pb2.HelloResponse(message=f'Hello, {request.name}!')

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    service_pb2_grpc.add_DemoServiceServicer_to_server(DemoService(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("gRPC server started on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Εφαρμογή διακομιστή Flask

Δημιουργήστε ένα αρχείο που ονομάζεται flask_server.py έγγραφο:

from flask import Flask, request, jsonify
import grpc
import service_pb2
import service_pb2_grpc

app = Flask(__name__)

def get_grpc_client():
    channel = grpc.insecure_channel('localhost:50051')
    stub = service_pb2_grpc.DemoServiceStub(channel)
    return stub

@app.route('/hello', methods=['POST'])
def say_hello():
    data = request.json
    name = data.get('name')
    stub = get_grpc_client()
    response = stub.SayHello(service_pb2.HelloRequest(name=name))
    return jsonify({'message': response.message})

if __name__ == '__main__':
    app.run(port=5000, debug=True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Εκτελέστε τον διακομιστή

Ανοίξτε δύο παράθυρα ή καρτέλες τερματικού.

  1. Ξεκινήστε τον διακομιστή gRPC στο πρώτο τερματικό:

    python grpc_server.py
    
    • 1
  2. Ξεκινήστε τον διακομιστή Flask σε ένα δεύτερο τερματικό:

    python flask_server.py
    
    • 1

χρήση requests πραγματοποιήσει δοκιμή

Δημιουργήστε ένα αρχείο που ονομάζεται test_flask_server.py αρχείο και προσθέστε τον ακόλουθο κώδικα:

import requests

def test_say_hello():
    url = 'http://localhost:5000/hello'
    data = {'name': 'World'}
    response = requests.post(url, json=data)
    
    if response.status_code == 200:
        print('Success!')
        print('Response JSON:', response.json())
    else:
        print('Failed!')
        print('Status Code:', response.status_code)
        print('Response Text:', response.text)

if __name__ == '__main__':
    test_say_hello()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Εκτελέστε δοκιμές

  1. συγουρεύομαι grpc_server.py καιflask_server.py τρέξιμο.

  2. Εκτελέστε σε άλλο τερματικό test_flask_server.py

    python test_flask_server.py
    
    • 1

Θα πρέπει να δείτε την έξοδο ως εξής:

Success!
Response JSON: {'message': 'Hello, World!'}
  • 1
  • 2

Εκτέλεση στιγμιότυπου οθόνης

Εισαγάγετε την περιγραφή της εικόνας εδώ

Αυτό το δοκιμαστικό σενάριο στέλνει ένα μήνυμα που περιέχει name Για"World" Αίτημα POST του αντικειμένου JSONhttp://localhost:5000/hello , και στη συνέχεια εκτυπώστε την απόκριση που επιστρέφεται από τον διακομιστή. Εάν ο διακομιστής λειτουργεί σωστά, θα δείτε ένα μήνυμα επιτυχίας και θα επιστραφούν τα δεδομένα JSON.