Using C# 2.0 iterators to simplify writing asynchronous code (part 2)

The previous article described the idea of using C# 2.0 iterators to write asynchronous code; now it's time to implement the utility class that "runs" the iterator. The utility turns out to be very light, and I'm glad some readers reported that they've already implemented their own versions.

Let's list the requirements for the utility:

  1. The user code should run sequentially. The code can "jump" between threads, but the main benefit of the approach is that it lets the user write sequential code.
  2. The iterator should resume only when all the asynchronous operations have finished and the previous MoveNext() call has returned with the yielded result.
  3. The utility should provide centralized exception handling.
  4. If the iterator function has a finally clause, C# compiles it into the Dispose() method of the enumerator's IDisposable interface. We need to call it at the end of the iteration.
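To illustrate requirement 4, here is a minimal sketch of how a finally block in an iterator is tied to Dispose(). The FinallyDemo class, its Log list, and the Steps() iterator are hypothetical names made up for this illustration; they are not part of the utility.

```csharp
using System;
using System.Collections.Generic;

public static class FinallyDemo
{
    // Hypothetical log, used only to observe the order of events.
    public static readonly List<string> Log = new List<string>();

    private static IEnumerable<int> Steps()
    {
        try
        {
            Log.Add("step 1");
            yield return 1;
            Log.Add("step 2"); // not reached in Run() below
            yield return 1;
        }
        finally
        {
            Log.Add("finally");
        }
    }

    public static void Run()
    {
        Log.Clear();
        IEnumerator<int> e = Steps().GetEnumerator();
        e.MoveNext();          // runs the iterator up to the first yield return
        Log.Add("abandoned");  // the iterator is now suspended inside the try block
        e.Dispose();           // runs the finally block of the suspended iterator
    }
}
```

Calling FinallyDemo.Run() (for example from a Main method) abandons the iterator while it is suspended inside the try block. The finally block runs only because Dispose() is called, which is why the utility has to call Dispose() when it stops iterating early.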

I've played with different ways to use the utility, and settled on the following pattern:

AsyncEnumerator ae = new AsyncEnumerator();
ae.Start(UserFunction(/*user params*/..., ae));

The AsyncEnumerator class exposes the following public API:

public sealed class AsyncEnumerator
{
    public AsyncEnumerator() {}

    // The user-supplied exception handler,
    // if it returns true the enumeration stops.
    public Predicate<Exception> Catch
    {
        get { }
        set { }
    }

    // User should never call this delegate directly, but
    // should pass this callback to any async API it calls.
    public AsyncCallback Callback
    {
        get { }
    }

    public void Start(IEnumerable<Int32> enumerable) {}
}

Now, how do we run the user code sequentially and resume it only after all async operations have finished? The first iteration is invoked directly: we simply call enumerator.MoveNext(), but we do it on a thread pool thread so that Start returns immediately. To decide when to invoke the following iterations, let's keep a counter of outstanding async operations (this value is returned by the user's yield return statement and is accessible through enumerator.Current). When an async operation finishes and our callback is called, we decrement this counter by 1 (of course, we should use interlocked operations, since callbacks may be called in parallel). Once the counter reaches 0, all the outstanding async operations have completed and we should resume the iterator (i.e. call enumerator.MoveNext() again).

The only tricky part is that the callback(s) may be called before the iterator returns from the MoveNext() call; this is why I've added the requirement that the iterator can only be resumed after MoveNext() has returned. The callback might even be called on the same thread that called the asynchronous method, if there is enough information to complete the call synchronously. The way we handle this is to allow the number of outstanding async operations to be negative: we start with 0; each async callback decrements the count by 1; and when iterator.MoveNext() returns, we increment it by iterator.Current. If after any of these operations the number of outstanding async operations becomes 0 again, we know that all async operations have finished and the iterator code has exited with 'yield return', so it is safe to advance the iterator again.

Thus there is a race between two threads that may resume the iterator: the original thread that called the previous iterator.MoveNext() and the thread that reported the end of the last async operation. But that's OK: our contract allows the iterator code to migrate between threads, and we only guarantee that a single thread runs it at a time.
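The counting rule can be looked at in isolation. The sketch below is only an illustration: PendingOps and PendingOpsDemo are hypothetical helpers, not part of AsyncEnumerator, and the two orderings are replayed on a single thread so that the result is deterministic.

```csharp
using System;
using System.Threading;

// Hypothetical helper that isolates the counting rule described above.
public sealed class PendingOps
{
    private int m_count; // may be negative while MoveNext() is still running

    // An async operation completed. Returns true if the caller must resume the iterator.
    public bool OperationCompleted()
    {
        return Interlocked.Decrement(ref m_count) == 0;
    }

    // MoveNext() returned; 'started' is the value of Current.
    // Returns true if the caller must resume the iterator.
    public bool MoveNextReturned(int started)
    {
        return Interlocked.Add(ref m_count, started) == 0;
    }
}

public static class PendingOpsDemo
{
    // The callback fires before MoveNext() returns (e.g. synchronous completion).
    public static string CallbackFirst()
    {
        PendingOps ops = new PendingOps();
        bool byCallback = ops.OperationCompleted();  // 0 -> -1, do not resume
        bool byIterator = ops.MoveNextReturned(1);   // -1 -> 0, resume
        return Describe(byCallback, byIterator);
    }

    // MoveNext() returns first and the callback fires later.
    public static string IteratorFirst()
    {
        PendingOps ops = new PendingOps();
        bool byIterator = ops.MoveNextReturned(1);   // 0 -> 1, do not resume
        bool byCallback = ops.OperationCompleted();  // 1 -> 0, resume
        return Describe(byCallback, byIterator);
    }

    private static string Describe(bool byCallback, bool byIterator)
    {
        if (byCallback == byIterator) return "error: expected exactly one resumer";
        return byCallback ? "callback thread resumes" : "iterator thread resumes";
    }
}
```

In both orderings exactly one of the two calls sees the counter reach 0, so exactly one thread resumes the iterator: CallbackFirst() returns "iterator thread resumes" and IteratorFirst() returns "callback thread resumes". Note that each caller tests the value returned by the interlocked operation rather than re-reading the field.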

public sealed class AsyncEnumerator
{
    private IEnumerator<Int32> m_enumerator;
    private Int32 m_PendingAsyncOps = 0;

    public void Start(IEnumerable<Int32> enumerable)
    {
        m_enumerator = enumerable.GetEnumerator();

        // let's start the enumeration asynchronously too
        ThreadPool.QueueUserWorkItem(delegate { Advance(); });
    }

    // This is the first implementation of Advance, to be revised below.
    private void Advance()
    {
        while (true)
        {
            if (!m_enumerator.MoveNext())
                break;

            // The enumerator returns the number of async ops it initiated
            // Add this to m_PendingAsyncOps
            int pendingAsyncOps = Interlocked.Add(ref m_PendingAsyncOps, m_enumerator.Current);

            if (pendingAsyncOps != 0)
                break; // quit this thread, let other thread resume iteration

            /* No pending Async Ops, continue enumeration */
        }
    }

    // The delegate for this function is passed to all async calls that
    // user makes and thus it is automatically called by async API
    // when an async operation finishes.
    // It is never invoked by our code or directly by user.
    private void Advance(IAsyncResult ar)
    {
        // An async op completed, subtract 1 from m_PendingAsyncOps.
        // Test the value returned by Interlocked.Decrement, not the field:
        // another thread may change the field between the decrement and a re-read,
        // which could let two threads resume the iterator.
        Int32 pendingAsyncOps = Interlocked.Decrement(ref m_PendingAsyncOps);
        if (pendingAsyncOps == 0)
        {
            // The last pending async op completed, continue enumeration
            Advance();
        }
    }
}

Finally, the last requirements: exception handling and calling the Dispose() method at the end of the iteration. Both changes affect only the void Advance() function that works with the iterator: we need to catch the exception and call the user-provided handler, and at the end of the iteration call Dispose():

    private void Advance()
    {
        bool end = true; // Assume that we'll stop the iteration

        while (true)
        {
            try
            {
                end = !m_enumerator.MoveNext();
            }
            catch (Exception e)
            {
                // An exception occurred, catch it and execute the callback.
                // The callback returns true to end the iteration or false to continue it
                end = (m_catch != null) ? m_catch(e) : true;
            }
            if (end)
                break;

            // The enumerator returns the number of async ops it initiated
            // Add this to m_PendingAsyncOps
            int pendingAsyncOps = Interlocked.Add(ref m_PendingAsyncOps, m_enumerator.Current);

            if (pendingAsyncOps != 0)
                break; // quit this thread, let other thread resume iteration

            /* No pending Async Ops, continue enumeration */
        }

        if (end)
        {
            // If nothing else to do, execute iterator's finally code
            IDisposable d = m_enumerator as IDisposable;
            if (d != null) d.Dispose();
        }
    }
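One language detail is worth keeping in mind when writing a Catch handler. As far as I understand the C# specification, once an exception has escaped from an iterator's MoveNext(), the enumerator is considered finished, so the user code cannot be re-entered afterwards. The sketch below only demonstrates this language behaviour; FaultedIteratorDemo and Faulty() are hypothetical names and say nothing about how the handler is meant to be used.

```csharp
using System;
using System.Collections.Generic;

public static class FaultedIteratorDemo
{
    // Hypothetical iterator that fails after its first step.
    private static IEnumerable<int> Faulty()
    {
        yield return 1;
        throw new InvalidOperationException("simulated failure");
    }

    // Returns what MoveNext() reports after an exception has escaped the iterator.
    public static bool MoveNextAfterException()
    {
        using (IEnumerator<int> e = Faulty().GetEnumerator())
        {
            e.MoveNext(); // reaches the first yield return
            try
            {
                e.MoveNext(); // the iterator body throws here
            }
            catch (InvalidOperationException)
            {
                // swallow the exception, as a handler returning false would
            }
            return e.MoveNext(); // the enumerator is finished by now
        }
    }
}
```

MoveNextAfterException() returns false, so a handler that returns false to "continue" probably should not expect the iterator body to pick up where it failed.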

Now the only missing piece is the public API. The public Callback property returns an AsyncCallback delegate: we just return the second function, void Advance(IAsyncResult ar), but to avoid creating a new delegate for each async call, let's cache it. The exception handler is just a delegate that the user can provide:

public sealed class AsyncEnumerator
{
    private Predicate<Exception> m_catch;

    // Delegate for passing to async methods
    // (to avoid creating new delegate for each call).
    private AsyncCallback callback;

    public AsyncEnumerator()
    {
        this.callback = Advance;
    }

    /// <summary>
    /// The user-supplied exception handler, should return true to stop enumerator
    /// </summary>
    public Predicate<Exception> Catch
    {
        get { return m_catch; }
        set { m_catch = value; }
    }

    /// <summary>
    /// The callback that user code should pass to all async APIs
    /// </summary>
    public AsyncCallback Callback
    {
        get { return this.callback; }
    }
}
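To see the pieces working together, here is a condensed, self-contained sketch: the class assembled from the fragments above, plus a small user function. The user function CountBytes, the MemoryStream input, the 4-byte buffer, and the ManualResetEvent used to wait for completion are all assumptions made for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Condensed copy of the class assembled from the fragments above.
public sealed class AsyncEnumerator
{
    private IEnumerator<int> m_enumerator;
    private int m_pendingAsyncOps;
    private Predicate<Exception> m_catch;
    private readonly AsyncCallback m_callback;

    public AsyncEnumerator() { m_callback = Advance; }

    public Predicate<Exception> Catch { get { return m_catch; } set { m_catch = value; } }
    public AsyncCallback Callback { get { return m_callback; } }

    public void Start(IEnumerable<int> enumerable)
    {
        m_enumerator = enumerable.GetEnumerator();
        ThreadPool.QueueUserWorkItem(delegate { Advance(); });
    }

    private void Advance(IAsyncResult ar)
    {
        // Resume only if this callback brought the counter back to 0.
        if (Interlocked.Decrement(ref m_pendingAsyncOps) == 0) Advance();
    }

    private void Advance()
    {
        bool end = true;
        while (true)
        {
            try { end = !m_enumerator.MoveNext(); }
            catch (Exception e) { end = (m_catch != null) ? m_catch(e) : true; }
            if (end) break;
            if (Interlocked.Add(ref m_pendingAsyncOps, m_enumerator.Current) != 0) break;
        }
        if (end) m_enumerator.Dispose();
    }
}

public static class AsyncEnumeratorDemo
{
    // Hypothetical user function: counts the bytes of a stream using async reads.
    private static IEnumerable<int> CountBytes(Stream stream, AsyncEnumerator ae,
                                               int[] total, ManualResetEvent done)
    {
        try
        {
            byte[] buffer = new byte[4];
            while (true)
            {
                IAsyncResult ar = stream.BeginRead(buffer, 0, buffer.Length, ae.Callback, null);
                yield return 1; // one async operation is outstanding
                int read = stream.EndRead(ar);
                if (read == 0) break; // end of stream
                total[0] += read;
            }
        }
        finally
        {
            done.Set(); // signal the waiting thread that the iteration is over
        }
    }

    public static int Run()
    {
        int[] total = new int[1]; // iterators cannot take ref/out parameters
        using (ManualResetEvent done = new ManualResetEvent(false))
        using (MemoryStream stream = new MemoryStream(new byte[10]))
        {
            AsyncEnumerator ae = new AsyncEnumerator();
            ae.Start(CountBytes(stream, ae, total, done));
            done.WaitOne();
            return total[0];
        }
    }
}
```

AsyncEnumeratorDemo.Run() returns 10 for the 10-byte stream. The user function reads like a plain sequential loop, even though each read may complete on a different thread (or synchronously inside BeginRead, in which case the counter briefly goes negative).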

That's all for today. For the next blog entry, I plan to do performance measurements and compare code that uses this utility with code that uses threads and synchronous calls. But I haven't done this yet, so you are welcome to post your results!