Simple Message Framing Sample for TCP Socket - Part 2 (Asynchronous)

As a follow up to my last post (https://blogs.msdn.com/joncole/archive/2006/03/20/555721.aspx) I decided to add some sample code for implementing an asynchronous version of ReadMessage.  All of the same rules apply as discussed in my previous post, but we now have more code we have to create in order to maintain state across calls to our handling code.  I have chosen not to implement the sending of a message asynchronously because the reading of a message asynchronously is much more interesting. 

 

We also have the same basic logic as the synchronous implementation (read size, then read the body) but we have to keep state across multiple calls to our code that handles the arrival of data. 

 

Note: There are some special cases related to this sample code that you will need to handle (or at least take into consideration)

1) Local socket is closed by another thread.  This could result in an ObjectDisposedException being thrown from this code when accessing the socket.

2) Remote side resets the connection.  This results in a SocketException being thrown when using the socket to send or receive data

3) Simultaneous duplicate calls to ReadMessage or ReadMessageAsync on the same socket should not be allowed.  Doing this will result very odd behavior.  Some examples are: you could potentially run into chopped messages, apparent hangs (one side expecting to read more data than the other side is going to send), or OutOfMemoryExceptions (because you interpret a portion of the message body of one request to be the message size of another request and allocate too much memory). Basically you are causing an alignment problem in your stream protocol parsing, which breaks the entire app.

 

First, we need a couple of objects for holding our state and some helper code to handle the asynchronous notification to the user that a message has arrived.

//**********************************************

//the definition of a function that the user has to give us for us to

//notify him/her that a message has arrived.

//**********************************************public delegate void ReadMessageAsyncCallback(ReadMessageEventArgs e);

//**********************************************//This is the argument passed to the above delegate//**********************************************public class ReadMessageEventArgs : EventArgs{

    internal ReadMessageEventArgs(string msg)

    {

        Message = msg;

        Error = null;

    }

    internal ReadMessageEventArgs(Exception err)

    {

        Error = err;

        Message = null;

    }

    public readonly Exception Error;

    public readonly string Message;

}

 

//**********************************************//this class keeps track of our state during the read operation

//**********************************************internal class ReadMessageAsyncState : IDisposable

{

public ReadMessageAsyncState(Socket socket, ReadMessageAsyncCallback callback)

{

if (socket == null)

throw new ArgumentNullException("socket");

if (callback == null)

throw new ArgumentNullException("callback");

userSocket = socket;

userCallback = callback;

buffer = new byte[4];

bytesReceived = 0;

messageSize = -1;

}

public void Dispose()

{

userSocket = null;

userCallback = null;

buffer = null;

}

public Socket userSocket;

public ReadMessageAsyncCallback userCallback;

public int bytesReceived;

public byte[] buffer;

public int messageSize;

}

 

 

 

Now that we have defined all of the helper objects lets get to the new ReadMessageAsync function implementation.  This is the function that would be called to start the async read message process.  It basically just creates the async state object and then starts an async read on the socket.  Note that in this implementation I have not provided a way to cancel the asynchronous operation in case where other side of stream is behaving badly.  You may need to consider doing so for your application.

public static void ReadMessageAsync(Socket socket,

            ReadMessageAsyncCallback callback)

{

    ReadMessageAsyncState state = new ReadMessageAsyncState(socket, callback);

    socket.BeginReceive(state.buffer,

        0, //offset

        state.buffer.Length, //how much data can be read

        SocketFlags.None, new AsyncCallback(OnReceive), state);

}

Now, all the magic really happens in the OnReceive function (my callback passed to the BeginReceive calls on the socket).  This fuction is what is responsible for starting the next read on the socket if more data is needed, it calls the user callback when the entire message has been received and it handles any error and sets it on the ReadMessageEventArgs object.

static void OnReceive(IAsyncResult ar)

{

ReadMessageAsyncState state = ar.AsyncState as ReadMessageAsyncState;

try

{

int count = state.userSocket.EndReceive(ar);

state.bytesReceived += count;

if (state.messageSize == -1)//we are still reading the size of the data

{

if (count == 0)

throw new ProtocolViolationException("The remote peer closed the connection while reading the message size.");

if (state.bytesReceived == 4)//we have received the entire message size information

{

//read the size of the message

state.messageSize = BitConverter.ToInt32(state.buffer, 0);

if (state.messageSize < 0)

{

throw new ProtocolViolationException("The remote peer sent a negative message size.");

}

//we should do some size validation here also (e.g. restrict incoming messages to x bytes long)

state.buffer = new Byte[state.messageSize];

//reset the bytes received back to zero

//because we are now switching to reading the message body

state.bytesReceived = 0;

}

if (state.messageSize != 0)

{

//we need more data - could be more of the message size information

//or it could be the message body. The only time we won't need to

//read more data is if the message size == 0

state.userSocket.BeginReceive(state.buffer,

state.bytesReceived, //offset where data can be written

state.buffer.Length - state.bytesReceived, //how much data can be read into remaining buffer

SocketFlags.None, new AsyncCallback(OnReceive), state);

}

else

{

//we have received a zero length message, notify the user...

ReadMessageEventArgs args = new ReadMessageEventArgs(String.Empty);

state.userCallback(args);

state.Dispose();

}

}

else //we are reading the body of the message

{

if (state.bytesReceived == state.messageSize) //we have the entire message

{

//notify the user

state.userCallback(new ReadMessageEventArgs(Encoding.ASCII.GetString(state.buffer)));

//free up our reference to the socket, buffer and the callback object.

state.Dispose();

}

else //need more data.

{

if (count == 0)

throw new ProtocolViolationException("The remote peer closed the connection before the entire message was received");

state.userSocket.BeginReceive(state.buffer,

state.bytesReceived, //offset where data can be written

state.buffer.Length - state.bytesReceived, //how much data can be read into remaining buffer

SocketFlags.None, new AsyncCallback(OnReceive), state);

}

}

}

catch (Exception ex)

{

ReadMessageEventArgs args = new ReadMessageEventArgs(ex);

state.userCallback(args);

state.Dispose();

}

}

I would also recommend that you read Malar Chinnusamy's blog on socket programming considerations.