How to implement the IAsyncResult design pattern

In this post, I will be presenting an implementation of the IAsyncResult design pattern that I will revisit in later posts. For my implementation, I would like to start with the implementation found in the March 2007 Concurrent Affairs column on the asynchronous programming model.

I  like the Concurrent Affairs implementation mostly because it implements lazy initialization of the wait handle already. Like most other developers, however, I can't resist the temptation to tweak the code, so I'll briefly explain some of the changes I've made:

  • I've changed the original version so you are required to derive from the AsyncResultNoResult and AsyncResult<TResult> classes to implement the asynchronous operation. I feel it is more intuitive to give the derived classes responsibility for managing the operation. Also, for classes with many different operations, it reduces the amount of code on the main class if you can move it to the AsyncResultNoResult derived class.
  • I've bullet proofed the implementation against strange use cases. For example calling BeginConnect on one Socket class and EndConnect on another. Also calling BeginConnect followed by EndWrite. Although unlikely to happen, it is friendlier to users of a public API that you detect these cases and not let them ship with bugs in their code.
  • I've added extensibility points such as Completing, MakeCallback, and Completed methods on the class. When implementing complex operations it is sometimes necessary to add some code that runs before or after the callback is made to the calling code. In addition, MakeCallback can be customized to change the thread on which the callback is called.
  • To allow extensibility when generating code, I marked the class partial so the class can be merged with generated code.
  • The base class contains methods to support the bullet proofing I described earlier.

Now that I've described the high level changes, let me toss out the implementation for you to look at:

internal partial class AsyncResultNoResult : IAsyncResult

{

    // Fields set at construction which never change while

    // operation is pending

    private readonly AsyncCallback m_AsyncCallback;

    private readonly Object m_AsyncState;

    // Fields set at construction which do change after

    // operation completes

    private const Int32 c_StatePending = 0;

    private const Int32 c_StateCompletedSynchronously = 1;

    private const Int32 c_StateCompletedAsynchronously = 2;

    private Int32 m_CompletedState = c_StatePending;

    // Field that may or may not get set depending on usage

    private ManualResetEvent m_AsyncWaitHandle;

    // Fields set when operation completes

  private Exception m_exception;

    /// <summary>

    /// The object which started the operation.

    /// </summary>

    private object m_owner;

    /// <summary>

    /// Used to verify BeginXXX and EndXXX calls match.

    /// </summary>

    private string m_operationId;

    protected AsyncResultNoResult(

        AsyncCallback asyncCallback,

        object state,

        object owner,

        string operationId)

    {

        m_AsyncCallback = asyncCallback;

        m_AsyncState = state;

        m_owner = owner;

        m_operationId =

            String.IsNullOrEmpty(operationId) ? String.Empty : operationId;

    }

    internal virtual void Process()

    {

        // Starts processing of the operation.

    }

    protected bool Complete(Exception exception)

    {

        return this.Complete(exception, false /*completedSynchronously*/);

    }

    protected bool Complete(Exception exception, bool completedSynchronously)

    {

        bool result = false;

        // The m_CompletedState field MUST be set prior calling the callback

        Int32 prevState = Interlocked.Exchange(ref m_CompletedState,

            completedSynchronously ? c_StateCompletedSynchronously :

            c_StateCompletedAsynchronously);

        if (prevState == c_StatePending)

        {

            // Passing null for exception means no error occurred.

            // This is the common case

            m_exception = exception;

            // Do any processing before completion.

            this.Completing(exception, completedSynchronously);

            // If the event exists, set it

            if (m_AsyncWaitHandle != null) m_AsyncWaitHandle.Set();

            this.MakeCallback(m_AsyncCallback, this);

            // Do any final processing after completion

            this.Completed(exception, completedSynchronously);

            result = true;

        }

        return result;

    }

    private void CheckUsage(object owner, string operationId)

    {

        if (!object.ReferenceEquals(owner, m_owner))

        {

            throw new InvalidOperationException(

                "End was called on a different object than Begin.");

        }

        // Reuse the operation ID to detect multiple calls to end.

        if (object.ReferenceEquals(null, m_operationId))

        {

            throw new InvalidOperationException(

                "End was called multiple times for this operation.");

        }

        if (!String.Equals(operationId, m_operationId))

        {

            throw new ArgumentException(

                "End operation type was different than Begin.");

        }

        // Mark that End was already called.

        m_operationId = null;

    }

    public static void End(

        IAsyncResult result, object owner, string operationId)

    {

        AsyncResultNoResult asyncResult = result as AsyncResultNoResult;

        if (asyncResult == null)

        {

            throw new ArgumentException(

                "Result passed represents an operation not supported " +

                "by this framework.",

                "result");

        }

        asyncResult.CheckUsage(owner, operationId);

        // This method assumes that only 1 thread calls EndInvoke

        // for this object

        if (!asyncResult.IsCompleted)

        {

            // If the operation isn't done, wait for it

            asyncResult.AsyncWaitHandle.WaitOne();

            asyncResult.AsyncWaitHandle.Close();

            asyncResult.m_AsyncWaitHandle = null; // Allow early GC

        }

        // Operation is done: if an exception occurred, throw it

        if (asyncResult.m_exception != null) throw asyncResult.m_exception;

    }

    #region Implementation of IAsyncResult

    public Object AsyncState { get { return m_AsyncState; } }

    public bool CompletedSynchronously

    {

        get

        {

            return Thread.VolatileRead(ref m_CompletedState) ==

                c_StateCompletedSynchronously;

        }

    }

    public WaitHandle AsyncWaitHandle

    {

        get

        {

            if (m_AsyncWaitHandle == null)

            {

                bool done = IsCompleted;

                ManualResetEvent mre = new ManualResetEvent(done);

                if (Interlocked.CompareExchange(ref m_AsyncWaitHandle,

                    mre, null) != null)

                {

                    // Another thread created this object's event; dispose

                    // the event we just created

                    mre.Close();

                }

                else

                {

                 if (!done && IsCompleted)

                    {

                        // If the operation wasn't done when we created

                        // the event but now it is done, set the event

                        m_AsyncWaitHandle.Set();

            }

                }

            }

            return m_AsyncWaitHandle;

        }

    }

    public bool IsCompleted

    {

        get

        {

            return Thread.VolatileRead(ref m_CompletedState) !=

                c_StatePending;

        }

    }

    #endregion

    #region Extensibility

    protected virtual void Completing(

        Exception exception, bool completedSynchronously)

    {

    }

    protected virtual void MakeCallback(

        AsyncCallback callback, AsyncResultNoResult result)

    {

        // If a callback method was set, call it

        if (callback != null)

        {

            callback(result);

        }

    }

    protected virtual void Completed(

        Exception exception, bool completedSynchronously)

    {

    }

    #endregion

}

The AsyncResult<TResult> class used for asynchronous operations that return results is shown below:

internal partial class AsyncResult<TResult> : AsyncResultNoResult

{

    // Field set when operation completes

    private TResult m_result = default(TResult);

    protected void SetResult(TResult result)

    {

        m_result = result;

    }

    protected AsyncResult(

        AsyncCallback asyncCallback,

        object state,

        object owner,

        string operationId) :

        base(asyncCallback, state, owner, operationId)

    {

    }

    new public static TResult End(

        IAsyncResult result, object owner, string operationId)

    {

        AsyncResult<TResult> asyncResult = result as AsyncResult<TResult>;

        Debug.Assert(asyncResult != null);

        // Wait until operation has completed

        AsyncResultNoResult.End(result, owner, operationId);

        // Return the result (if above didn't throw)

        return asyncResult.m_result;

    }

}

 

Implementing an operation

Given a class derived from AsyncResultNoResult, a class may implement an asynchronous operation using the following pattern:

public class MyClass

{

    public IAsyncResult BeginSend(

        string host,

        int port,

        byte[] buffer,

        int offset,

        int size,

        AsyncCallback asyncCallback,

        object state)

    {

        // TODO: Validate arguments

        SendAsyncResult result = new SendAsyncResult(

            host,

            port,

            buffer,

            offset,

            size,

            asyncCallback,

            state,

            this /*owner*/,

            "send");

        result.Process();

        return result;

    }

    public void EndSend(IAsyncResult result)

    {

        AsyncResultNoResult.End(result, this, "send");

    }

}

The SendAsyncResult would aggregate three asynchronous steps to accomplish sending the data to the host:

  • Connect
  • Send the data
  • Disconnect

A failure in either of the first two steps would cause the whole operation to fail. I won't give the implementation of SendAsyncResult in this post. In later posts, we will be generating code to implement this pattern based on a model we create. I will give a peek at one pattern used in implementing SendAsyncResult that is pretty tedious though. That is the code to handle exceptions.

Handling Exceptions

One tricky part of implementing classes derived from AsyncResultNoResult is the handling of exceptions.  When calling BeginXXX on a public API, most callers will expect the callback to be called in all cases to report the results. To ensure this, it requires some careful handling of exceptions.  The .Net framework guidelines still strongly recommend not catching the base exception Exception and only catching the exceptions you expect to handle.  There are justifications for this, but in production, it is not always feasible for software to just crash.  The following pattern is a compromise between the two views:

private void ConnectCompleted(IAsyncResult result)

{

  bool unhandled = true;

    try

    {

        m_socket.EndConnect(result);

        m_socket.BeginSend(

            m_buffer,

            0 /*offset*/,

            m_buffer.Length,

            SocketFlags.None,

            this.SendCompleted,

            null /*state*/);

        unhandled = false;

    }

    catch (SocketException exception)

    {

        unhandled = false;

        this.Complete(

            exception,

            false /*completedSynchronously*/);

    }

    finally

    {

        if (unhandled)

        {

            this.Complete(

                new MyFailureException(

                    "Unhandled exception detected."));

        }

    }

}

 

The above code snippet is the callback for the first step (connecting) and the start of the second step (sending the data). It catches the expected exceptions, but it also detects if some unexpected exception has been raised, and ensures that the callback is called with failure. How to handle exceptions in this case is a very hot topic, and every person will have a different opinion.  I hope that you can see that once you decide what exceptions handling technique you prefer, that there is an opportunity for tool support to make things easier on you.

Summary

In this post I gave a very brief description of an IAsyncResult implementation.  If you have done a lot of work with implementing asynchronous operations, you may know how tedious it is to implement them.  Since there is no tool support, most of the implementation is covered by team coding guidelines. In subsequent posts, I hope to show how we can create a model and generate most of the code necessary to implement this pattern.  How to implement the asynchronous operations and how to handle exceptions are hot topics that each group needs to work through on their own. 

Series

Start of series previous next

20110314_ImplementingAPM.zip