Simple Message Framing Sample for TCP Socket - Part 2 (Asynchronous)
As a follow-up to my last post (https://blogs.msdn.com/joncole/archive/2006/03/20/555721.aspx), I decided to add some sample code for implementing an asynchronous version of ReadMessage. All of the rules discussed in my previous post still apply, but we now need more code in order to maintain state across calls to our handling code. I have chosen not to implement sending a message asynchronously because reading a message asynchronously is much more interesting.
We also have the same basic logic as the synchronous implementation (read size, then read the body) but we have to keep state across multiple calls to our code that handles the arrival of data.
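The sending side is not shown in this post. As a rough illustration of the wire format that the "read size, then read the body" logic assumes, here is a minimal sketch of how a message could be framed. MessageFraming and FrameMessage are hypothetical names, not part of the sample, and the sketch assumes the body is ASCII text and that both ends use the same byte order (BitConverter follows the endianness of the machine it runs on).

```csharp
using System;
using System.Text;

public static class MessageFraming
{
    // Hypothetical helper (not from the original sample): builds a
    // 4-byte size prefix followed by the ASCII-encoded message body.
    public static byte[] FrameMessage(string message)
    {
        byte[] body = Encoding.ASCII.GetBytes(message);

        // BitConverter uses the machine's byte order, so the reader must
        // decode the size with BitConverter.ToInt32 on a machine with the
        // same endianness (an assumption of this sketch).
        byte[] size = BitConverter.GetBytes(body.Length);

        byte[] framed = new byte[size.Length + body.Length];
        Buffer.BlockCopy(size, 0, framed, 0, size.Length);
        Buffer.BlockCopy(body, 0, framed, size.Length, body.Length);
        return framed;
    }
}
```

A reader that receives these bytes would first collect the 4 size bytes, decode them, and then collect exactly that many body bytes.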
Note: There are some special cases related to this sample code that you will need to handle (or at least take into consideration):
1) Local socket is closed by another thread. This could result in an ObjectDisposedException being thrown from this code when accessing the socket.
2) Remote side resets the connection. This results in a SocketException being thrown when using the socket to send or receive data.
3) Simultaneous duplicate calls to ReadMessage or ReadMessageAsync on the same socket should not be allowed. Doing this will result in very odd behavior. Some examples: you could run into chopped messages, apparent hangs (one side expecting to read more data than the other side is going to send), or OutOfMemoryExceptions (because you interpret a portion of the message body of one request as the message size of another request and allocate too much memory). Basically, you are causing an alignment problem in your stream protocol parsing, which breaks the entire app.
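One way to enforce point 3 is to keep a small flag per socket and refuse to start a second read while one is outstanding. The following is only a sketch under that assumption; SingleReadGuard and its Enter and Exit methods are hypothetical names that do not appear in the sample.

```csharp
using System;
using System.Threading;

// Hypothetical guard (not part of the original sample): allows only one
// outstanding read at a time for the socket it is associated with.
public sealed class SingleReadGuard
{
    private int readInProgress; // 0 = idle, 1 = a read is outstanding

    // Call before starting a read. Throws if a read is already running.
    public void Enter()
    {
        // Atomically switch 0 -> 1; any other previous value means
        // another read has not finished yet.
        if (Interlocked.CompareExchange(ref readInProgress, 1, 0) != 0)
            throw new InvalidOperationException("A read is already in progress on this socket.");
    }

    // Call once the read has completed (successfully or with an error).
    public void Exit()
    {
        Interlocked.Exchange(ref readInProgress, 0);
    }
}
```

With something like this, the caller would invoke Enter before calling ReadMessageAsync and Exit from the user callback, so that a second read cannot interleave with the first and misalign the stream.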
First, we need a couple of objects for holding our state and some helper code to handle the asynchronous notification to the user that a message has arrived.
//**********************************************
//the definition of a function that the user has to give us for us to
//notify him/her that a message has arrived.
//**********************************************
public delegate void ReadMessageAsyncCallback(ReadMessageEventArgs e);

//**********************************************
//This is the argument passed to the above delegate
//**********************************************
public class ReadMessageEventArgs : EventArgs
{
    internal ReadMessageEventArgs(string msg)
    {
        Message = msg;
        Error = null;
    }

    internal ReadMessageEventArgs(Exception err)
    {
        Error = err;
        Message = null;
    }

    public readonly Exception Error;
    public readonly string Message;
}

//**********************************************
//this class keeps track of our state during the read operation
//**********************************************
internal class ReadMessageAsyncState : IDisposable
{
    public ReadMessageAsyncState(Socket socket, ReadMessageAsyncCallback callback)
    {
        if (socket == null)
            throw new ArgumentNullException("socket");
        if (callback == null)
            throw new ArgumentNullException("callback");

        userSocket = socket;
        userCallback = callback;
        buffer = new byte[4];
        bytesReceived = 0;
        messageSize = -1;
    }

    public void Dispose()
    {
        userSocket = null;
        userCallback = null;
        buffer = null;
    }

    public Socket userSocket;
    public ReadMessageAsyncCallback userCallback;
    public int bytesReceived;
    public byte[] buffer;
    public int messageSize;
}
Now that we have defined all of the helper objects, let's get to the new ReadMessageAsync function implementation. This is the function that would be called to start the async read message process. It simply creates the async state object and then starts an async read on the socket. Note that in this implementation I have not provided a way to cancel the asynchronous operation in case the other side of the stream is behaving badly. You may need to consider doing so for your application.
public static void ReadMessageAsync(Socket socket, ReadMessageAsyncCallback callback)
{
    ReadMessageAsyncState state = new ReadMessageAsyncState(socket, callback);
    socket.BeginReceive(state.buffer,
                        0, //offset
                        state.buffer.Length, //how much data can be read
                        SocketFlags.None,
                        new AsyncCallback(OnReceive),
                        state);
}
Now, all the magic really happens in the OnReceive function (my callback passed to the BeginReceive calls on the socket). This function is responsible for starting the next read on the socket if more data is needed, calling the user callback when the entire message has been received, and handling any error by setting it on the ReadMessageEventArgs object.
static void OnReceive(IAsyncResult ar)
{
    ReadMessageAsyncState state = ar.AsyncState as ReadMessageAsyncState;
    try
    {
        int count = state.userSocket.EndReceive(ar);
        state.bytesReceived += count;

        if (state.messageSize == -1) //we are still reading the size of the data
        {
            if (count == 0)
                throw new ProtocolViolationException("The remote peer closed the connection while reading the message size.");

            if (state.bytesReceived == 4) //we have received the entire message size information
            {
                //read the size of the message
                state.messageSize = BitConverter.ToInt32(state.buffer, 0);
                if (state.messageSize < 0)
                {
                    throw new ProtocolViolationException("The remote peer sent a negative message size.");
                }
                //we should do some size validation here also (e.g. restrict incoming messages to x bytes long)
                state.buffer = new Byte[state.messageSize];

                //reset the bytes received back to zero
                //because we are now switching to reading the message body
                state.bytesReceived = 0;
            }

            if (state.messageSize != 0)
            {
                //we need more data - could be more of the message size information
                //or it could be the message body. The only time we won't need to
                //read more data is if the message size == 0
                state.userSocket.BeginReceive(state.buffer,
                                              state.bytesReceived, //offset where data can be written
                                              state.buffer.Length - state.bytesReceived, //how much data can be read into remaining buffer
                                              SocketFlags.None,
                                              new AsyncCallback(OnReceive),
                                              state);
            }
            else
            {
                //we have received a zero length message, notify the user...
                ReadMessageEventArgs args = new ReadMessageEventArgs(String.Empty);
                state.userCallback(args);
                state.Dispose();
            }
        }
        else //we are reading the body of the message
        {
            if (state.bytesReceived == state.messageSize) //we have the entire message
            {
                //notify the user
                state.userCallback(new ReadMessageEventArgs(Encoding.ASCII.GetString(state.buffer)));

                //free up our reference to the socket, buffer and the callback object.
                state.Dispose();
            }
            else //need more data.
            {
                if (count == 0)
                    throw new ProtocolViolationException("The remote peer closed the connection before the entire message was received");

                state.userSocket.BeginReceive(state.buffer,
                                              state.bytesReceived, //offset where data can be written
                                              state.buffer.Length - state.bytesReceived, //how much data can be read into remaining buffer
                                              SocketFlags.None,
                                              new AsyncCallback(OnReceive),
                                              state);
            }
        }
    }
    catch (Exception ex)
    {
        ReadMessageEventArgs args = new ReadMessageEventArgs(ex);
        state.userCallback(args);
        state.Dispose();
    }
}
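The comment in OnReceive about size validation could be handled with a small helper along the following lines. This is only a sketch: MessageSizeValidator, Validate, and the 1 MB limit are assumptions made for illustration, and the right limit depends entirely on your protocol.

```csharp
using System;
using System.Net;

// Hypothetical helper (not part of the original sample): rejects message
// sizes that are negative or larger than an application-defined limit.
public static class MessageSizeValidator
{
    // Assumed upper bound, chosen only for illustration.
    public const int MaxMessageSize = 1024 * 1024;

    public static int Validate(int messageSize)
    {
        if (messageSize < 0)
            throw new ProtocolViolationException("The remote peer sent a negative message size.");

        // Refusing oversized messages before allocating the body buffer
        // keeps a misbehaving peer from forcing a huge allocation.
        if (messageSize > MaxMessageSize)
            throw new ProtocolViolationException("The remote peer sent a message size larger than the allowed maximum.");

        return messageSize;
    }
}
```

In OnReceive, the value returned by BitConverter.ToInt32 could be passed through such a check before the body buffer is allocated. Because the check throws, the existing catch block would report the failure to the user callback.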
I would also recommend that you read Malar Chinnusamy's blog on socket programming considerations.
Comments
Anonymous
October 31, 2006
Hi, regarding this line: if (state.bytesReceived == 4)//we have received the entire message size information. How can you be sure that the number of bytes received is less than or equal to 4? If the count is more than 4 on the first read, what will happen? Thanks.
Anonymous
December 01, 2006
When the first BeginReceive is started, the buffer passed in is only 4 bytes in size. This means that we can't receive more than 4 bytes, making it possible to use this logic. If we were trying to build a more performant system, we wouldn't use just 4 bytes for the buffer and we would have to check for more than 4 bytes being received. I wrote it this way to simplify the example code for the purposes of this sample.
Anonymous
May 25, 2010
Hi joncole, what is the function definition of the ReadMessageAsyncCallback delegate? For example, in catch (Exception ex) { ReadMessageEventArgs args = new ReadMessageEventArgs(ex); state.userCallback(args); state.Dispose(); } which function will be called by state.userCallback(args)? How is the ReadMessageAsyncCallback created, and from where should I call ReadMessageAsync? Please reply to my comment; I'm trying to develop a tool based on your post. My email address is kashif.jawed@hotmail.com
Anonymous
June 10, 2010
Kashif, a simple implementation of that callback would be:
void HandleReceivedMessage(ReadMessageEventArgs e)
{
    if (e.Error != null)
    {
        //do something to handle the error
    }
    else
    {
        //process the message.
    }
}
It can be instantiated with something like this:
ReadMessageAsyncCallback receiveMessageHandler = new ReadMessageAsyncCallback(HandleReceivedMessage);
Hope that helps.
Anonymous
June 13, 2010
This comment of yours confuses me: "If we were trying to build a more performant system, we wouldn't use just 4 bytes for the buffer and we would have to check for more than 4 bytes being received. I wrote it this way to simplify the example code for the purposes of this sample." Do you mean there can be a better solution, such as requesting with a larger array and taking whatever we can get, that is, length + message (entire or partial), so that we read the length and then decide whether to receive more? I am looking for the best-performing system. Can you give me a suggestion, please? My second question: I implemented your code, and while I was reading the message body a second request came in. Because the message size was still greater than -1, processing skipped to the else part, although it was only attempting to read the 4-byte length. In short, it blows up for simultaneous calls. I know you warned above that it would. But how can we make it work? Is there any way? PS: if it is possible to write to you directly, for example by email, that would be great :) Thanks.
Anonymous
July 28, 2010
Yes, I meant passing a larger byte array to the Socket.Receive call. Every call to Socket.Receive (or Socket.Send, for that matter) results in two transitions that can be relatively expensive. The first is from managed code to unmanaged code. The second is from user mode to kernel mode. Fewer calls to Socket.Receive means fewer transitions, which can help improve performance. Hope that helps.
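With a larger receive buffer, a single receive can return part of a message, exactly one message, or several messages at once, so the parsing has to keep leftover bytes between calls. The following is a rough sketch of that idea, not a tuned implementation: FrameParser and Append are hypothetical names, the sketch copies buffers freely for clarity, and it assumes the same 4-byte size prefix and ASCII body used in the sample.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical parser (not part of the original sample): accepts chunks of
// any size and returns every complete message found so far.
public sealed class FrameParser
{
    private byte[] pending = new byte[0]; // bytes left over from earlier chunks

    public List<string> Append(byte[] data, int count)
    {
        // Join the leftover bytes with the newly received ones.
        byte[] combined = new byte[pending.Length + count];
        Buffer.BlockCopy(pending, 0, combined, 0, pending.Length);
        Buffer.BlockCopy(data, 0, combined, pending.Length, count);

        List<string> messages = new List<string>();
        int offset = 0;
        while (combined.Length - offset >= 4)
        {
            int size = BitConverter.ToInt32(combined, offset);
            if (size < 0)
                throw new InvalidOperationException("Negative message size.");
            if (combined.Length - offset - 4 < size)
                break; // the body has not fully arrived yet

            messages.Add(Encoding.ASCII.GetString(combined, offset + 4, size));
            offset += 4 + size;
        }

        // Keep whatever was not consumed for the next call.
        pending = new byte[combined.Length - offset];
        Buffer.BlockCopy(combined, offset, pending, 0, pending.Length);
        return messages;
    }
}
```

Each completed receive would hand its buffer and byte count to Append and then raise the user callback once per returned message. A production version would also want a maximum message size and a way to avoid the repeated copying.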