Send Messages to Service Bus Queue from REST Clients
I've had customers come to me asking how to make interop scenario work in Service Bus Messaging. One such scenario would be to send messages from REST client to Service Bus Listener listening over NetMessagingBinding that expects a binary encoded message.
Relevant Facts and Documentaion:
- NetMessagingBinding provides an automatic way of pulling off messages with its integration with WCFs ReceiveContext. However, you can control it manually by enabling ReceiveContext manual control to true. https://msdn.microsoft.com/en-us/library/system.servicemodel.receivecontextenabledattribute.manualcontrol(v=vs.110).aspx
- Controlling manually would set the brokered message receive mode to "PeekLock" and let the user code handle the completion or abandoning of the message. https://msdn.microsoft.com/en-us/library/windowsazure/microsoft.servicebus.messaging.receivemode.aspx
- The message format sent by REST client cannot be understood by WCF Service pulling messages over NetMessagingBinding as it is.
Solution:
- On sender side - make sure the message version is set properly, contentType is set on the brokered message, body of the brokered message is sent as stream and appropriate http headers are added.
Here's some sample code.
string interopPayload = "<Record xmlns='" + Constants.ContractNamespace + "'><Id>" + i + "</Id></Record>";
WebClient RESTClient = new WebClient();
Random rand = new Random();
string sessionName = rand.Next(SampleManager.NumSessions).ToString();
// Creating BrokeredMessageProperty
BrokeredMessageProperty property = new BrokeredMessageProperty();
soapBody = interopPayload;
property.Label = soapBody;
property.ContentType = "application/soap+msbin1";
MessageVersion _messageVersion = MessageVersion.Soap12WSAddressing10;
// Creating message and adding BrokeredMessageProperty to the properties bag
Message message = Message.CreateMessage(_messageVersion, "SoapAction", soapBody);
message.Properties.Add(BrokeredMessageProperty.Name, property);
MemoryStream outStream = new MemoryStream();
XmlDictionaryWriter binaryWriter = XmlDictionaryWriter.CreateBinaryWriter(outStream);
XmlDocument doc = new XmlDocument();
doc.LoadXml(message.ToString());
doc.WriteContentTo(binaryWriter);
binaryWriter.Flush();
string binaryXmlAsString = Encoding.UTF8.GetString(outStream.ToArray());
BrokerProperties bpts = new BrokerProperties();
bpts.CorrelationId = "CorrelationId-" + i.ToString();
RESTClient.Headers["Authorization"] = "WRAP access_token=\"" + authorizationToken + "\"";
RESTClient.Headers["BrokerProperties"] = bpts.Serialize();
string sendAddress = "";
sendAddress = serviceAddress + queueName + "/Messages";
byte[] response = RESTClient.UploadData(sendAddress, "POST", outStream.ToArray());
- On the receiver side, the code is pretty straight forward except for an extra step of reading the streamed message.
NetMessagingBinding messagingBinding = new NetMessagingBinding("messagingBinding");
EndpointAddress address = SampleManager.GetEndpointAddress(queueName, serviceBusNamespace);
TransportClientEndpointBehavior securityBehavior = new TransportClientEndpointBehavior();
securityBehavior.TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(serviceBusIssuerName, serviceBusIssuerKey);
IChannelListener<IInputChannel> inputChannelListener = null;
IInputChannel inputChannel = null;
try
{
inputChannelListener = messagingBinding.BuildChannelListener<IInputChannel>(address.Uri, securityBehavior);
inputChannelListener.Open();
inputChannel = inputChannelListener.AcceptChannel();
inputChannel.Open();
while (true)
{
try
{
// Receive message from queue. If no more messages available, the operation throws a TimeoutException.
Message receivedMessage = inputChannel.Receive(receiveMessageTimeout);
SampleManager.OutputMessageInfo("Receive", receivedMessage);
}
catch (TimeoutException)
{
break;
}
}
// Close
inputChannel.Close();
inputChannelListener.Close();
……………………………….
public static void OutputMessageInfo(string action, Message message, string additionalText = "")
{
lock (typeof(SampleManager))
{
BrokeredMessageProperty property = (BrokeredMessageProperty)message.Properties[BrokeredMessageProperty.Name];
XmlDictionaryReader reader = message.GetReaderAtBodyContents();
string result = reader.ReadInnerXml();
Console.WriteLine(result);
Console.ResetColor();
}
}
I've attached the code, it is based on WCFChannelSessionSample in BrokeredMessaging scenario in the Service Bus SDK Samples - https://servicebus.codeplex.com/
Just make sure you edit your service bus issuer and key information, and the sample should run as it is. The following variables needs to be edited/updated with your namespace specifics in Receiver.cs, Sender.cs and SampleManager.cs
serviceBusNamespace = "*****************";
serviceBusIssuerName = "*****************";
serviceBusIssuerKey = "*****************";
ServiceBusSender-RESTAPISenderBinaryEncode.zip
Comments
- Anonymous
June 26, 2014
I am definitely bookmarking this page and sharing it with my friends. Very outstanding web. The content here is truly important. I will share it with my friends. - Anonymous
October 14, 2014
Hi guys, this might be helpful: developers.de/.../jumpstart-azure-service-bus-javascript-sdk.aspx