Processing tasks as they complete

Recently I’ve had several folks ask me about how to process the results of tasks as those tasks complete.

A developer will have multiple tasks representing asynchronous operations they’ve initiated, and they want to process the results of these tasks, e.g.

List<Task<T>> tasks = …;
foreach(var t in tasks) {
    try { Process(await t); }
    catch(OperationCanceledException) {}
    catch(Exception exc) { Handle(exc); }
}

This approach is fine for many situations.  However, it enforces an additional constraint on the processing that wasn’t implied by the initial problem statement: the code processes the tasks in the order they were supplied rather than in the order they complete.  As a result, a task that has already completed may sit unprocessed because an earlier task in the list has not yet completed.

There are many ways to process the tasks in completion order instead.  One approach simply uses Task’s ContinueWith method, e.g.

List<Task<T>> tasks = …;
foreach(var t in tasks)
    t.ContinueWith(completed => {
        switch(completed.Status) {
            case TaskStatus.RanToCompletion: Process(completed.Result); break;
            case TaskStatus.Faulted: Handle(completed.Exception.InnerException); break;
        }
    }, TaskScheduler.Default);

This solution actually relaxes an additional constraint: the original solution forced all of the processing to run serially (and on the original SynchronizationContext if there was one), whereas this one allows the processing to run in parallel and on the ThreadPool.  If you wanted to go with this approach but also wanted to force the continuations to run serially, you could do so by supplying a serializing scheduler to ContinueWith (i.e. a scheduler that forces the continuations to run exclusively with regard to each other).  For example, you could use a ConcurrentExclusiveSchedulerPair, e.g.

List<Task<T>> tasks = …;
var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
foreach(var t in tasks)
    t.ContinueWith(completed => {
        switch(completed.Status) {
            case TaskStatus.RanToCompletion: Process(completed.Result); break;
            case TaskStatus.Faulted: Handle(completed.Exception.InnerException); break;
        }
    }, scheduler);
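
To see the serializing effect for yourself, a small self-contained sketch like the following can help.  It is not part of the original code: the ExclusiveSchedulerDemo class, the MaxConcurrentContinuations method, and the 5 ms sleep are hypothetical names and values chosen for illustration.  It attaches continuations to several tasks through the exclusive scheduler and records how many of them are ever running at the same time.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ExclusiveSchedulerDemo
{
    // Hypothetical helper: returns the largest number of continuations
    // that were observed running at the same time.
    public static int MaxConcurrentContinuations(int taskCount)
    {
        var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        var gate = new object();
        int running = 0, maxRunning = 0;

        Task[] continuations = Enumerable.Range(0, taskCount)
            .Select(i => Task.Run(() => i))
            .Select(t => t.ContinueWith(completed =>
            {
                // Count this continuation as running and track the peak.
                lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
                Thread.Sleep(5); // stand-in for real processing work
                lock (gate) { running--; }
            }, scheduler))
            .ToArray();

        // Block until every continuation has finished, then report the peak.
        Task.WaitAll(continuations);
        return maxRunning;
    }
}
```

With the exclusive scheduler the method should return 1.  Passing TaskScheduler.Default instead would typically let the value climb above 1 on a multi-core machine.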

or if, for example, you were on the UI thread of your application, you could supply a scheduler that represents that UI thread, e.g.

var scheduler = TaskScheduler.FromCurrentSynchronizationContext();
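
One caveat when adapting this: TaskScheduler.FromCurrentSynchronizationContext throws an InvalidOperationException if there is no current SynchronizationContext, as is typically the case on a ThreadPool thread or in a console app.  A minimal sketch of a fallback follows; the SerialSchedulers class and the ForCurrentThread method are hypothetical names, not part of the framework.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class SerialSchedulers
{
    // Hypothetical helper: prefer the caller's SynchronizationContext (e.g. a
    // UI thread); otherwise fall back to an exclusive scheduler so that
    // continuations still run one at a time, just on the ThreadPool.
    public static TaskScheduler ForCurrentThread()
    {
        return SynchronizationContext.Current != null
            ? TaskScheduler.FromCurrentSynchronizationContext()
            : new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
    }
}
```

Note that the first branch gives serial execution only if the captured context is itself single-threaded, as UI contexts are.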

Even so, this ContinueWith-based approach forces you into a callback-based model to handle your processing.  If you want the process-serially-as-the-tasks-complete behavior but with async/await rather than ContinueWith, that’s also possible.

There are a few ways to achieve this.  One relatively simple way is by using Task.WhenAny.  WhenAny accepts a set of tasks and will asynchronously provide the first one that completes.  As such, you can repeatedly invoke WhenAny, each time removing the previously completed task such that you’re asynchronously waiting for the next to complete, e.g.

List<Task<T>> tasks = …;
while(tasks.Count > 0) {
    var t = await Task.WhenAny(tasks);
    tasks.Remove(t);

    try { Process(await t); }
    catch(OperationCanceledException) {}
    catch(Exception exc) { Handle(exc); }
}

Functionally, this is fine, and as long as the number of tasks is small, the performance should be fine, too.  However, if the number of tasks is large, this could result in non-negligible performance overhead.  What we’ve effectively created here is an O(N²) algorithm: on each of the N iterations, we search the list for the completed task in order to remove it, which is an O(N) operation, and WhenAny registers a continuation with each remaining task, which is also an O(N) operation.  For example, if we had 10,000 tasks, over the span of this whole operation we’d end up registering and unregistering upwards of 50 million continuations as part of the WhenAny calls.  Now, that’s not quite as bad as it sounds, as WhenAny is smart about how it manages its resources, for example not registering continuations with tasks that are already completed, stopping as soon as it finds a completed task, reusing the same continuation object across all of the tasks, etc.  Still, there’s work here we can avoid if profiling shows it to be a problem.
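
The 50 million figure comes from a worst-case count, sketched below under the assumption that no task is already complete when each WhenAny call starts, so every call registers with every remaining task.  The class and method names are hypothetical, chosen for illustration.

```csharp
public static class WhenAnyCost
{
    // Worst-case registrations for the WhenAny loop: the first call sees
    // n tasks, the next n - 1, and so on down to 1, i.e. n * (n + 1) / 2.
    public static long WorstCaseRegistrations(int n)
    {
        long total = 0;
        for (int remaining = n; remaining > 0; remaining--)
            total += remaining;
        return total;
    }
}
```

For n = 10,000 this gives 50,005,000, in line with the estimate above.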

An alternate approach is to create a new “combinator” method specifically geared towards this purpose.  When working with a collection of Task<T> instances, WhenAny returns a Task<Task<T>>; this is a task that will complete when the first supplied task has completed, and that Task<Task<T>>’s Result will be that first completed task.  In our case, we don’t just want the first task; we want all of the tasks, in the order in which they complete.  We can represent this with a Task<Task<T>>[].  Think of this as an array of buckets where we’ll place the input tasks as they complete, one task per bucket.  So, if you want to get the first task to complete, you can await the first bucket of this array, and if you want to get the sixth task to complete, you can await the sixth bucket of this array.

// Requires: System, System.Collections.Generic, System.Linq, System.Threading, System.Threading.Tasks
public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
    // Materialize the input once so lazily produced tasks aren't created twice.
    var inputTasks = tasks.ToList();

    // One bucket per input task; results[i] completes with the (i+1)th task to finish.
    var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
    var results = new Task<Task<T>>[buckets.Length];
    for (int i = 0; i < buckets.Length; i++)
    {
        buckets[i] = new TaskCompletionSource<Task<T>>();
        results[i] = buckets[i].Task;
    }

    // Each completing task atomically claims the next free bucket.
    int nextTaskIndex = -1;
    Action<Task<T>> continuation = completed =>
    {
        var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
        bucket.TrySetResult(completed);
    };

    foreach (var inputTask in inputTasks)
        inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    return results;
}

So what’s happening here?  First we force our enumerable of tasks into a List<Task<T>>; this ensures that any tasks produced lazily by enumerating the enumerable are reified exactly once.  Next, we create TaskCompletionSource<Task<T>> instances to represent the buckets, one bucket per input task.  Then we hook up a continuation to each input task: this continuation gets the next available bucket and stores the newly completed task into it.  With this combinator, we can now rewrite our original code as follows:

List<Task<T>> tasks = …;
foreach(var bucket in Interleaved(tasks)) {
    var t = await bucket;
    try { Process(await t); }
    catch(OperationCanceledException) {}
    catch(Exception exc) { Handle(exc); }
}

To close out this discussion, let’s look at an example of what this actually does in practice.  Consider the following:

var tasks = new[] { 
    Task.Delay(3000).ContinueWith(_ => 3),
    Task.Delay(1000).ContinueWith(_ => 1), 
    Task.Delay(2000).ContinueWith(_ => 2),
    Task.Delay(5000).ContinueWith(_ => 5),
    Task.Delay(4000).ContinueWith(_ => 4),
};
foreach (var bucket in Interleaved(tasks)) {
    var t = await bucket;
    int result = await t;

    Console.WriteLine("{0}: {1}", DateTime.Now, result);
}

Here we have an array of Task<int>, each of which will complete after N seconds and return the integer N (e.g. the first task in the array will complete after 3 seconds and return the number 3).  We then loop through these tasks using our handy Interleaved method, printing out results as we get them.  When I run this code, I see the following output:

8/2/2012 7:37:48 AM: 1
8/2/2012 7:37:49 AM: 2
8/2/2012 7:37:50 AM: 3
8/2/2012 7:37:51 AM: 4
8/2/2012 7:37:52 AM: 5

and this is exactly the behavior we wanted.  Note the times that each element was output. All of the tasks were started at the same time, so their timers are all running concurrently.  As each task completes, our loop is able to process it, and as a result we get one line of output each second. 

In contrast, consider the same example but not using Interleaved:

var tasks = new[] { 
    Task.Delay(3000).ContinueWith(_ => 3),
    Task.Delay(1000).ContinueWith(_ => 1), 
    Task.Delay(2000).ContinueWith(_ => 2),
    Task.Delay(5000).ContinueWith(_ => 5),
    Task.Delay(4000).ContinueWith(_ => 4),
};
foreach (var t in tasks) {
    int result = await t;
    Console.WriteLine("{0}: {1}", DateTime.Now, result);
}

When I run this variation, I see:

8/2/2012 7:42:08 AM: 3
8/2/2012 7:42:08 AM: 1
8/2/2012 7:42:08 AM: 2
8/2/2012 7:42:10 AM: 5
8/2/2012 7:42:10 AM: 4

Now look at the times.  Because we’re now processing the tasks in order, we couldn’t print out the results for the 1s task or the 2s task until the 3s task had completed (since it was before them in the array).  Similarly, we couldn’t print out the result for the 4s task until the 5s task completed.