Jaa


WCF Extensibility – Transport Channels – Duplex Channels

This post is part of a series about WCF extensibility points. For a list of all previous posts and planned future ones, go to the index page .

And we’ll get to the end of the channel layer and transports in general with this post. After looking in many details at two types of transport channel, I’ll close this section of the series with a look at a duplex channel. Duplex channels are those in which, once the connection is established between the client and the server, the latter doesn’t need to wait for messages from the client to send something. Instead, the server can send many messages to the client without being requested.

A duplex channel is essentially a pair of channels, one for each side of the communication, which can be used independently – the interface IDuplexChannel is indeed defined without any members of its own, simply inheriting from IInputChannel and IOutputChannel (and also IChannel and ICommunicationObject, but both input and output channel interfaces already inherit from them). It means that they can be used in both at the client and the server for contracts which use those channel shapes, such as one-way operations (for a small recap on channel shapes, see the first post about channels).

But duplex channels can be used for more “normal” cases as well, such as calling request/reply operations. The WCF channel has a multiplexer which will add a message id to outgoing messages and then correlate that message with incoming ones to be able to “respond” to a request. Notice that in order for this to work, the messages must support some form addressing (i.e., the addressing version of the message version of the encoder used by the transport must not be None).

This post will describe both input and output channel interfaces, and on the sample section I’ll show a simple duplex transport channel (based on the same framing used in the other transport channel posts) to illustrate its usage.

Public implementations in WCF

None. As with most channel extensibility points, there are no public implementations in WCF. There are many internal implementations, such as the one used by the TCP and named pipes transport.

Interface definition

  1. public interface IDuplexChannel : IInputChannel, IOutputChannel, IChannel, ICommunicationObject
  2. {
  3. }

The IDuplexChannel interface is literally empty as I mentioned above, so there’s nothing really to talk about it. It’s more interesting to talk about its components, so there we go.

  1. public interface IOutputChannel : IChannel, ICommunicationObject
  2. {
  3.     // Methods
  4.     IAsyncResult BeginSend(Message message, AsyncCallback callback, object state);
  5.     IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state);
  6.     void EndSend(IAsyncResult result);
  7.     void Send(Message message);
  8.     void Send(Message message, TimeSpan timeout);
  9.  
  10.     // Properties
  11.     EndpointAddress RemoteAddress { get; }
  12.     Uri Via { get; }
  13. }

The IOutputChannel interface is quite similar to the IRequestChannel interface, covered in the first post about transport channels. The main difference is that the output channel only sends messages, but doesn’t receive any responses to it. The properties are similar: RemoteAddress returns the address to which the channel is sending messages to, while Via is the transport address to which the request is sent. They are usually the same, but in cases where a message goes through multiple hops before reaching its final destination (i.e., in a proxying or routing scenario), the Via property points to the “next step”, while RemoteAddress points to the final destination of the message.

The output channel has also essentially one method, used to send messages, in various flavors. It can be synchronous (Send) or asynchronous (BeginSend / EndSend), and it can take a timeout parameter or have the channel assume a default timeout.

  1. public interface IInputChannel : IChannel, ICommunicationObject
  2. {
  3.     // Methods
  4.     IAsyncResult BeginReceive(AsyncCallback callback, object state);
  5.     IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state);
  6.     IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state);
  7.     IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state);
  8.     Message EndReceive(IAsyncResult result);
  9.     bool EndTryReceive(IAsyncResult result, out Message message);
  10.     bool EndWaitForMessage(IAsyncResult result);
  11.     Message Receive();
  12.     Message Receive(TimeSpan timeout);
  13.     bool TryReceive(TimeSpan timeout, out Message message);
  14.     bool WaitForMessage(TimeSpan timeout);
  15.  
  16.     // Properties
  17.     EndpointAddress LocalAddress { get; }
  18. }

The IInputChannel interface, on the other hand, is similar to the IReplyChannel interface, covered in the post about reply transport channels. Again, the main difference is that it just receives a message, without sending any response, so the *Receive methods don’t return any request context objects like in the reply channels. Like on the reply channel, there are both synchronous and asynchronous versions of the simple “give me a message” (Receive and BeginReceive / EndReceive, respectively) which throw an exception if a message isn’t received after a certain timeout. Also, like the reply channels, input channels also have the exception-less variations of TryReceive (and the asynchronous equivalent BeginTryReceive / EndTryReceive), which return a message if it arrives within the specified timeout, and the WaitForMessage (and its asynchronous counterparts BeginWaitForMessage / EndWaitForMessage) which peek on the input queue and tells the caller whether a message is available, but doesn’t return it (the caller needs to call another of the input channel methods to retrieve the message).

How to add a duplex transport channel

It’s the same as the other channels: you transport channels are added by using a channel factory (client) / or channel listener (server), which are created by a custom binding element. The binding element corresponding to a transport channel must be derived from the TransportBindingElement class. Besides the normal properties from the base BindingElement class, the transport binding element also defines a Scheme property, which needs to return the URI scheme for the transport. The sample code has an example of defining and adding a custom duplex transport channel. On the client side the binding element class must override both CanBuildChannelFactory<TChannel> (to tell the caller that it supports the IDuplexChannel interface) and BuildChannelFactory<TChannel> to actually build the factory which can build the channels. On the server side the binding element must do the listener counterparts, overriding CanBuildChannelListener<TChannel> (to tell the caller that it supports creating duplex channels) and BuildChannelListener<TChannel> to actually build the listener for such channels.

Real world scenario: implementing a custom transport protocol

I find it hard calling it a real world scenario, since I haven’t found any requests for this in forums or otherwise, so let’s treat it as a sample instead. Similar to the JSON-RPC scenario, we’ll create a custom transport protocol (the same, actually, with a 4-byte length, followed by the actual message). This time, however, we won’t be able to use the ByteStreamMessageEncodingBindingElement which we used in the JSON-RPC case, because, as I mentioned before, we need some addressing (which WCF will use to correlate between requests and replies). So for this scenario we’ll use a new encoding – the default will now be the MessageVersion.Default, which is the same as MessageVersion.Soap12WSAddressing10. The advantage of it is that, by using a format which is supported out-of-the-box, we don’t need any of the service model extensions which we did on the previous sample.

Even though the goal of having a duplex transport is to be able to do full duplex communication, I’ll actually start with a simple (e.g., request / reply) contract, since it’s a lot easier to test those by themselves (using a socket-based client or server, like the one created for the JSON-RPC sample). So we’ll start with that (that’s how I created the sample, so it makes sense for me to write about it this way).

And before we dive into the code, the usual disclaimer: this is a sample for illustrating the topic of this post, this is not production-ready code. I tested it for a few contracts and it worked, but I cannot guarantee that it will work for all scenarios (please let me know if you find a bug or something missing). I really kept the error checking to a minimum (especially in the “simple service” project and in many other helper functions whose inputs should be sanitized, to make the sample small. Also, I practically didn’t implement any timeouts in the channel (they’re mostly being ignored), otherwise this sample which is rather large would be even more complex.

Let’s start with a simple contract. To prevent service-model issues from validating it, let’s start with an untyped message contract.

  1. [ServiceContract]
  2. public interface IUntypedTest
  3. {
  4.     [OperationContract(Action = "*", ReplyAction = "*")]
  5.     Message Process(Message input);
  6. }
  7. public class Service : IUntypedTest
  8. {
  9.     public Message Process(Message input)
  10.     {
  11.         Message result = Message.CreateMessage(input.Version, "ReplyAction", "The response");
  12.         result.Headers.To = input.Headers.ReplyTo.Uri;
  13.         return result;
  14.     }
  15. }

With that contract, we can try to create a client which uses that new transport and consume that service.

  1. static void TestWithUntypedMessage()
  2. {
  3.     string baseAddress = SizedTcpDuplexTransportBindingElement.SizedTcpScheme + "://localhost:8000";
  4.     ServiceHost host = new ServiceHost(typeof(Service), new Uri(baseAddress));
  5.     MessageEncodingBindingElement encodingBE = new TextMessageEncodingBindingElement();
  6.     Binding binding = new CustomBinding(encodingBE, new SizedTcpDuplexTransportBindingElement());
  7.     host.AddServiceEndpoint(typeof(IUntypedTest), binding, "");
  8.     host.Open();
  9.  
  10.     Console.WriteLine("Host opened");
  11.  
  12.     Socket socket = GetConnectedSocket(8000);
  13.  
  14.     Message input = Message.CreateMessage(MessageVersion.Soap12WSAddressing10, "myAction", "Hello world");
  15.     input.Headers.To = new Uri(baseAddress);
  16.     input.Headers.ReplyTo = new EndpointAddress("https://www.w3.org/2005/08/addressing/anonymous");
  17.  
  18.     MessageEncoder encoder = encodingBE.CreateMessageEncoderFactory().Encoder;
  19.     BufferManager bufferManager = BufferManager.CreateBufferManager(int.MaxValue, int.MaxValue);
  20.     ArraySegment<byte> encoded = encoder.WriteMessage(input, int.MaxValue, bufferManager, 4);
  21.     Formatting.SizeToBytes(encoded.Count, encoded.Array, 0);
  22.  
  23.     Console.WriteLine("Sending those bytes:");
  24.     Debugging.PrintBytes(encoded.Array, encoded.Count + encoded.Offset);
  25.     socket.Send(encoded.Array, 0, encoded.Offset + encoded.Count, SocketFlags.None);
  26.     byte[] recvBuffer = new byte[10000];
  27.     int bytesRecvd = socket.Receive(recvBuffer);
  28.     Console.WriteLine("Received {0} bytes", bytesRecvd);
  29.     Debugging.PrintBytes(recvBuffer, bytesRecvd);
  30.  
  31.     socket.Close();
  32.  
  33.     Console.WriteLine("Press ENTER to close");
  34.     Console.ReadLine();
  35.     host.Close();
  36. }

Ok, that is the goal of the first “milestone”. Let’s start with the binding element class. In the example above we only need to override the channel listener part of the binding element (since we’re only using it in the server side), telling the caller that we do support a duplex channel (at CanBuildChannelListener<TChannel>) and create it when requested (at BuildChannelListener<TChannel>).

  1. public class SizedTcpDuplexTransportBindingElement : TransportBindingElement
  2. {
  3.     public const string SizedTcpScheme = "sized.tcp";
  4.  
  5.     public override string Scheme
  6.     {
  7.         get { return SizedTcpScheme; }
  8.     }
  9.  
  10.     public override bool CanBuildChannelListener<TChannel>(BindingContext context)
  11.     {
  12.         return typeof(TChannel) == typeof(IDuplexChannel);
  13.     }
  14.  
  15.     public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
  16.     {
  17.         return (IChannelListener<TChannel>)(object)new SizedTcpDuplexChannelListener(this, context);
  18.     }
  19. }

The listener class is similar to the listener class used in the example for the reply channel. The main difference here is that instead of enforcing the byte stream message encoder, this channel can work with any encoding as long as its message version is the expected one. That means that we’ll be able to replace the TextMessageEncodingBindingElement with a BinaryMessageEncodingBindingElement, for example, and the code should work just fine.

  1. class SizedTcpDuplexChannelListener : ChannelListenerBase<IDuplexChannel>
  2. {
  3.     BufferManager bufferManager;
  4.     MessageEncoderFactory encoderFactory;
  5.     Socket listenSocket;
  6.     Uri uri;
  7.  
  8.     public SizedTcpDuplexChannelListener(SizedTcpDuplexTransportBindingElement bindingElement, BindingContext context)
  9.         : base(context.Binding)
  10.     {
  11.         // populate members from binding element
  12.         int maxBufferSize = (int)bindingElement.MaxReceivedMessageSize;
  13.         this.bufferManager = BufferManager.CreateBufferManager(bindingElement.MaxBufferPoolSize, maxBufferSize);
  14.  
  15.         Collection<MessageEncodingBindingElement> messageEncoderBindingElements
  16.             = context.BindingParameters.FindAll<MessageEncodingBindingElement>();
  17.  
  18.         if (messageEncoderBindingElements.Count > 1)
  19.         {
  20.             throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext");
  21.         }
  22.         else if (messageEncoderBindingElements.Count == 1)
  23.         {
  24.             if (messageEncoderBindingElements[0].MessageVersion  != MessageVersion.Soap12WSAddressing10)
  25.             {
  26.                 throw new InvalidOperationException("This transport must be used with the an encoding with MessageVersion.Soap12WSAddressing10.");
  27.             }
  28.  
  29.             this.encoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory();
  30.         }
  31.         else
  32.         {
  33.             this.encoderFactory = new TextMessageEncodingBindingElement(MessageVersion.Soap12WSAddressing10, Encoding.UTF8).CreateMessageEncoderFactory();
  34.         }
  35.  
  36.         this.uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
  37.     }
  38.  
  39.     protected override IDuplexChannel OnAcceptChannel(TimeSpan timeout)
  40.     {
  41.         try
  42.         {
  43.             Socket dataSocket = listenSocket.Accept();
  44.             return new SizedTcpDuplexServerChannel(this.encoderFactory.Encoder, this.bufferManager, new EndpointAddress(this.uri), dataSocket, this);
  45.         }
  46.         catch (ObjectDisposedException)
  47.         {
  48.             // socket closed
  49.             return null;
  50.         }
  51.     }
  52.  
  53.     protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
  54.     {
  55.         return new AcceptChannelAsyncResult(timeout, this, callback, state);
  56.     }
  57.  
  58.     protected override IDuplexChannel OnEndAcceptChannel(IAsyncResult result)
  59.     {
  60.         return AcceptChannelAsyncResult.End(result);
  61.     }
  62. }

The channel created on OnAcceptChannel is of type SizedTcpDuplexServerChannel, which is listed below. It will extend the “base” duplex channel (to be shown later), again, similar to the reply channel sample, to fill up the To header and the RemoteEndpointMessageProperty on incoming messages.

  1. class SizedTcpDuplexServerChannel : SizedTcpDuplexChannel
  2. {
  3.     Socket dataSocket;
  4.     public SizedTcpDuplexServerChannel(MessageEncoder messageEncoder, BufferManager bufferManager, EndpointAddress localAddress, Socket dataSocket, ChannelManagerBase channelManager)
  5.         :base(messageEncoder, bufferManager, channelManager, localAddress, null, null)
  6.     {
  7.         this.dataSocket = dataSocket;
  8.         this.InitializeSocket(dataSocket);
  9.     }
  10.  
  11.     protected override Message DecodeMessage(ArraySegment<byte> data)
  12.     {
  13.         Message result = base.DecodeMessage(data);
  14.         if (result != null)
  15.         {
  16.             result.Headers.To = this.LocalAddress.Uri;
  17.             IPEndPoint remoteEndpoint = (IPEndPoint)this.dataSocket.RemoteEndPoint;
  18.             RemoteEndpointMessageProperty property = new RemoteEndpointMessageProperty(remoteEndpoint.Address.ToString(), remoteEndpoint.Port);
  19.             result.Properties.Add(RemoteEndpointMessageProperty.Name, property);
  20.         }
  21.  
  22.         return result;
  23.     }
  24. }

The main duplex channel class inherits from the same SizedTcpBaseChannel which was first introduced in the post about request channels. Notice that the implementation of the duplex channel implementation is quite simple – since all the main logic has already been written on the base class.

  1. abstract class SizedTcpDuplexChannel : SizedTcpBaseChannel, IDuplexChannel
  2. {
  3.     Socket socket;
  4.     Uri via;
  5.     EndpointAddress localAddress;
  6.     EndpointAddress remoteAddress;
  7.  
  8.     public SizedTcpDuplexChannel(
  9.         MessageEncoder encoder,
  10.         BufferManager bufferManager,
  11.         ChannelManagerBase channelManager,
  12.         EndpointAddress localAddress,
  13.         EndpointAddress remoteAddress,
  14.         Uri via)
  15.         : base(encoder, bufferManager, channelManager)
  16.     {
  17.         this.via = via;
  18.         this.remoteAddress = remoteAddress;
  19.         this.localAddress = localAddress;
  20.     }
  21.  
  22.     protected override void InitializeSocket(Socket socket)
  23.     {
  24.         this.socket = socket;
  25.         base.InitializeSocket(socket);
  26.  
  27.         if (this.remoteAddress == null)
  28.         {
  29.             IPEndPoint remoteEndpoint = (IPEndPoint)socket.RemoteEndPoint;
  30.             UriBuilder builder = new UriBuilder(
  31.                 SizedTcpDuplexTransportBindingElement.SizedTcpScheme,
  32.                 remoteEndpoint.Address.ToString(),
  33.                 remoteEndpoint.Port);
  34.             this.remoteAddress = new EndpointAddress(builder.Uri);
  35.         }
  36.  
  37.         if (this.localAddress == null)
  38.         {
  39.             IPEndPoint localEndpoint = (IPEndPoint)socket.LocalEndPoint;
  40.             UriBuilder builder = new UriBuilder(
  41.                 SizedTcpDuplexTransportBindingElement.SizedTcpScheme,
  42.                 localEndpoint.Address.ToString(),
  43.                 localEndpoint.Port);
  44.             this.localAddress = new EndpointAddress(builder.Uri);
  45.         }
  46.     }
  47.  
  48.     #region IInputChannel Members
  49.  
  50.     public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
  51.     {
  52.         return base.BeginReceiveMessage(timeout, callback, state);
  53.     }
  54.  
  55.     public IAsyncResult BeginReceive(AsyncCallback callback, object state)
  56.     {
  57.         return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
  58.     }
  59.  
  60.     public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
  61.     {
  62.         return new TryReceiveAsyncResult(timeout, this, callback, state);
  63.     }
  64.  
  65.     public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
  66.     {
  67.         throw new NotSupportedException("No peeking support");
  68.     }
  69.  
  70.     public Message EndReceive(IAsyncResult result)
  71.     {
  72.         return base.EndReceiveMessage(result);
  73.     }
  74.  
  75.     public bool EndTryReceive(IAsyncResult result, out Message message)
  76.     {
  77.         return TryReceiveAsyncResult.End(result, out message);
  78.     }
  79.  
  80.     public bool EndWaitForMessage(IAsyncResult result)
  81.     {
  82.         throw new NotSupportedException("No peeking support");
  83.     }
  84.  
  85.     public EndpointAddress LocalAddress
  86.     {
  87.         get { return this.localAddress; }
  88.     }
  89.  
  90.     public Message Receive(TimeSpan timeout)
  91.     {
  92.         return base.ReceiveMessage(timeout);
  93.     }
  94.  
  95.     public Message Receive()
  96.     {
  97.         return this.Receive(this.DefaultReceiveTimeout);
  98.     }
  99.  
  100.     public bool TryReceive(TimeSpan timeout, out Message message)
  101.     {
  102.         try
  103.         {
  104.             message = base.ReceiveMessage(timeout);
  105.             return true;
  106.         }
  107.         catch (TimeoutException)
  108.         {
  109.             message = null;
  110.             return false;
  111.         }
  112.     }
  113.  
  114.     public bool WaitForMessage(TimeSpan timeout)
  115.     {
  116.         throw new NotSupportedException("No peeking support");
  117.     }
  118.  
  119.     #endregion
  120.  
  121.     #region IOutputChannel Members
  122.  
  123.     public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  124.     {
  125.         return base.BeginSendMessage(message, timeout, callback, state);
  126.     }
  127.  
  128.     public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
  129.     {
  130.         return this.BeginSend(message, this.DefaultSendTimeout, callback, state);
  131.     }
  132.  
  133.     public void EndSend(IAsyncResult result)
  134.     {
  135.         base.EndSendMessage(result);
  136.     }
  137.  
  138.     public EndpointAddress RemoteAddress
  139.     {
  140.         get { return this.remoteAddress; }
  141.     }
  142.  
  143.     public void Send(Message message, TimeSpan timeout)
  144.     {
  145.         base.SendMessage(message, timeout);
  146.     }
  147.  
  148.     public void Send(Message message)
  149.     {
  150.         this.Send(message, this.DefaultSendTimeout);
  151.     }
  152.  
  153.     public Uri Via
  154.     {
  155.         get { return this.via; }
  156.     }
  157.  
  158.     #endregion
  159. }

Time to try the first program. If everything was coded correctly, we should see a response from the service.

Received 410 bytes
00 00 01 96 3C 73 3A 45 6E 76 65 6C 6F 70 65 20 ....<s:E nvelope
78 6D 6C 6E 73 3A 73 3D 22 68 74 74 70 3A 2F 2F xmlns:s= "https://
77 77 77 2E 77 33 2E 6F 72 67 2F 32 30 30 33 2F www.w3.o rg/2003/
30 35 2F 73 6F 61 70 2D 65 6E 76 65 6C 6F 70 65 05/soap- envelope
22 20 78 6D 6C 6E 73 3A 61 3D 22 68 74 74 70 3A " xmlns: a="http:
2F 2F 77 77 77 2E 77 33 2E 6F 72 67 2F 32 30 30 //www.w3 .org/200
35 2F 30 38 2F 61 64 64 72 65 73 73 69 6E 67 22 5/08/add ressing"
3E 3C 73 3A 48 65 61 64 65 72 3E 3C 61 3A 41 63 ><s:Head er><a:Ac
74 69 6F 6E 20 73 3A 6D 75 73 74 55 6E 64 65 72 tion s:m ustUnder
73 74 61 6E 64 3D 22 31 22 3E 52 65 70 6C 79 41 stand="1 ">ReplyA
63 74 69 6F 6E 3C 2F 61 3A 41 63 74 69 6F 6E 3E ction</a :Action>
3C 61 3A 54 6F 20 73 3A 6D 75 73 74 55 6E 64 65 <a:To s: mustUnde
72 73 74 61 6E 64 3D 22 31 22 3E 68 74 74 70 3A rstand=" 1">http:
2F 2F 73 63 68 65 6D 61 73 2E 6D 69 63 72 6F 73 //schema s.micros
6F 66 74 2E 63 6F 6D 2F 32 30 30 35 2F 31 32 2F oft.com/ 2005/12/
53 65 72 76 69 63 65 4D 6F 64 65 6C 2F 41 64 64 ServiceM odel/Add
72 65 73 73 69 6E 67 2F 41 6E 6F 6E 79 6D 6F 75 ressing/ Anonymou
73 3C 2F 61 3A 54 6F 3E 3C 2F 73 3A 48 65 61 64 s</a:To> </s:Head
65 72 3E 3C 73 3A 42 6F 64 79 3E 3C 73 74 72 69 er><s:Bo dy><stri
6E 67 20 78 6D 6C 6E 73 3D 22 68 74 74 70 3A 2F ng xmlns ="http:/
2F 73 63 68 65 6D 61 73 2E 6D 69 63 72 6F 73 6F /schemas .microso
66 74 2E 63 6F 6D 2F 32 30 30 33 2F 31 30 2F 53 ft.com/2 003/10/S
65 72 69 61 6C 69 7A 61 74 69 6F 6E 2F 22 3E 54 erializa tion/">T
68 65 20 72 65 73 70 6F 6E 73 65 3C 2F 73 74 72 he respo nse</str
69 6E 67 3E 3C 2F 73 3A 42 6F 64 79 3E 3C 2F 73 ing></s: Body></s
3A 45 6E 76 65 6C 6F 70 65 3E :Envelop e>

One thing I pointed before was that since we’re using the SOAP message version, we wouldn’t need to use any service model extensibility and it should just work. Time to try it.

  1. [ServiceContract]
  2. public interface ITypedTest
  3. {
  4.     [OperationContract]
  5.     int Add(int x, int y);
  6.     [OperationContract]
  7.     int Subtract(int x, int y);
  8. }
  9. public class Service : ITypedTest
  10. {
  11.     public int Add(int x, int y)
  12.     {
  13.         return x + y;
  14.     }
  15.  
  16.     public int Subtract(int x, int y)
  17.     {
  18.         return x - y;
  19.     }
  20. }

Now we can create a client to test it. We need to create a request in the format expected by the server. One easy way to do that is to create a server using a binding such as WSHttpBinding (which uses the same message version), capture a request sent to the same contract, then replicate it. That’s what was done in the code below.

  1.         static void TestWithTypedMessage()
  2.         {
  3.             string baseAddress = SizedTcpDuplexTransportBindingElement.SizedTcpScheme + "://localhost:8000";
  4.             ServiceHost host = new ServiceHost(typeof(Service), new Uri(baseAddress));
  5.             Binding binding = new CustomBinding(new SizedTcpDuplexTransportBindingElement());
  6.             host.AddServiceEndpoint(typeof(ITypedTest), binding, "");
  7.             host.Open();
  8.  
  9.             Console.WriteLine("Host opened");
  10.  
  11.             Socket socket = GetConnectedSocket(8000);
  12.  
  13.             string request = @"<s:Envelope
  14.         xmlns:s=""https://www.w3.org/2003/05/soap-envelope""
  15.         xmlns:a=""https://www.w3.org/2005/08/addressing"">
  16.     <s:Header>
  17.         <a:Action s:mustUnderstand=""1"">https://tempuri.org/ITypedTest/Add</a:Action>
  18.         <a:MessageID>urn:uuid:c2998797-7312-481a-8f73-230406b12bea</a:MessageID>
  19.         <a:ReplyTo>
  20.             <a:Address>https://www.w3.org/2005/08/addressing/anonymous</a:Address>
  21.         </a:ReplyTo>
  22.         <a:To s:mustUnderstand=""1"">ENDPOINT_ADDRESS</a:To>
  23.     </s:Header>
  24.     <s:Body>
  25.         <Add xmlns=""https://tempuri.org/"">
  26.             <x>4</x>
  27.             <y>5</y>
  28.         </Add>
  29.     </s:Body>
  30. </s:Envelope>";
  31.  
  32.             request = request.Replace("ENDPOINT_ADDRESS", baseAddress);
  33.  
  34.             Encoding encoding = new UTF8Encoding(false);
  35.             int byteCount = encoding.GetByteCount(request);
  36.             byte[] reqBytes = new byte[byteCount + 4];
  37.             Formatting.SizeToBytes(byteCount, reqBytes, 0);
  38.             encoding.GetBytes(request, 0, request.Length, reqBytes, 4);
  39.             Console.WriteLine("Sending those bytes:");
  40.             Debugging.PrintBytes(reqBytes);
  41.             socket.Send(reqBytes);
  42.             byte[] recvBuffer = new byte[10000];
  43.             int bytesRecvd = socket.Receive(recvBuffer);
  44.             Console.WriteLine("Received {0} bytes", bytesRecvd);
  45.             Debugging.PrintBytes(recvBuffer, bytesRecvd);
  46.  
  47.             Console.WriteLine("Press ENTER to send another request");
  48.             Console.ReadLine();
  49.  
  50.             request = @"<s:Envelope
  51.         xmlns:s=""https://www.w3.org/2003/05/soap-envelope""
  52.         xmlns:a=""https://www.w3.org/2005/08/addressing"">
  53.     <s:Header>
  54.         <a:Action s:mustUnderstand=""1"">https://tempuri.org/ITypedTest/Subtract</a:Action>
  55.         <a:MessageID>urn:uuid:c2998797-7312-481a-8f73-230406b12bea</a:MessageID>
  56.         <a:ReplyTo>
  57.             <a:Address>https://www.w3.org/2005/08/addressing/anonymous</a:Address>
  58.         </a:ReplyTo>
  59.         <a:To s:mustUnderstand=""1"">ENDPOINT_ADDRESS</a:To>
  60.     </s:Header>
  61.     <s:Body>
  62.         <Subtract xmlns=""https://tempuri.org/"">
  63.             <x>4</x>
  64.             <y>5</y>
  65.         </Subtract>
  66.     </s:Body>
  67. </s:Envelope>";
  68.  
  69.             request = request.Replace("ENDPOINT_ADDRESS", baseAddress);
  70.             byteCount = encoding.GetByteCount(request);
  71.             reqBytes = new byte[byteCount + 4];
  72.             Formatting.SizeToBytes(byteCount, reqBytes, 0);
  73.             encoding.GetBytes(request, 0, request.Length, reqBytes, 4);
  74.             Console.WriteLine("Sending those bytes:");
  75.             Debugging.PrintBytes(reqBytes);
  76.             socket.Send(reqBytes);
  77.             bytesRecvd = socket.Receive(recvBuffer);
  78.             Console.WriteLine("Received {0} bytes", bytesRecvd);
  79.             Debugging.PrintBytes(recvBuffer, bytesRecvd);
  80.  
  81.             socket.Close();
  82.  
  83.             Console.WriteLine("Press ENTER to close");
  84.             Console.ReadLine();
  85.             host.Close();
  86.         }

Ok, now we know that the server side is working. Next time, move to the client. To test that, I used a similar “simple” socket server as the sample for request channels.

  1. [ServiceContract]
  2. public interface ITypedTest
  3. {
  4.     [OperationContract]
  5.     int Add(int x, int y);
  6.     [OperationContract]
  7.     int Subtract(int x, int y);
  8. }
  9.  
  10. class Program
  11. {
  12.     static void Main(string[] args)
  13.     {
  14.         SocketsServer s = new SocketsServer(8000);
  15.         s.StartServing();
  16.  
  17.         Binding binding = new CustomBinding(new SizedTcpDuplexTransportBindingElement());
  18.         string address = SizedTcpDuplexTransportBindingElement.SizedTcpScheme + "://localhost:8000";
  19.         ChannelFactory<ITypedTest> factory = new ChannelFactory<ITypedTest>(binding, new EndpointAddress(address));
  20.         ITypedTest proxy = factory.CreateChannel();
  21.  
  22.         Console.WriteLine(proxy.Add(4, 5));
  23.         Console.WriteLine(proxy.Subtract(44, 66));
  24.  
  25.         s.StopServing();
  26.     }
  27. }

Trying to run the code above, it didn’t work – with the following error: “System.InvalidOperationException: The binding (Name=CustomBinding, Namespace=https://tempuri.org/) cannot be used to create a ChannelFactory or a ChannelListener because it appears to be missing a TransportBindingElement.  Every binding must have at least one binding element that derives from TransportBindingElement”. The error message isn’t great, because we do have a transport binding element in the binding (it’s the only one). The problem is that it doesn’t create any channel factory, since we didn’t configure it to do so. With the two overrides below, it starts working out fine.

  1. public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
  2. {
  3.     return typeof(TChannel) == typeof(IDuplexChannel);
  4. }
  5.  
  6. public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
  7. {
  8.     return (IChannelFactory<TChannel>)(object)new SizedTcpDuplexChannelFactory(this, context);
  9. }

Now, the duplex part. If everything went right (and by testing the client and server parts separately, we already have a good confidence in the code). I’ll try with a simple duplex contract, as shown below. When the service receives a request (Hello), it will send a series of OnHello messages to the client (more than one).

  1. [ServiceContract(CallbackContract = typeof(ITestCallback))]
  2. public interface ITest
  3. {
  4.     [OperationContract]
  5.     void Hello(string text);
  6. }
  7.  
  8. [ServiceContract]
  9. public interface ITestCallback
  10. {
  11.     [OperationContract(IsOneWay = true)]
  12.     void OnHello(string text);
  13. }
  14.  
  15. public class Service : ITest
  16. {
  17.     public void Hello(string text)
  18.     {
  19.         Console.WriteLine("[server received] {0}", text);
  20.         ITestCallback callback = OperationContext.Current.GetCallbackChannel<ITestCallback>();
  21.         ThreadPool.QueueUserWorkItem(delegate
  22.         {
  23.             for (int i = 1; i <= 5; i++)
  24.                 callback.OnHello(text + ", " + i);
  25.         });
  26.     }
  27. }

In order to use a duplex contract in WCF, the client needs to implement the callback interface, then to use the DuplexChannelFactory<TChannel> to create a channel passing that implementation (wrapped in an InstanceContext object).

  1. class Program
  2. {
  3.     class ClientCallback : ITestCallback
  4.     {
  5.         public void OnHello(string text)
  6.         {
  7.             Console.WriteLine("[client received] {0}", text);
  8.         }
  9.     }
  10.  
  11.     static void Main(string[] args)
  12.     {
  13.         string baseAddress = SizedTcpDuplexTransportBindingElement.SizedTcpScheme + "://localhost:8000";
  14.         ServiceHost host = new ServiceHost(typeof(Service), new Uri(baseAddress));
  15.         Binding binding = new CustomBinding(new SizedTcpDuplexTransportBindingElement());
  16.         ServiceEndpoint endpoint = host.AddServiceEndpoint(typeof(ITest), binding, "");
  17.         host.Open();
  18.         Console.WriteLine("Host opened");
  19.  
  20.         InstanceContext instanceContext = new InstanceContext(new ClientCallback());
  21.         EndpointAddress endpointAddress = new EndpointAddress(baseAddress);
  22.         DuplexChannelFactory<ITest> factory = new DuplexChannelFactory<ITest>(instanceContext, binding, endpointAddress);
  23.         ITest proxy = factory.CreateChannel();
  24.  
  25.         proxy.Hello("John Doe");
  26.  
  27.         Console.WriteLine("Press ENTER to close");
  28.         Console.ReadLine();
  29.  
  30.         ((IClientChannel)proxy).Close();
  31.         factory.Close();
  32.         host.Close();
  33.     }
  34. }

Run it, and voila, the server is able to send multiple messages to the client without being requested! That’s the magic of a duplex channel in action.

Final thoughts about transport channel extensibility

If there’s one thing which I hope to have shown in those examples, is that WCF is powerful enough to be used even in protocols for which it wasn’t designed. But one thing is certain: creating transport channels should not be your first option (or even your second). There’s a lot of asynchronous code which is inherently error-prone, and there is a lot of “magic” going on over the transport which we really don’t have control over. Often a simple socket-based client (or server) is preferable, but in cases where asynchrony is a plus (e.g., server scenarios, UI threads), you can consider extending WCF with a custom transport, and I hope those posts will shed some light in this area.

Coming up

We’re done with the channel layer, we’ll go back to the service model extensibility in the next post.

[Code in this post]

[Back to the index]