Azure Web PubSub with protobuf data not working

dheeraj awale 21 Reputation points
2025-02-26T15:48:53.1566667+00:00

I am using a generic WebSocket client (System.Net.WebSockets) to connect to the Azure Web PubSub service and communicate between two parties. I followed this official document. Communication works with JSON, but with protobuf I don't receive any data. Below is the sample code I am using:

EDIT: The client now receives a message (after I changed the class type to 'DownstreamMessage'), but it is a disconnect message: "disconnectedMessage": { "reason": "Invalid payload. Invalid value : '': Invalid event name. Valid event name should be word in between 1 and 128 characters long." } I don't understand what's happening.

// Create a joining message (upstream).
// DownstreamMessage is used further down to parse received messages.
UpstreamMessage joiningMessage = new()
{
    JoinGroupMessage = new UpstreamMessage.Types.JoinGroupMessage()
    {
        Group = Group,
    }
};
var joinData = joiningMessage.ToByteArray();

// Initialize the WebSocket
ClientWebSocket webSocket = new();
webSocket.Options.AddSubProtocol("protobuf.webpubsub.azure.v1");
await webSocket.ConnectAsync(new Uri(connStr), CancellationToken.None);

// Set up a continuous message receiver
_ = Task.Run(async () =>
{
    while (webSocket.State == WebSocketState.Open)
    {
        var buffer = new byte[8192];
        var result = await webSocket.ReceiveAsync(
                                    new ArraySegment<byte>(buffer),
                                    CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Binary)
        {
            // Create a properly sized array containing only the actual message
            var messageBytes = new byte[result.Count];
            Array.Copy(buffer, 0, messageBytes, 0, result.Count);
            try
            {
                var msg = DownstreamMessage.Parser.ParseFrom(messageBytes);
                Console.WriteLine($"Received message: {msg}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error parsing message: {ex.Message}");
            }
        }
        else if (result.MessageType == WebSocketMessageType.Close)
        {
            await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
            break;
        }
    }
});
C#

Accepted answer
  1. Sampath 510 Reputation points Microsoft Vendor
    2025-02-28T07:34:51.82+00:00

    Hello @dheeraj awale,

    The error you encountered, "Invalid event name", indicates that the Protobuf message structure was incorrect. In the sample below, EventMessage sets the Data field properly.
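One way a message structure can end up incorrect is a field-number mismatch: protobuf puts only field numbers on the wire, never field names, so the receiver decodes each field according to its own schema. The sketch below is illustrative only. `ProtoTagDemo` and `Tag` are hypothetical names, and the field numbers are arbitrary examples rather than the service's actual schema. It shows how the tag byte of a length-delimited field is derived from its number.

```csharp
using System;

static class ProtoTagDemo
{
    // Protobuf wire type 2 marks a length-delimited field
    // (strings, bytes, and nested messages all use it).
    private const int LengthDelimited = 2;

    // The tag is (fieldNumber << 3) | wireType.
    // It fits in a single byte for field numbers 1 through 15.
    public static byte Tag(int fieldNumber) =>
        (byte)((fieldNumber << 3) | LengthDelimited);

    public static void Main()
    {
        Console.WriteLine($"field 1 -> 0x{Tag(1):X2}"); // field 1 -> 0x0A
        Console.WriteLine($"field 2 -> 0x{Tag(2):X2}"); // field 2 -> 0x12
    }
}
```

If the local .proto assigns a different number to a oneof case than the receiving side expects, the receiver may decode the same bytes as another message type, for example an event message whose name is empty.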

    Use BinaryData.FromBytes(message.ToByteArray()) to serialize messages, as shown below.

    await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(message.ToByteArray()), WebPubSubDataType.Binary);
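Serialized protobuf output is arbitrary binary and is generally not valid UTF-8, which is why it should travel as bytes rather than through a string. A small sketch using only the standard library illustrates the problem. `BinaryPayloadDemo`, `SurvivesStringRoundTrip`, and the sample bytes are made up for illustration.

```csharp
using System;
using System.Linq;
using System.Text;

static class BinaryPayloadDemo
{
    // Returns true if the bytes come back unchanged after being
    // decoded to a string and re-encoded as UTF-8.
    public static bool SurvivesStringRoundTrip(byte[] payload)
    {
        string asText = Encoding.UTF8.GetString(payload);
        byte[] roundTripped = Encoding.UTF8.GetBytes(asText);
        return payload.SequenceEqual(roundTripped);
    }

    public static void Main()
    {
        byte[] text = Encoding.UTF8.GetBytes("hello");
        // Hypothetical binary payload; 0xFF and 0xFE are not valid UTF-8.
        byte[] binary = { 0x0A, 0x02, 0xFF, 0xFE };

        Console.WriteLine(SurvivesStringRoundTrip(text));   // True
        Console.WriteLine(SurvivesStringRoundTrip(binary)); // False
    }
}
```

The invalid bytes are replaced with U+FFFD during decoding, so the payload that comes out is no longer the one that went in.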
    
    

    Install Azure.Messaging.WebPubSub.Clients, Google.Protobuf, WebPubSub.Client.Protobuf and Webpubsub

    Make sure the Allow Sending To All Groups and Allow Joining/Leaving All Groups permissions are enabled.


    Below is the sample code for Azure PubSub with protobuf:

    
        private const string Group = "protobuf-client-group";
        private const string Uri = "wss://";

        static async Task Main()
        {
            var serviceClient = new WebPubSubClient(new Uri(Uri), new WebPubSubClientOptions
            {
                Protocol = new WebPubSubProtobufProtocol()
            });

            serviceClient.Connected += arg =>
            {
                Console.WriteLine($"Connected with connection id: {arg.ConnectionId}");
                return Task.CompletedTask;
            };

            serviceClient.Disconnected += arg =>
            {
                Console.WriteLine($"Disconnected from connection id: {arg.ConnectionId}");
                return Task.CompletedTask;
            };

            serviceClient.GroupMessageReceived += arg =>
            {
                Console.WriteLine($"Received Protobuf message: {arg.Message.Data}");
                return Task.CompletedTask;
            };

            await serviceClient.StartAsync();
            await serviceClient.JoinGroupAsync(Group);

            UpstreamMessage joiningMessage = new()
            {
                JoinGroupMessage = new Webpubsub.JoinGroupMessage()
                {
                    Group = Group,
                }
            };

            await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(joiningMessage.ToByteArray()), WebPubSubDataType.Binary);
            Console.WriteLine("Join group message sent");

            await Task.Delay(1000);

            while (true)
            {
                Console.WriteLine("Enter the message to send or just press enter to stop:");
                var message = Console.ReadLine();

                if (!string.IsNullOrEmpty(message))
                {
                    UpstreamMessage dataMessage = new()
                    {
                        EventMessage = new EventMessage()
                        {
                            Data = new MessageData()
                            {
                                TextData = message
                            }
                        }
                    };

                    await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(dataMessage.ToByteArray()), WebPubSubDataType.Binary);
                    Console.WriteLine("Data message sent");
                }
                else
                {
                    await serviceClient.LeaveGroupAsync(Group);
                    await serviceClient.StopAsync();
                    break;
                }
            }
        }
    
    

    webpubsub.proto:

    
    syntax = "proto3";

    package webpubsub;

    message UpstreamMessage {
        oneof message {
            JoinGroupMessage join_group_message = 1;
            EventMessage event_message = 2;
        }
    }

    message JoinGroupMessage {
        string group = 1;
    }

    message EventMessage {
        string event_name = 1;
        MessageData data = 2;
    }

    message MessageData {
        oneof data {
            string text_data = 1;
            bytes binary_data = 2;
        }
    }

    message DownstreamMessage {
        oneof message {
            GroupDataMessage group_data_message = 1;
        }
    }

    message GroupDataMessage {
        string group = 1;
        MessageData data = 2;
    }
    
    


    Please refer to my GitHub repository for the complete code.

    Hope this helps!

    If you found this answer helpful, please click Accept Answer and kindly upvote it.
