Using C# 2.0 iterators to simplify writing asynchronous code (part 2)


The previous article described the idea of using C# 2.0 iterators to write asynchronous code; now it’s time to implement the utility class that “runs” the iterator. The utility turns out to be very light, and I’m glad some readers reported they’ve already implemented their own version.


Let’s list the requirements for the utility:



  1. The user code should run sequentially. The code can “jump” between threads, but the main benefit of the approach is to let the user write sequential code.

  2. The iterator should only resume once all the asynchronous operations have finished and the previous MoveNext() call has returned (that is, the iterator has executed its yield return).

  3. The utility should provide centralized exception handling.

  4. If the iterator function has a finally clause, the C# compiler moves it into the Dispose() method of the generated enumerator, which implements IDisposable. We need to call Dispose() at the end of the iteration.
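Requirement 4 can be checked in isolation. The following is a minimal sketch, not part of the utility: FinallyDemo, Steps and RunOnceAndDispose are hypothetical names made up for illustration. It abandons an iterator that is suspended inside a try block and shows that its finally code runs only when Dispose() is called.

```csharp
using System;
using System.Collections.Generic;

public static class FinallyDemo
{
    // Set by the iterator's finally block so that callers can observe it.
    public static bool FinallyRan;

    // Hypothetical iterator that suspends inside a try block.
    public static IEnumerable<int> Steps()
    {
        try
        {
            yield return 1;
            yield return 2;
        }
        finally
        {
            FinallyRan = true;
        }
    }

    // Advances the iterator once, then disposes it early.
    // Returns true if the finally block ran because of Dispose().
    public static bool RunOnceAndDispose()
    {
        FinallyRan = false;
        IEnumerator<int> e = Steps().GetEnumerator();
        e.MoveNext();               // now suspended at the first yield, inside try
        bool ranBeforeDispose = FinallyRan;
        e.Dispose();                // executes the pending finally block
        return !ranBeforeDispose && FinallyRan;
    }
}
```

If the runner never called Dispose(), an iterator abandoned part-way (for example after an exception handler asked to stop) would silently skip its cleanup code.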

I’ve played with different ways to use the utility, and settled on the following pattern:


AsyncEnumerator ae = new AsyncEnumerator();
ae.Start(UserFunction(/*user params*/…, ae));


The AsyncEnumerator class exposes the following public API:


public sealed class AsyncEnumerator
{
    public AsyncEnumerator() {}

    // The user-supplied exception handler,
    // if it returns true the enumeration stops.

    public Predicate<Exception> Catch
    {
        get { }
        set { }
    }

    // User should never call this delegate directly, but
    // should pass this callback to any async API it calls.
    public AsyncCallback Callback
    {
        get { }
    }

    public void Start(IEnumerable<Int32> enumerable) {}
}


Now, how do we run the user code sequentially and resume it only after all async operations have finished? The first iteration is invoked directly: we simply call enumerator.MoveNext(), but on a thread pool thread so that Start returns immediately. To decide when to invoke the following iterations, we keep a counter of outstanding async operations. The number of operations started in each step is the value returned by the user’s yield return statement, and is accessible through enumerator.Current. When an async operation finishes and our callback is called, we decrement this counter by 1 (using interlocked operations, since callbacks may be called in parallel). Once the counter reaches 0, all the outstanding async operations have completed and we should resume the iterator, i.e. call enumerator.MoveNext() again.


The only tricky part is that callbacks may be called before the iterator returns from the MoveNext() call; this is why I’ve added the requirement that the iterator can only be resumed after MoveNext() has returned. The callback might even be called on the same thread that called the asynchronous method, if there is enough information to complete the call synchronously. We handle this by allowing the number of outstanding async operations to be negative: we start with 0; each async callback decrements the count by 1; and when iterator.MoveNext() returns we increment it by iterator.Current. If after any of these operations the number of outstanding async operations becomes 0 again, we know that all async operations have finished and the iterator code has exited with ‘yield return’, so it is safe to advance the iterator again.
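The counting rule can be sketched on its own, separately from the iterator. PendingCounter and its member names are hypothetical and exist only for this illustration; each method returns true when the caller is the one that should resume the iterator.

```csharp
using System.Threading;

// Hypothetical helper that isolates the counting rule described above.
public sealed class PendingCounter
{
    private int m_pending = 0;

    // Called when an async operation completes.
    // Returns true if the caller should resume the iterator.
    public bool OperationCompleted()
    {
        return Interlocked.Decrement(ref m_pending) == 0;
    }

    // Called after MoveNext() returns, with the value from 'yield return'.
    // Returns true if the caller should resume the iterator.
    public bool IteratorYielded(int operationsStarted)
    {
        return Interlocked.Add(ref m_pending, operationsStarted) == 0;
    }

    // Atomic read of the current count, for inspection only.
    public int Value
    {
        get { return Interlocked.CompareExchange(ref m_pending, 0, 0); }
    }
}
```

If a callback arrives first, the count drops to -1 and OperationCompleted() returns false; the later IteratorYielded(1) brings it back to 0 and returns true. In the usual order, IteratorYielded(1) returns false and the callback is the one that sees 0. Either way exactly one caller is told to resume.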


Thus there is a race between two threads that may resume the iterator: the original thread that made the previous iterator.MoveNext() call, and the thread that reported the end of the last async operation. But that’s OK: our contract allows the iterator code to migrate between threads, and we only guarantee that a single thread runs it at a time.


public sealed class AsyncEnumerator
{
    private IEnumerator<Int32> m_enumerator;
    private Int32 m_PendingAsyncOps = 0;

    public void Start(IEnumerable<Int32> enumerable)
    {
        m_enumerator = enumerable.GetEnumerator();

        // let’s start the enumeration asynchronously too
        ThreadPool.QueueUserWorkItem(delegate { Advance(); });
    }

    // This is first implementation of Advance, to be revised below.
    private void Advance()
    {
        while(true)
        {
            if (!m_enumerator.MoveNext())
                break;


            // The enumerator returns the number of async ops it initiated.
            // Add this to m_PendingAsyncOps.
            int pendingAsyncOps = Interlocked.Add(ref m_PendingAsyncOps, m_enumerator.Current);

            if (pendingAsyncOps != 0)
                break; // quit this thread, let other thread resume iteration

            /* No pending Async Ops, continue enumeration */
        }
    }

    // The delegate for this function is passed to all async calls that
    // user makes and thus it is automatically called by async API
    // when an async operation finishes.
    // It is never invoked by our code or directly by user.

    private void Advance(IAsyncResult ar)
    {
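        // An async op completed, subtract 1 from m_PendingAsyncOps.
        // Test the value returned by Interlocked.Decrement, not the field:
        // re-reading the field could observe another thread's update.
        Int32 pendingAsyncOps = Interlocked.Decrement(ref m_PendingAsyncOps);
        if (pendingAsyncOps == 0)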
        {
            // The last pending async op completed, continue enumeration
            Advance();
        }
    }
}


Finally, the last requirements: exception handling and calling the Dispose() method at the end of the iteration. Both changes affect only the void Advance() function that works with the iterator: we need to catch the exception and call the user-provided handler, and at the end of the iteration call Dispose():


    private void Advance()
    {
        bool end = true; // Assume that we’ll stop the iteration

        while(true)
        {
            try
            {
                end = !m_enumerator.MoveNext();
            }
            catch (Exception e)
            {
                // An exception occurred, catch it and execute the callback.
                // The callback returns true to end the iteration or false to continue it
                end = (m_catch != null) ? m_catch(e) : true;
            }
            if (end)
                break;

            // The enumerator returns the number of async ops it initiated.
            // Add this to m_PendingAsyncOps.
            int pendingAsyncOps = Interlocked.Add(ref m_PendingAsyncOps, m_enumerator.Current);

            if (pendingAsyncOps != 0)
                break; // quit this thread, let other thread resume iteration

            /* No pending Async Ops, continue enumeration */
        }

        if (end)
        {
            // If nothing else to do, execute iterator’s finally code
            IDisposable d = m_enumerator as IDisposable;
            if (d != null) d.Dispose();
        }
    }


Now the only missing piece is the public API. The public Callback property returns an AsyncCallback delegate: we just return the second function, void Advance(IAsyncResult ar), but to avoid creating a new delegate for each async call, let’s cache it. And the exception handler is just a delegate that the user can provide:


public sealed class AsyncEnumerator
{
    private Predicate<Exception> m_catch;

    // Delegate for passing to async methods
    // (to avoid creating new delegate for each call).
    private AsyncCallback callback;

    public AsyncEnumerator()
    {
        this.callback = Advance;
    }

    /// <summary>
    /// The user-supplied exception handler, should return true to stop enumerator
    /// </summary>
    public Predicate<Exception> Catch
    {
        get { return m_catch; }
        set { m_catch = value; }
    }

    /// <summary>
    /// The callback that user code should pass to all async APIs
    /// </summary>
    public AsyncCallback Callback
    {
        get { return this.callback; }
    }
}
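For readers who want to try the pieces together, here is a compact sketch that assembles the fragments above into one class and drives it with a small test. The Demo class, the ReadAll iterator and the choice of MemoryStream as the async source are assumptions made for illustration; they are not part of the utility. Depending on the runtime, the stream may invoke the callback before the iterator reaches yield return, which is exactly the case the negative counter is meant to handle.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

public sealed class AsyncEnumerator
{
    private IEnumerator<int> m_enumerator;
    private int m_pendingAsyncOps = 0;
    private Predicate<Exception> m_catch;
    private readonly AsyncCallback m_callback;

    public AsyncEnumerator() { m_callback = Advance; }

    public Predicate<Exception> Catch { get { return m_catch; } set { m_catch = value; } }
    public AsyncCallback Callback { get { return m_callback; } }

    public void Start(IEnumerable<int> enumerable)
    {
        m_enumerator = enumerable.GetEnumerator();
        ThreadPool.QueueUserWorkItem(delegate { Advance(); });
    }

    // Passed to async APIs; resumes the iterator when the count returns to 0.
    private void Advance(IAsyncResult ar)
    {
        if (Interlocked.Decrement(ref m_pendingAsyncOps) == 0) Advance();
    }

    private void Advance()
    {
        bool end = true;
        while (true)
        {
            try { end = !m_enumerator.MoveNext(); }
            catch (Exception e) { end = (m_catch != null) ? m_catch(e) : true; }
            if (end) break;

            // Add the number of operations started in this step.
            if (Interlocked.Add(ref m_pendingAsyncOps, m_enumerator.Current) != 0)
                break; // a callback will resume the iteration
        }
        if (end) m_enumerator.Dispose();
    }
}

public static class Demo
{
    // Hypothetical user function: starts one async read, then stores the result.
    private static IEnumerable<int> ReadAll(Stream s, AsyncEnumerator ae,
                                            int[] result, ManualResetEvent done)
    {
        try
        {
            byte[] buffer = new byte[16];
            IAsyncResult ar = s.BeginRead(buffer, 0, buffer.Length, ae.Callback, null);
            yield return 1;              // one async operation was started
            result[0] = s.EndRead(ar);   // runs only after the callback has fired
        }
        finally
        {
            done.Set();                  // runs when the iterator finishes or is disposed
        }
    }

    // Reads from a 5-byte in-memory stream and returns the number of bytes read.
    public static int Run()
    {
        int[] result = new int[1];
        using (ManualResetEvent done = new ManualResetEvent(false))
        {
            AsyncEnumerator ae = new AsyncEnumerator();
            ae.Start(ReadAll(new MemoryStream(new byte[5]), ae, result, done));
            done.WaitOne();
            return result[0];
        }
    }
}
```

Demo.Run() blocks only so the sketch can report a result; real code would normally let Start return and continue its work inside the iterator.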


That is all for today. For the next blog entry, I plan to do performance measurements and compare code that uses this utility with code that uses threads and synchronous calls. But I haven’t done this yet, so you are welcome to post your results!

Comments (8)

  1. Mihailik says:

    Michael, I think your implementation of this idea is a little over-complex.

    By the way, with Google and "Asynchronous iterators" query you can find:

    http://www.google.com/search?q=asynchronous+iterators

    1. My article on this idea: msmvps.com/blogs/79813.aspx

    2. Your first article: blogs.msdn.com/michen/archive/2006/03/30/564671.aspx

    3. Miguel de Icaza’s post about asynchronous execution and iterators in the Mono ASP.NET implementation.

    And you can find, Miguel’s post was first. And the Mono project is first place where this cool idea was used in C#. Strangely, but nobody detected that cool idea in Miguel’s blog.

    In my post (at Christmas) I had no feedback too.

    I think the idea is very competitive, but the implementation and framework around it should be a lot stronger and simpler to be widely used. May be such asynchronous "chained" operation is good place for improvement at language level in future C# versions.

  2. Very interesting, I should have googled before posting this 🙂

    Indeed, yours and Miguel de Icaza blogs describe almost the same usage as I described.

    I believe the synchronization logic is necessary though – C#-generated iterator is not thread-safe, and both implementations (at least as described in blogs) suffer from the problem solved in my article: they do not handle correctly the case when async operation finishes before or at the same time when the thread that started it produces the value with ‘yield return’. And this is not just theoretical problem – I’ve found this could happen with HttpWebRequest class.

    The complexity of implementation is arguable, but with some added complexity in the library, it lets one write simpler code: by centralizing exception handling, ensuring finally block in the iterator is called, and avoiding unnecessary anonymous delegates in user code – the user writes simple sequential code (this is also the case for Miguel’s version).

    But anyway it is the same idea, thanks for letting me know. It was very educational to look at other implementations! I’m adding your and Miguel’s blogs to my RSS aggregator 🙂

  3. Mihailik says:

    Michael, I understand you very well 🙂

    Actually I found previous Miguel’s implementation AFTER my posting too. It was so sad at first time!

    About threading issues.

    Actually, my implementation is not fragile by race conditions. Let me show why.

    Look here, it is asynchronous algorithm of Socket.BeginAccept/EndAccept:

    http://blogs.gotdotnet.ru/personal/mihailik/PermaLink.aspx?guid=a9557a71-8e52-48ef-b867-d0f78e636e77

    The core is iterator:

    private IEnumerator<Invoke> AsyncListen(Action<Socket> yieldSocket, Invoke next)
    {
       while( true )
       {
           IAsyncResult acceptCompleted = null;
           yield return delegate
           {
               acceptCompleted = tcpListener.BeginAcceptSocket(
                   delegate { next(); },
                   null);
           };
           Socket acceptedSocket = tcpListener.EndAcceptSocket(acceptCompleted);
           yieldSocket(acceptedSocket);
       }
    }

    Here you can see that it returns not IEnumerable "of int", but "of Invoke", and Invoke is actually delegate.

    So things get more complex under the covers, but syntactic sugar hides this. Here are the different parts of the algorithm:

    1. Preparing to Socket.BeginAccept

    2. Calling Socket.BeginAccept

    3. Stepping by IEnumerator.MoveNext

    4. Calling Socket.EndAccept

    Actually iterator code contains only (1) and (4).

    BeginAccept (2) is not called from the iterator code; it is called from the surrounding "framework" code. As you can see, the iterator yields an asynchronous delegate with the BeginAccept call inside it. The iterator does not call BeginAccept, it just delegates this call to the surrounding "framework".

    So, when the iterator’s code is stopped by yield, BeginAccept has not yet been called. It means the iterator can NEVER execute the next step BEFORE finishing the previous one.

    * * *

    Also I have some ideas for C# language features for this problem. It may be automatic BeginOperation/EndOperation implementation, based on Operation() method.

    Compiler may detect the places for "yield-step-points" inside hand-written Operation() method. For example:

    ParsedRequest GetRequest(Socket socket)
    {
       …
       Socket.Read(…);
       …
       return …
    }

    The compiler may find calls to Socket.Read, and convert them to Socket.BeginRead/EndRead, split by a yield-like continuation.

    Syntactically it may be something like:

    async ParsedRequest GetRequest(Socket socket)

    and it will be compiled into three methods: actual GetRequest and pair of BeginGetRequest/EndGetRequest generated by compiler.

  4. Thanks, I now understand how your code works. Yes, it looks thread-safe.

    ***

    About C# compiler doing the code transformation – this was exactly the same wish I’ve got too 🙂

    The second thought was that LISP with its (arguably evil, but powerful) macro facility would allow implementing this in a library, rather than modifying core language. Unfortunately, I don’t see a way to have LISP-like macro for a language with complex syntax like C#.

  5. jasons says:

    you both rock, my hat is off to both of you sirs!

    thank you for your conversation, i find it very interesting and compelling!

  6. John says:

    Thanks for a great article.  Most of the examples I’ve seen for using sockets don’t deal with the real life issues such as the one you mention with HttpWebRequest.  (Most examples don’t deal with packet fragmentation either, simply assuming that any arbitrary number of bytes they read is the entire message and can be converted to a string with a single call to an encoding function.)  I appreciate the complication that you added to your code to make it more realistic.
