The Journey of the Lunch Launcher - Part 5: Receiving messages

Part 1 - The origins of the 'lunch launcher'
Part 2 - MEDC 2007
Part 3 - Managing the Transport
Part 4 - Sending messages
Part 4b - The output channel

In the series so far, we have seen how the Lunch Launcher manages the Store and Forward Messaging transport objects and how it sends messages (invitations, replies) to buddies.  Now, it's time to see how the messages are received.

As with all entries in this series, please note that the following disclaimer covers all examples contained herein.
//-----------------------------------------------------------------------
//THIS CODE AND INFORMATION ARE PROVIDED AS IS WITHOUT WARRANTY OF ANY
//KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
//IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
//PARTICULAR PURPOSE.
//-----------------------------------------------------------------------

When the Lunch Launcher starts, it creates three child threads to listen for incoming messages.  The details of how these listeners are managed will be discussed in a future post.

As mentioned previously, when I was first writing the Lunch Launcher to show at MEDC 2007, code optimizations were not my primary concern.  The code had been written so that it was easy to describe in a big room (~400 people) during an introductory (200 level) session.  As such, there was a significant amount of near-exact duplication.

MEDC 2007
Let's start by taking a look at the original demo code (circa May 2007).  Given the level of similarity between each of the listeners, we will look only at how lunch invitations are received.
/// <summary>
/// Receive lunch invitation messages
/// </summary>
private void InvitationListener(Object state)
{
    Thread.CurrentThread.Name = "LunchManager - Invitation Listener";
    TimeSpan timeout = new TimeSpan(0, 0, 0, 0, this.m_ReceiveTimeout);

    this.m_InvitationTransportObjects = new TransportObjects(InviteChannelName);

    try
    {
        // initialize the transport objects
        this.m_InvitationTransportObjects.Initialize();

        Boolean keepRunning = true;

        while(keepRunning)
        {
            IInputChannel channel =
                this.m_InvitationTransportObjects.InputChannel;

            try
            {
                // wait for an invitation
                Message msg = channel.Receive(timeout);

                if(null != msg)
                {
                    // there is a message to process

                    // get the sender's email address
                    String from = (String)msg.Properties["FromEmailAddress"];

                    // process the message
                    ProcessInvitation(msg, from);
                }
                else
                {
                    // there is no message to process

                    // determine why we did not receive a message
                    switch(channel.State)
                    {
                        // transport faulted
                        case CommunicationState.Faulted:
                            // attempt to recover from the fault
                            keepRunning = this.m_InvitationTransportObjects.Recover();
                            break;

                        // channel is closing/closed
                        case CommunicationState.Closing:
                        case CommunicationState.Closed:
                            break;

                        // unexpected state
                        default:
                            break;
                    }
                }
            }
            catch(TimeoutException)
            { /* there was nothing to receive */ }
            catch(CommunicationException)
            {
                // failure communicating with the service
                //  sleep for awhile to allow it to recover
                Thread.Sleep(timeout.Milliseconds);
            }
            catch
            {
                // fatal error
                keepRunning = false;
            }
        }
    }
    catch
    { /* fatal error */ }
    finally
    {
        // uninitialize the transport objects
        this.m_InvitationTransportObjects.Uninitialize();
    }
}

Here we have a pretty straightforward listener thread delegate.  On each loop iteration, it checks whether it should stop and, if not, tries to receive a message.  Once a message is received (msg != null), it is passed to a processing method (to be talked about next time).  If the transport faults (CommunicationState.Faulted), we attempt a recovery.  On a CommunicationException, we pause for a short time and then resume receiving.
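
A side note on the CommunicationException handler: the pause length comes from timeout.Milliseconds.  TimeSpan.Milliseconds is only the millisecond component of the interval (0 to 999), whereas TimeSpan.TotalMilliseconds is the whole interval.  The value of m_ReceiveTimeout is not shown in this post, so the numbers in the sketch below (5000 and 1500) are assumptions chosen purely for illustration.

```csharp
using System;

static class TimeSpanDemo
{
    // Millisecond *component* of the interval (always 0-999).
    public static int ComponentMilliseconds(int timeoutMs)
    {
        return new TimeSpan(0, 0, 0, 0, timeoutMs).Milliseconds;
    }

    // The whole interval expressed in milliseconds.
    public static int WholeMilliseconds(int timeoutMs)
    {
        return (int)new TimeSpan(0, 0, 0, 0, timeoutMs).TotalMilliseconds;
    }

    public static void Main()
    {
        // Assumed timeout values; the real m_ReceiveTimeout is not shown.
        Console.WriteLine(ComponentMilliseconds(5000)); // 0
        Console.WriteLine(WholeMilliseconds(5000));     // 5000
        Console.WriteLine(ComponentMilliseconds(1500)); // 500
    }
}
```

If the timeout were a whole number of seconds, the handler would therefore sleep for 0 ms; passing the TimeSpan itself, or its TotalMilliseconds, to Thread.Sleep would give the full pause.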

Did you see the problem when the channel is closed?  On CommunicationState.Closing and CommunicationState.Closed, the switch simply breaks without setting keepRunning to false, so the loop tries to receive again, gets no message again, and falls into the same path indefinitely.  The reason this originally went unnoticed is that the InvitationListener runs on a background thread (started via ThreadPool.QueueUserWorkItem).  Background threads are terminated when the application's foreground threads exit, so I missed the bug.
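
To make the closed-channel problem concrete, here is a minimal simulation that does not use the real transport at all.  FakeChannel, FakeState, and ClosedChannelDemo are hypothetical stand-ins invented for this sketch, and the iteration cap exists only so the demo terminates; it is an illustration of the loop logic, not the Lunch Launcher's actual code.

```csharp
using System;

// Hypothetical stand-in for the channel states used by the listener.
enum FakeState { Opened, Faulted, Closing, Closed }

// Hypothetical stand-in for a closed input channel: Receive() returns
// immediately with no message and counts how often it was called.
class FakeChannel
{
    public FakeState State = FakeState.Closed;
    public int ReceiveCalls;

    public object Receive()
    {
        ReceiveCalls++;
        return null;
    }
}

static class ClosedChannelDemo
{
    // Runs the receive loop against a closed channel and returns the
    // number of Receive() calls made before the loop ended.
    public static int Run(bool stopWhenClosed, int maxIterations)
    {
        FakeChannel channel = new FakeChannel();
        bool keepRunning = true;

        // The cap is only here so the sketch cannot spin forever.
        while (keepRunning && channel.ReceiveCalls < maxIterations)
        {
            object msg = channel.Receive();

            if (null == msg)
            {
                switch (channel.State)
                {
                    case FakeState.Closing:
                    case FakeState.Closed:
                        // Without this assignment the loop keeps spinning.
                        if (stopWhenClosed) { keepRunning = false; }
                        break;

                    default:
                        break;
                }
            }
        }

        return channel.ReceiveCalls;
    }

    public static void Main()
    {
        Console.WriteLine(Run(false, 1000)); // spins until the cap: 1000
        Console.WriteLine(Run(true, 1000));  // stops after one call: 1
    }
}
```

With the plain break, the loop only ends because of the artificial cap; setting keepRunning to false ends it after a single receive attempt.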

Current version
Since returning from MEDC, I have made some significant changes to the listeners.  Most importantly, I have consolidated the code so that there is little, if any, direct duplication.  With the duplication gone, a bug fixed in one listener (such as the one described above) is automatically fixed in the other two.

Let's take a look at the current (October 2007) version.  I'll start by showing all of the thread delegates.
/// <summary>
/// Receive lunch invitation messages
/// </summary>
private void InvitationListener(Object state)
{
    Thread.CurrentThread.Name = "LunchManager - Invitation Listener";
    Listen<LunchInvitation>(this.WaitMilliseconds);
}

/// <summary>
/// Receive lunch invitation reply messages
/// </summary>
private void ReplyListener(Object state)
{
    Thread.CurrentThread.Name = "LunchManager - Reply Listener";
    Listen<LunchInvitationReply>(this.WaitMilliseconds);
}

/// <summary>
/// Receive lunch details messages
/// </summary>
private void LunchDetailsListener(Object state)
{
    Thread.CurrentThread.Name = "LunchManager - Lunch Details Listener";
    Listen<LunchDetails>(this.WaitMilliseconds);
}

Quite a bit smaller and very similar, aren't they?  All of the common listening code, which had been triplicated, has been factored into a generic method called Listen<T>.  Let's look at that next.
/// <summary>
/// Receive incoming messages
/// </summary>
/// <typeparam name="T">
/// The type of messages that will be received
/// </typeparam>
/// <param name="waitMilliseconds">
/// The length of time, in milliseconds, to wait before restarting after a communications failure
/// </param>
private void Listen<T>(Int32 waitMilliseconds)
{
    TransportObjects transport = null;

    try
    {
        Type objType = typeof(T);

        // get the transport
        transport = GetTransport<T>();
        transport.Initialize();
                       
        // get the serializer
        LunchObjectSerializer serializer = GetSerializer<T>();
                           
        Boolean keepRunning = true;
           
        while(keepRunning)
        {
            IInputChannel channel = transport.InputChannel;
                   
            try
            {
                // wait for a message
                //
                // note: there is no need for a timeout when calling Receive()
                //  when the channel is closed, Receive() will return
                Message msg = channel.Receive();
                   
                if(null != msg)
                {
                    // there is a message to process
                       
                    // get the sender's email address
                    String from = (String)msg.Properties["FromEmailAddress"];
                       
                    // process the message
                    ProcessMessage<T>(msg, from);
                }
                else
                {
                    // there is no message to process
                       
                    // determine why we did not receive a message
                    switch(channel.State)
                    {
                        // transport faulted
                        case CommunicationState.Faulted:
                            // attempt to recover from the fault
                            keepRunning = transport.Recover();
                            break;
                               
                        // channel is closing/closed
                        case CommunicationState.Closing:
                        case CommunicationState.Closed:
                            keepRunning = false;
                            break;
                               
                        // unexpected state
                        default:
                            break;
                    }
                }
            }
            catch(CommunicationException)
            {
                // failure communicating with the service
                //  sleep for a short time to allow it to recover
                Thread.Sleep(waitMilliseconds);
            }
            catch
            {
                // fatal error
                keepRunning = false;
            }
        }
    }
    catch(Exception e)
    {
        // fatal error
        // TODO: log failure
    }
    finally
    {
        // transport is still null if GetTransport<T>() threw
        if(null != transport)
        {
            transport.Uninitialize();
        }
    }
}

The Listen<T> method looks pretty similar to the InvitationListener from the MEDC version.  The changes were mostly to make the code reusable, yet still type-safe, by turning it into a generic method.

The key change here is to retrieve the appropriate TransportObjects and LunchObjectSerializer from collections managed by the lunch manager (to be talked about in a future post).  The rest is mostly just fit-and-finish and "tidying up" (e.g., eliminating the TimeoutException that occurred whenever there was no message to receive).
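
Until that future post, here is one way a type-keyed lookup such as GetTransport<T>() could be sketched.  This is an assumption about the general shape, not the Lunch Launcher's actual implementation: TransportStub, TransportRegistry, Register<T>, and the channel names are all hypothetical, and the two message classes are empty placeholders.

```csharp
using System;
using System.Collections.Generic;

// Empty placeholders standing in for the real message types.
class LunchInvitation { }
class LunchInvitationReply { }

// Hypothetical stand-in for the TransportObjects class.
class TransportStub
{
    public readonly string ChannelName;

    public TransportStub(string channelName)
    {
        ChannelName = channelName;
    }
}

// Hypothetical registry: one transport per message type.
class TransportRegistry
{
    private readonly Dictionary<Type, TransportStub> m_Transports =
        new Dictionary<Type, TransportStub>();

    public void Register<T>(TransportStub transport)
    {
        m_Transports[typeof(T)] = transport;
    }

    // The generic parameter selects the entry, so callers never pass
    // a Type or a string key themselves.
    public TransportStub GetTransport<T>()
    {
        TransportStub transport;
        if (!m_Transports.TryGetValue(typeof(T), out transport))
        {
            throw new InvalidOperationException(
                "No transport registered for " + typeof(T).Name);
        }
        return transport;
    }
}

static class RegistryDemo
{
    public static void Main()
    {
        TransportRegistry registry = new TransportRegistry();

        // Channel names are made up for this sketch.
        registry.Register<LunchInvitation>(new TransportStub("invite"));
        registry.Register<LunchInvitationReply>(new TransportStub("reply"));

        Console.WriteLine(registry.GetTransport<LunchInvitation>().ChannelName);
        Console.WriteLine(registry.GetTransport<LunchInvitationReply>().ChannelName);
    }
}
```

Keying the collection on typeof(T) is what lets a single Listen<T> body serve all three listeners; the same pattern would work for the serializers.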

Since this is an in-progress journey (as noted by the presence of 'TODO' in the snippet), there is still some testing and fine tuning to be done within the Lunch Launcher.  As changes are made, I plan on updating this and other posts in the series.

Upon reflection -- not the .NET type of reflection :) -- I realize that code clarity is not that much different between the MEDC and October 2007 versions of the listener.  In fact, the October 2007 version showcases just how similar the listeners are and might be the clearer of the two.

Take care,
-- DK

[Edit: changed title; we'll talk about processing next time]

Disclaimer(s):
This posting is provided "AS IS" with no warranties, and confers no rights.
The information contained within this post is in relation to beta software. Any and all details are subject to change.