2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
अस्मिन् लेखे पायथन् मध्ये Flask तथा gRPC इत्येतयोः उपयोगेन सरलं नमूना अनुप्रयोगं कथं निर्मातव्यं तथा च कथं उपयोगः करणीयः इति परिचयः भविष्यति requests
परीक्षणार्थं पुस्तकालयः।
प्रथमं भवतः Python संस्थापितम् अस्ति इति सुनिश्चितं कुर्वन्तु । ततः, स्वस्य आश्रयाणां प्रबन्धनार्थं आभासीवातावरणं रचयन्तु ।
python -m venv myenv
source myenv/bin/activate # Windows 使用 `myenvScriptsactivate`
आवश्यकसंकुलं संस्थापयन्तु : १.
pip install Flask grpcio grpcio-tools requests
निर्मियताम् .proto
gRPC सेवां परिभाषितुं सञ्चिका ।सञ्चिकानाम रक्षतुservice.proto
:
syntax = "proto3";
package demo;
service DemoService {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
उपयुञ्जताम् grpc_tools
पायथन् कोडं जनयन्तु : १.
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. service.proto
एतेन द्वौ सञ्चिकाः उत्पद्यन्ते :service_pb2.py
तथाservice_pb2_grpc.py
。
इति सञ्चिकां रचयन्तु grpc_server.py
प्रलेख:
from concurrent import futures
import grpc
import service_pb2
import service_pb2_grpc
class DemoService(service_pb2_grpc.DemoServiceServicer):
def SayHello(self, request, context):
return service_pb2.HelloResponse(message=f'Hello, {request.name}!')
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
service_pb2_grpc.add_DemoServiceServicer_to_server(DemoService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("gRPC server started on port 50051")
server.wait_for_termination()
if __name__ == '__main__':
serve()
इति सञ्चिकां रचयन्तु flask_server.py
प्रलेख:
from flask import Flask, request, jsonify
import grpc
import service_pb2
import service_pb2_grpc
app = Flask(__name__)
def get_grpc_client():
channel = grpc.insecure_channel('localhost:50051')
stub = service_pb2_grpc.DemoServiceStub(channel)
return stub
@app.route('/hello', methods=['POST'])
def say_hello():
data = request.json
name = data.get('name')
stub = get_grpc_client()
response = stub.SayHello(service_pb2.HelloRequest(name=name))
return jsonify({'message': response.message})
if __name__ == '__main__':
app.run(port=5000, debug=True)
द्वौ टर्मिनल् विण्डो वा ट्याब् वा उद्घाटयन्तु ।
प्रथमे टर्मिनल् मध्ये gRPC सर्वरं आरभत:
python grpc_server.py
द्वितीय टर्मिनल् मध्ये Flask सर्वरं आरभत:
python flask_server.py
requests
परीक्षणं कुर्वन्तिइति सञ्चिकां रचयन्तु test_flask_server.py
सञ्चिकां कृत्वा निम्नलिखितसङ्केतं योजयन्तु:
import requests
def test_say_hello():
url = 'http://localhost:5000/hello'
data = {'name': 'World'}
response = requests.post(url, json=data)
if response.status_code == 200:
print('Success!')
print('Response JSON:', response.json())
else:
print('Failed!')
print('Status Code:', response.status_code)
print('Response Text:', response.text)
if __name__ == '__main__':
test_say_hello()
सुनिश्चितं कुरुत grpc_server.py
तथाflask_server.py
चलति।
अन्यस्मिन् टर्मिनले चालयन्तु test_flask_server.py
:
python test_flask_server.py
भवद्भिः एतादृशं आउटपुट् द्रष्टव्यम् :
Success!
Response JSON: {'message': 'Hello, World!'}
एषा परीक्षणलिपिः युक्तं सन्देशं प्रेषयति name
कृते"World"
JSON object to इत्यस्य POST अनुरोधःhttp://localhost:5000/hello
, ततः सर्वरेण प्रत्यागतं प्रतिक्रियां मुद्रयन्तु । यदि सर्वरः सम्यक् कार्यं करोति तर्हि भवान् सफलतासन्देशं पश्यति तथा च JSON दत्तांशः प्रत्यागच्छति ।