0

I am using generic WebSocket client (System.Net.WebSockets) to connect to Azure Web PubSub service and communicate between two parties. I referred this official document. I was able to communicate using json but with protobuf I don't receive any data. Below is the sample code I am using:

EDIT: client is receiving some message (after changing class type to 'DownstreamMessage') but it says: disconnectedMessage": { "reason": "Invalid payload. Invalid value : '': Invalid event name. Valid event name should be word in between 1 and 128 characters long. I don't understand whats happening

using Google.Protobuf;

// Create a joining message
UpstreamMessage joiningMessage = new()
{
    JoinGroupMessage = new UpstreamMessage.Types.JoinGroupMessage()
    {
        Group = Group,
    }
};
var joinData = joiningMessage.ToByteArray();

// Initialize the WebSocket
ClientWebSocket webSocket = new();
webSocket.Options.AddSubProtocol("protobuf.webpubsub.azure.v1");
await webSocket.ConnectAsync(new Uri(connStr), CancellationToken.None);

// Set up a continuous message receiver
_ = Task.Run(async () =>
{
    while (webSocket.State == WebSocketState.Open)
    {
        var buffer = new byte[8192];
        var result = await webSocket.ReceiveAsync(
                                    new ArraySegment<byte>(buffer),
                                    CancellationToken.None);

        if (result.MessageType == WebSocketMessageType.Binary)
        {
            // Create a properly sized array containing only the actual message
            var messageBytes = new byte[result.Count];
            Array.Copy(buffer, 0, messageBytes, 0, result.Count);

            try
            {
                var msg = DownstreamMessage.Parser.ParseFrom(messageBytes);
                Console.WriteLine($"Received message: {msg}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error parsing message: {ex.Message}");
            }
        }
        else if (result.MessageType == WebSocketMessageType.Close)
        {
            await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
            break;
        }
    }
});

// Wait for connection establishment
await Task.Delay(2000);

// Send join request
await webSocket.SendAsync(joinData, WebSocketMessageType.Binary, true, CancellationToken.None);
Console.WriteLine("Join group message sent");

// Wait for join to be processed
await Task.Delay(1000);

// Create and send data message
UpstreamMessage dataMessage = new()
{
    // Set the EventMessage with proper target group
    EventMessage = new UpstreamMessage.Types.EventMessage()
    {
        Data = new MessageData()
        {
            TextData = "Hello from protobuf dheeraj"
        }
    }
};

await webSocket.SendAsync(dataMessage.ToByteArray(), WebSocketMessageType.Binary, true, CancellationToken.None);
Console.WriteLine("Data message sent");
1
  • The error you encountered—Invalid event name—suggests that the Protobuf message structure was incorrect. I ensured that EventMessage properly sets the Data field. Commented Feb 27 at 10:45

1 Answer 1

1

The error you encountered, Invalid event name, indicates that the Protobuf message structure was incorrect. I have ensured that EventMessage properly sets the Data field.

Use BinaryData.FromBytes(message.ToByteArray()) to serialize messages like below.

await serviceClient.SendToGroupAsync(Group, BinaryData.FromString(message), WebPubSubDataType.Binary);

Below is the sample code for Azure PubSub with protobuf:

    private const string Group = "protobuf-client-group";
    private const string Uri = "wss://"; 
    static async Task Main()
    {
        var serviceClient = new WebPubSubClient(new Uri(Uri), new WebPubSubClientOptions
        {
            Protocol = new WebPubSubProtobufProtocol()
        });

        serviceClient.Connected += arg =>
        {
            Console.WriteLine($"Connected with connection id: {arg.ConnectionId}");
            return Task.CompletedTask;
        };

        serviceClient.Disconnected += arg =>
        {
            Console.WriteLine($"Disconnected from connection id: {arg.ConnectionId}");
            return Task.CompletedTask;
        };

        serviceClient.GroupMessageReceived += arg =>
        {
            Console.WriteLine($"Received Protobuf message: {arg.Message.Data}");
            return Task.CompletedTask;
        };
        await serviceClient.StartAsync();
        await serviceClient.JoinGroupAsync(Group);
        UpstreamMessage joiningMessage = new()
        {
            JoinGroupMessage = new Webpubsub.JoinGroupMessage()
            {
                Group = Group,
            }
        };
        await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(joiningMessage.ToByteArray()), WebPubSubDataType.Binary);

        Console.WriteLine("Join group message sent");
        await Task.Delay(1000);

        while (true)
        {
            Console.WriteLine("Enter the message to send or just press enter to stop:");
            var message = Console.ReadLine();

            if (!string.IsNullOrEmpty(message))
            {
                UpstreamMessage dataMessage = new()
                {
                    EventMessage = new EventMessage()
                    {
                        Data = new MessageData()
                        {
                            TextData = message
                        }
                    }
                };
                await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(dataMessage.ToByteArray()), WebPubSubDataType.Binary);
                Console.WriteLine("Data message sent");
            }
            else
            {
                await serviceClient.LeaveGroupAsync(Group);
                await serviceClient.StopAsync();
                break;
            }
        }
    }

enter image description here Output:

Output

Sign up to request clarification or add additional context in comments.

7 Comments

'Method 'ParseMessage' in type 'WebPubSub.Client.Protobuf.WebPubSubProtobufProtocol' from assembly 'WebPubSub.Protobuf, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null' does not have an implementation.'
Also could you post correct Message structure?
add <PackageReference Include="WebPubSub.Protobuf" Version="1.0.0" /> and install Azure.Messaging.WebPubSub.Clients, Google.Protobuf , WebPubSub.Client.Protobuf and Webpubsub
do you mean by webpubsub.proto ?
@deathrace Please refer to my GitHub repository for the complete code.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.