τα στοιχεία επικοινωνίας μου
Ταχυδρομείο[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
Αυτό το άρθρο θα εισαγάγει τον τρόπο δημιουργίας και χρήσης ενός απλού δείγματος εφαρμογής χρησιμοποιώντας το Flask και το gRPC στην Python requests
βιβλιοθήκη για δοκιμή.
Πρώτα, βεβαιωθείτε ότι έχετε εγκαταστήσει την Python. Στη συνέχεια, δημιουργήστε ένα εικονικό περιβάλλον για να διαχειριστείτε τις εξαρτήσεις σας.
python -m venv myenv
source myenv/bin/activate # Windows 使用 `myenvScriptsactivate`
Εγκαταστήστε τα απαραίτητα πακέτα:
pip install Flask grpcio grpcio-tools requests
Δημιουργώ .proto
αρχείο για να ορίσετε την υπηρεσία gRPC.Αποθήκευση ονόματος αρχείουservice.proto
:
syntax = "proto3";
package demo;
service DemoService {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
χρήση grpc_tools
Δημιουργία κώδικα Python:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. service.proto
Αυτό θα δημιουργήσει δύο αρχεία:service_pb2.py
καιservice_pb2_grpc.py
。
Δημιουργήστε ένα αρχείο που ονομάζεται grpc_server.py
έγγραφο:
from concurrent import futures
import grpc
import service_pb2
import service_pb2_grpc
class DemoService(service_pb2_grpc.DemoServiceServicer):
def SayHello(self, request, context):
return service_pb2.HelloResponse(message=f'Hello, {request.name}!')
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
service_pb2_grpc.add_DemoServiceServicer_to_server(DemoService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("gRPC server started on port 50051")
server.wait_for_termination()
if __name__ == '__main__':
serve()
Δημιουργήστε ένα αρχείο που ονομάζεται flask_server.py
έγγραφο:
from flask import Flask, request, jsonify
import grpc
import service_pb2
import service_pb2_grpc
app = Flask(__name__)
def get_grpc_client():
channel = grpc.insecure_channel('localhost:50051')
stub = service_pb2_grpc.DemoServiceStub(channel)
return stub
@app.route('/hello', methods=['POST'])
def say_hello():
data = request.json
name = data.get('name')
stub = get_grpc_client()
response = stub.SayHello(service_pb2.HelloRequest(name=name))
return jsonify({'message': response.message})
if __name__ == '__main__':
app.run(port=5000, debug=True)
Ανοίξτε δύο παράθυρα ή καρτέλες τερματικού.
Ξεκινήστε τον διακομιστή gRPC στο πρώτο τερματικό:
python grpc_server.py
Ξεκινήστε τον διακομιστή Flask σε ένα δεύτερο τερματικό:
python flask_server.py
requests
πραγματοποιήσει δοκιμήΔημιουργήστε ένα αρχείο που ονομάζεται test_flask_server.py
αρχείο και προσθέστε τον ακόλουθο κώδικα:
import requests
def test_say_hello():
url = 'http://localhost:5000/hello'
data = {'name': 'World'}
response = requests.post(url, json=data)
if response.status_code == 200:
print('Success!')
print('Response JSON:', response.json())
else:
print('Failed!')
print('Status Code:', response.status_code)
print('Response Text:', response.text)
if __name__ == '__main__':
test_say_hello()
συγουρεύομαι grpc_server.py
καιflask_server.py
τρέξιμο.
Εκτελέστε σε άλλο τερματικό test_flask_server.py
:
python test_flask_server.py
Θα πρέπει να δείτε την έξοδο ως εξής:
Success!
Response JSON: {'message': 'Hello, World!'}
Αυτό το δοκιμαστικό σενάριο στέλνει ένα μήνυμα που περιέχει name
Για"World"
Αίτημα POST του αντικειμένου JSONhttp://localhost:5000/hello
, και στη συνέχεια εκτυπώστε την απόκριση που επιστρέφεται από τον διακομιστή. Εάν ο διακομιστής λειτουργεί σωστά, θα δείτε ένα μήνυμα επιτυχίας και θα επιστραφούν τα δεδομένα JSON.