informasi kontak saya
Surat[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
Artikel ini akan memperkenalkan cara membuat contoh aplikasi sederhana menggunakan Flask dan gRPC dengan Python dan cara menggunakannya requests
perpustakaan untuk pengujian.
Pertama, pastikan Anda telah menginstal Python. Kemudian, buat lingkungan virtual untuk mengelola dependensi Anda.
python -m venv myenv
source myenv/bin/activate # Windows 使用 `myenvScriptsactivate`
Instal paket yang diperlukan:
pip install Flask grpcio grpcio-tools requests
Membuat .proto
file untuk menentukan layanan gRPC.Simpan nama fileservice.proto
:
syntax = "proto3";
package demo;
service DemoService {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
menggunakan grpc_tools
Hasilkan kode Python:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. service.proto
Ini akan menghasilkan dua file:service_pb2.py
Danservice_pb2_grpc.py
。
Buat file bernama grpc_server.py
dokumen:
from concurrent import futures
import grpc
import service_pb2
import service_pb2_grpc
class DemoService(service_pb2_grpc.DemoServiceServicer):
def SayHello(self, request, context):
return service_pb2.HelloResponse(message=f'Hello, {request.name}!')
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
service_pb2_grpc.add_DemoServiceServicer_to_server(DemoService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("gRPC server started on port 50051")
server.wait_for_termination()
if __name__ == '__main__':
serve()
Buat file bernama flask_server.py
dokumen:
from flask import Flask, request, jsonify
import grpc
import service_pb2
import service_pb2_grpc
app = Flask(__name__)
def get_grpc_client():
channel = grpc.insecure_channel('localhost:50051')
stub = service_pb2_grpc.DemoServiceStub(channel)
return stub
@app.route('/hello', methods=['POST'])
def say_hello():
data = request.json
name = data.get('name')
stub = get_grpc_client()
response = stub.SayHello(service_pb2.HelloRequest(name=name))
return jsonify({'message': response.message})
if __name__ == '__main__':
app.run(port=5000, debug=True)
Buka dua jendela atau tab terminal.
Mulai server gRPC di terminal pertama:
python grpc_server.py
Mulai server Flask di terminal kedua:
python flask_server.py
requests
melakukan pengujianBuat file bernama test_flask_server.py
file dan tambahkan kode berikut:
import requests
def test_say_hello():
url = 'http://localhost:5000/hello'
data = {'name': 'World'}
response = requests.post(url, json=data)
if response.status_code == 200:
print('Success!')
print('Response JSON:', response.json())
else:
print('Failed!')
print('Status Code:', response.status_code)
print('Response Text:', response.text)
if __name__ == '__main__':
test_say_hello()
memastikan grpc_server.py
Danflask_server.py
berlari.
Jalankan di terminal lain test_flask_server.py
:
python test_flask_server.py
Anda akan melihat keluaran seperti ini:
Success!
Response JSON: {'message': 'Hello, World!'}
Skrip pengujian ini mengirimkan pesan yang berisi name
untuk"World"
POST permintaan objek JSON kehttp://localhost:5000/hello
, lalu cetak respons yang dikembalikan oleh server. Jika server berfungsi dengan baik, Anda akan melihat pesan sukses dan data JSON dikembalikan.