Using Python gRPC, I would like to be able to cancel a long-running unary-stream call from the client side, when a threading.Event is set.

def application(stub: StreamsStub, event: threading.Event):
    stream = stub.Application(ApplicationStreamRequest())
    try:
        for resp in stream:
            print(resp)
    except grpc.RpcError as e:
        print(e)

For the time being I am cancelling the stream using the channel.close() method, but of course this closes all connections rather than just this stream.

Could someone suggest how I can use the event to cancel the stream iterator? Thanks

2 Answers

Below is some code for a gRPC UnaryStream call. The server sends an unending number of replies, leaving the client to decide when to stop receiving them.

The example stops after a fixed count. To stop on an event instead, have another thread do its work and set a threading.Event, then check that event (rather than the counter) before calling cancel().

Note: using Python 2.7

Protofile:

syntax = "proto3";

package my_package;

service HeartBeat {
    rpc Beats(Counter) returns (stream Counter) {}
}

message Counter {
    int32 counter = 1;
}

Client:

from __future__ import print_function

import grpc
import heartbeat_pb2
import heartbeat_pb2_grpc


def get_beats(stub, channel):
    try:
        # The request value is arbitrary; the server only prints it.
        result_iterator = stub.Beats(heartbeat_pb2.Counter(counter=0))
        for result in result_iterator:
            print("Count: {}".format(result.counter))
            if result.counter >= 3:  # Stop once the counter reaches 3
                result_iterator.cancel()
    except grpc.RpcError as rpc_error:
        # After cancel(), the next read raises CANCELLED; that is expected.
        # Anything else is a real failure, so let it propagate.
        if rpc_error.code() != grpc.StatusCode.CANCELLED:
            raise


def run():
    with grpc.insecure_channel('localhost:9999') as channel:
        stub = heartbeat_pb2_grpc.HeartBeatStub(channel)
        get_beats(stub, channel)


if __name__ == '__main__':
    run()

Server:

from concurrent import futures

import grpc
from proto_generated import heartbeat_pb2
from proto_generated import heartbeat_pb2_grpc
import time


class HeartBeatServicer(heartbeat_pb2_grpc.HeartBeatServicer):

    def Beats(self, request, context):
        # Not required, only to show sending the server a message
        print("Beats: {}".format(request.counter))

        def response_message():
            i = 0
            while context.is_active():
                print("Sending {}".format(i))
                response = heartbeat_pb2.Counter(counter=i)
                i += 1
                time.sleep(1)  # Simulate doing work
                yield response

        return response_message()


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    heartbeat_pb2_grpc.add_HeartBeatServicer_to_server(
        HeartBeatServicer(), server)
    server.add_insecure_port('[::]:9999')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    serve()
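
For the event-based variant mentioned at the top of this answer, a minimal sketch might look like the following. The helper name read_until_event and its handle parameter are my own, not part of gRPC; the only things assumed about result_iterator are that it can be iterated and has cancel(), as the object returned by stub.Beats(...) does.

```python
from __future__ import print_function


def read_until_event(result_iterator, stop_event, handle=print):
    """Consume a response stream until stop_event is set.

    result_iterator is assumed to be the object returned by a unary-stream
    stub call such as stub.Beats(...); only iteration and cancel() are used.
    """
    for result in result_iterator:
        if stop_event.is_set():
            # Tell the server we are done, then leave the loop ourselves so
            # no further read is attempted on the cancelled call.
            result_iterator.cancel()
            break
        handle(result)
```

Keep in mind that the event is only checked when a response arrives, so if the server goes quiet, the cancellation waits for the next message.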

1 Comment

Special thanks to Nordine Lotfi (user 12349101) and to MisterMiyagi (user 5349916) for their help in the SO Python chat.

The _Rendezvous object returned by an RPC call implements grpc.RpcError, grpc.Future, and grpc.Call, so cancelling the stream is as simple as calling stream.cancel() (from the grpc.Future interface).
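
To tie this back to the threading.Event in the question, one option is a small watcher thread that waits on the event and calls cancel() on the stream. The sketch below is only an illustration: consume_until_event, on_response and poll are names I made up, and call is assumed to be whatever stub.Application(...) returns (anything iterable with a cancel() method will do).

```python
import threading


def consume_until_event(call, stop_event, on_response=print, poll=0.1):
    """Iterate over `call`, cancelling it from a watcher thread once
    `stop_event` is set. `call` only needs iteration and cancel()."""
    finished = threading.Event()  # set when the loop below ends by itself

    def watch():
        # Wake up every `poll` seconds so the thread can exit if the
        # stream finished without stop_event ever being set.
        while not finished.is_set():
            if stop_event.wait(poll):
                call.cancel()
                return

    watcher = threading.Thread(target=watch)
    watcher.daemon = True
    watcher.start()
    try:
        for response in call:
            on_response(response)
    finally:
        finished.set()
        watcher.join()
```

With a real gRPC stream, I would expect the loop to end with a grpc.RpcError whose code is grpc.StatusCode.CANCELLED once the watcher fires, so the caller would wrap this in the same try/except grpc.RpcError as in the question. Unlike checking the event inside the loop, this does not depend on the server sending another message.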
