Azure Web PubSub with protobuf data not working

dheeraj awale 21 Reputation points
2025-02-26T15:48:53.1566667+00:00

I am using a generic WebSocket client (System.Net.WebSockets) to connect to the Azure Web PubSub service and communicate between two parties. I followed the official documentation. Communication works with JSON, but with protobuf I don't receive any data. Below is the sample code I am using:

EDIT: The client now receives a message (after I changed the parsed class type to 'DownstreamMessage'), but it reports: "disconnectedMessage": { "reason": "Invalid payload. Invalid value : '': Invalid event name. Valid event name should be word in between 1 and 128 characters long." } I don't understand what is happening.

// Create a join-group message (upstream).
// DownstreamMessage is used further down to parse received messages.
UpstreamMessage joiningMessage = new()
{
    JoinGroupMessage = new UpstreamMessage.Types.JoinGroupMessage()
    {
        Group = Group,
    }
};
var joinData = joiningMessage.ToByteArray();

// Initialize the WebSocket
ClientWebSocket webSocket = new();
webSocket.Options.AddSubProtocol("protobuf.webpubsub.azure.v1");
await webSocket.ConnectAsync(new Uri(connStr), CancellationToken.None);

// Set up a continuous message receiver
_ = Task.Run(async () =>
{
    while (webSocket.State == WebSocketState.Open)
    {
        var buffer = new byte[8192];
        var result = await webSocket.ReceiveAsync(
                                    new ArraySegment<byte>(buffer),
                                    CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Binary)
        {
            // Create a properly sized array containing only the actual message
            var messageBytes = new byte[result.Count];
            Array.Copy(buffer, 0, messageBytes, 0, result.Count);
            try
            {
                var msg = DownstreamMessage.Parser.ParseFrom(messageBytes);
                Console.WriteLine($"Received message: {msg}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error parsing message: {ex.Message}");
            }
        }
        else if (result.MessageType == WebSocketMessageType.Close)
        {
            await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
            break;
        }
    }
});

Accepted answer
  1. Sampath 510 Reputation points Microsoft Vendor
    2025-02-28T07:34:51.82+00:00

    Hello @dheeraj awale,

    The "Invalid event name" error you encountered indicates that the protobuf message structure was incorrect: the service decoded an event message whose event name was empty. In the sample below, EventMessage sets its Data field explicitly.
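The service error states that a valid event name is a word between 1 and 128 characters long. A minimal sketch of a client-side check that could run before a message is sent, assuming "word" means ASCII letters, digits, and underscores (that character set is my assumption, and EventNames is a hypothetical helper, not part of any SDK):

```csharp
using System;
using System.Text.RegularExpressions;

public static class EventNames
{
    // Assumption: a "word" is ASCII letters, digits, and underscore, 1 to 128 characters.
    // \z anchors at the true end of input, so a trailing newline is rejected.
    private static readonly Regex Pattern =
        new Regex(@"^[A-Za-z0-9_]{1,128}\z", RegexOptions.CultureInvariant);

    // Returns true when the name satisfies the assumed rule.
    public static bool IsValid(string name) => name != null && Pattern.IsMatch(name);

    // Returns the name unchanged, or throws so the bad value never reaches the service.
    public static string Require(string name)
    {
        if (!IsValid(name))
        {
            throw new ArgumentException(
                "Event name must be a word between 1 and 128 characters long.", nameof(name));
        }
        return name;
    }
}
```

With the webpubsub.proto shown in this answer, the generated EventMessage class should expose event_name as an EventName property, so a call such as EventNames.Require("message") could be assigned to it when building the message.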

    Use BinaryData.FromBytes(message.ToByteArray()) to serialize messages, as shown below.

    await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(message.ToByteArray()), WebPubSubDataType.Binary);
    
    

    Install Azure.Messaging.WebPubSub.Clients, Google.Protobuf, WebPubSub.Client.Protobuf and Webpubsub

    Make sure the client connection is granted both permissions: Allow Sending To All Groups and Allow Joining/Leaving All Groups.
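If the client URL is generated from server code instead, the same two permissions can be requested as roles. A rough sketch, assuming the Azure.Messaging.WebPubSub server package is installed; the connection string, hub name, and user ID are placeholders, and the role names are the ones I understand the service to use for "all groups" permissions:

```csharp
using System;
using Azure.Messaging.WebPubSub;

// Placeholders: substitute your own connection string and hub name.
var service = new WebPubSubServiceClient("<connection-string>", "<hub-name>");

// Assumption: these role names grant join/leave and send rights for all groups.
Uri clientUrl = service.GetClientAccessUri(
    expiresAfter: TimeSpan.FromHours(1),
    userId: "<user-id>",
    roles: new[] { "webpubsub.joinLeaveGroup", "webpubsub.sendToGroup" });

// The resulting wss:// URL carries an access token with the requested roles.
Console.WriteLine(clientUrl.AbsoluteUri);
```

Generating the URL this way keeps the permissions in code, so they do not depend on remembering to tick the options by hand.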


    Below is the sample code for Azure PubSub with protobuf:

    
    private const string Group = "protobuf-client-group";
    private const string Uri = "wss://";

    static async Task Main()
    {
        var serviceClient = new WebPubSubClient(new Uri(Uri), new WebPubSubClientOptions
        {
            Protocol = new WebPubSubProtobufProtocol()
        });

        serviceClient.Connected += arg =>
        {
            Console.WriteLine($"Connected with connection id: {arg.ConnectionId}");
            return Task.CompletedTask;
        };

        serviceClient.Disconnected += arg =>
        {
            Console.WriteLine($"Disconnected from connection id: {arg.ConnectionId}");
            return Task.CompletedTask;
        };

        serviceClient.GroupMessageReceived += arg =>
        {
            Console.WriteLine($"Received Protobuf message: {arg.Message.Data}");
            return Task.CompletedTask;
        };

        await serviceClient.StartAsync();
        await serviceClient.JoinGroupAsync(Group);

        UpstreamMessage joiningMessage = new()
        {
            JoinGroupMessage = new Webpubsub.JoinGroupMessage()
            {
                Group = Group,
            }
        };

        await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(joiningMessage.ToByteArray()), WebPubSubDataType.Binary);
        Console.WriteLine("Join group message sent");

        await Task.Delay(1000);

        while (true)
        {
            Console.WriteLine("Enter the message to send or just press enter to stop:");
            var message = Console.ReadLine();

            if (!string.IsNullOrEmpty(message))
            {
                UpstreamMessage dataMessage = new()
                {
                    EventMessage = new EventMessage()
                    {
                        Data = new MessageData()
                        {
                            TextData = message
                        }
                    }
                };

                await serviceClient.SendToGroupAsync(Group, BinaryData.FromBytes(dataMessage.ToByteArray()), WebPubSubDataType.Binary);
                Console.WriteLine("Data message sent");
            }
            else
            {
                await serviceClient.LeaveGroupAsync(Group);
                await serviceClient.StopAsync();
                break;
            }
        }
    }
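If you stay with the raw ClientWebSocket from the question rather than the SDK client, note that a single ReceiveAsync call may return only part of a message. A minimal sketch of a helper that accumulates chunks until EndOfMessage is set, so the protobuf parser always sees a complete payload (WebSocketFrames and ReceiveChunk are hypothetical names introduced here for illustration):

```csharp
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

public static class WebSocketFrames
{
    // Shape of WebSocket.ReceiveAsync, kept as a delegate so the loop can be exercised without a live socket.
    public delegate Task<WebSocketReceiveResult> ReceiveChunk(
        ArraySegment<byte> buffer, CancellationToken cancellationToken);

    // Reads one complete message from a socket; returns null when the peer sends a Close frame.
    public static Task<byte[]> ReceiveFullMessageAsync(
        WebSocket socket, CancellationToken cancellationToken = default)
        => ReceiveFullMessageAsync(socket.ReceiveAsync, cancellationToken);

    public static async Task<byte[]> ReceiveFullMessageAsync(
        ReceiveChunk receive, CancellationToken cancellationToken = default)
    {
        var buffer = new byte[8192];
        using var message = new MemoryStream();
        while (true)
        {
            WebSocketReceiveResult result =
                await receive(new ArraySegment<byte>(buffer), cancellationToken);
            if (result.MessageType == WebSocketMessageType.Close)
            {
                return null;
            }
            // Append only the bytes actually received in this chunk.
            message.Write(buffer, 0, result.Count);
            if (result.EndOfMessage)
            {
                return message.ToArray();
            }
        }
    }
}
```

In the receive loop from the question, the returned byte array could be passed to DownstreamMessage.Parser.ParseFrom, and a null result would signal that the socket should be closed.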
    
    

    webpubsub.proto:

    
    syntax = "proto3";

    package webpubsub;

    message UpstreamMessage {
        oneof message {
            JoinGroupMessage join_group_message = 1;
            EventMessage event_message = 2;
        }
    }

    message JoinGroupMessage {
        string group = 1;
    }

    message EventMessage {
        string event_name = 1;
        MessageData data = 2;
    }

    message MessageData {
        oneof data {
            string text_data = 1;
            bytes binary_data = 2;
        }
    }

    message DownstreamMessage {
        oneof message {
            GroupDataMessage group_data_message = 1;
        }
    }

    message GroupDataMessage {
        string group = 1;
        MessageData data = 2;
    }
    
    


    Please refer to my GitHub repository for the complete code.

    Hope this helps!

    If you found this answer helpful, please click Accept Answer and kindly upvote it.
