Skip to content

Instantly share code, notes, and snippets.

@alksndrglk
Created December 20, 2022 13:55
Show Gist options
  • Select an option

  • Save alksndrglk/0ab79e35eaf8680b296050bcb45314a3 to your computer and use it in GitHub Desktop.

Select an option

Save alksndrglk/0ab79e35eaf8680b296050bcb45314a3 to your computer and use it in GitHub Desktop.
#----client
import grpc
import sbercorus_mq_pb2
import sbercorus_mq_pb2_grpc
import asyncio
async def get_client_stream_requests():
    """Yield an endless stream of sample client ``Message`` requests.

    A new message carrying the same fixed demo field values is built on
    every pass, and the generator pauses for two seconds before handing it
    out. It never finishes on its own; the consumer decides when to stop.
    """
    # Fixed demo field values shared by every request that is produced.
    payload = dict(
        correlation_id="3e066b6b-3e98-4d66-938f-f8d06ec03000",
        service_key="CLIENT_2",
        service_id=123456789,
        recipient="Pfr",
        docflow_uid="15227e94-dab4-44b0-a147-4d8d895690fc",
        container_name="\u041F\u0424\u0420_\u0422\u0415\u0421\u0422",
        container_file="",
        container_bytes=" ",
        created="2022-10-21T06:24:21.5421006+06:00",
        sent="2022-10-21T08:24:21.5439226+06:00",
        confirmed="2022-10-21T10:24:21.5439624+06:00",
        attributes={},
    )
    while True:
        outgoing = sbercorus_mq_pb2.Message(**payload)
        # Throttle the stream: one request every two seconds.
        await asyncio.sleep(2)
        yield outgoing
async def run():
    """Connect to the local server and print every streamed reply.

    An insecure channel to ``localhost:50051`` is opened for the duration of
    the call. The request side of the bidirectional ``SendMessage`` RPC is
    fed by ``get_client_stream_requests()``; each message that comes back is
    printed beneath a fixed banner line.
    """
    channel_ctx = grpc.aio.insecure_channel("localhost:50051")
    async with channel_ctx as conn:
        client = sbercorus_mq_pb2_grpc.ProxyMQStub(conn)
        async for reply in client.SendMessage(get_client_stream_requests()):
            print("InteractingHello Response Received: ")
            print(reply)
if __name__ == "__main__":
    # Script entry point: run the client coroutine until it completes.
    asyncio.run(run())
#----server
import asyncio

import grpc

import sbercorus_mq_pb2
import sbercorus_mq_pb2_grpc
class ProxyMQServicer(sbercorus_mq_pb2_grpc.ProxyMQServicer):
    """Demo ``ProxyMQ`` servicer: echoes requests and relays a fake feed.

    Every incoming request is answered with a fixed sample ``Message``.
    While an RPC is active, one background task (``fake_rabbit``) pushes
    additional simulated traffic through the same ``SendMessage`` path so
    that it is written to that RPC's context.

    Only one background task exists at a time; it belongs to the RPC
    invocation that started it. RPCs that arrive while it is running are
    served without a feed of their own.
    """

    def __init__(self) -> None:
        super().__init__()
        # Background feed task; ``None`` whenever no RPC currently owns one.
        self.worker: "asyncio.Task | None" = None

    async def SendMessage(self, request_iterator, context):
        """Reply to every message of ``request_iterator`` via ``context``.

        Args:
            request_iterator: Async iterable of incoming ``Message`` objects.
                This is either a real client stream or the simulated stream
                passed in by ``fake_rabbit``.
            context: The RPC context the replies are written to.

        The invocation that starts the background feed task also stops it:
        when its request stream ends (or the handler is cancelled or fails),
        the task is cancelled and ``self.worker`` is cleared. Without this
        the task would keep writing to a finished RPC's context, and no
        later RPC would ever get a feed because the stale task object would
        still occupy ``self.worker``.
        """
        owned_worker = None
        if self.worker is None and context:
            # ``self.worker`` is assigned before the task first runs, so the
            # nested SendMessage call made by ``fake_rabbit`` sees it set and
            # does not spawn another task.
            owned_worker = asyncio.create_task(fake_rabbit(self, context))
            self.worker = owned_worker
        try:
            async for request in request_iterator:
                print("SendMessage Request Made:")
                print(request)
                hello_reply = sbercorus_mq_pb2.Message(
                    correlation_id="3e066b6b-3e98-4d66-938f-f8d06ec03000",
                    service_key=request.service_key,
                    service_id=request.service_id + 1,
                    recipient="Pfr",
                    docflow_uid="15227e94-dab4-44b0-a147-4d8d895690fc",
                    container_name="\u041F\u0424\u0420_\u0422\u0415\u0421\u0422",
                    container_file="",
                    container_bytes=" ",
                    created="2022-10-21T06:24:21.5421006+06:00",
                    sent="2022-10-21T08:24:21.5439226+06:00",
                    confirmed="2022-10-21T10:24:21.5439624+06:00",
                    attributes={"PING": "PONG"},
                )
                await context.write(hello_reply)
        finally:
            if owned_worker is not None:
                # Only the invocation that created the task tears it down;
                # the nested call driven by the task itself never owns it.
                owned_worker.cancel()
                if self.worker is owned_worker:
                    self.worker = None
async def fake_rabbit_stream():
    """Yield an endless stream of simulated broker ``Message`` objects.

    A new message with the same fixed sample field values is created on
    every pass, and the generator waits five seconds before yielding it.
    It never terminates by itself.
    """
    # Fixed sample field values used for every simulated message.
    template = dict(
        correlation_id="3e066b6b-3e98-4d66-938f-f8d06ec03000",
        service_key="FAKE_RABBBIT",
        service_id=123456789,
        recipient="Pfr",
        docflow_uid="15227e94-dab4-44b0-a147-4d8d895690fc",
        container_name="\u041F\u0424\u0420_\u0422\u0415\u0421\u0422",
        container_file="",
        container_bytes=" ",
        created="2022-10-21T06:24:21.5421006+06:00",
        sent="2022-10-21T08:24:21.5439226+06:00",
        confirmed="2022-10-21T10:24:21.5439624+06:00",
        attributes={},
    )
    while True:
        simulated = sbercorus_mq_pb2.Message(**template)
        # One simulated message every five seconds.
        await asyncio.sleep(5)
        yield simulated
async def fake_rabbit(servicer, context):
    """Keep pushing the simulated feed through ``servicer.SendMessage``.

    Each pass creates a new ``fake_rabbit_stream()`` and passes it, together
    with ``context``, to the servicer in place of a client request stream.
    Whenever that call returns, another one is started with a new stream.

    Args:
        servicer: Object whose ``SendMessage(stream, context)`` coroutine
            consumes the simulated stream.
        context: RPC context forwarded unchanged to ``SendMessage``.
    """
    while True:
        await servicer.SendMessage(fake_rabbit_stream(), context)
async def serve():
    """Start the asyncio gRPC server and block until it terminates.

    A single ``ProxyMQServicer`` instance is registered on a new
    ``grpc.aio`` server, which listens without transport security on
    ``localhost:50051``.
    """
    handler = ProxyMQServicer()
    grpc_server = grpc.aio.server()
    sbercorus_mq_pb2_grpc.add_ProxyMQServicer_to_server(handler, grpc_server)
    grpc_server.add_insecure_port("localhost:50051")
    await grpc_server.start()
    await grpc_server.wait_for_termination()
if __name__ == "__main__":
    # Script entry point: run the server coroutine until it terminates.
    asyncio.run(serve())
#----proto
syntax = "proto3";
package proxymq;
// ProxyMQ exposes one bidirectional-streaming RPC: both the request and the
// response of SendMessage are streams of Message.
service ProxyMQ {
  rpc SendMessage(stream Message) returns (stream Message);
}
// Message is the single payload type carried in both directions of
// ProxyMQ.SendMessage.
message Message {
  // Identification and routing.
  string correlation_id = 1;
  string service_key = 2;
  int32 service_id = 3;
  optional bool sphere_out = 4;
  string recipient = 5;
  string docflow_uid = 6;

  // Container description. container_bytes is declared as a string, not
  // bytes.
  string container_name = 7;
  optional int32 container_type = 8;
  string container_file = 9;
  optional string container_bytes = 10;

  // Timestamps are plain strings. NOTE(review): presumably ISO-8601 with a
  // UTC offset; confirm the expected format with the producers.
  string created = 11;
  string sent = 12;
  string confirmed = 13;

  // Free-form string key/value pairs.
  map<string, string> attributes = 14;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment