Waiting for Tasks

Parallel Extensions makes it easy to wait for Tasks to complete.  Task exposes a Wait method, which can be used trivially:

Task t = Task.Create(…);

t.Wait();

Task also exposes several static methods for waiting on an array of tasks, either for all of them to complete or for any of them to complete:

Task t1 = Task.Create(…);
Task t2 = Task.Create(…);
Task t3 = Task.Create(…);

Task.WaitAll(t1, t2, t3); // or Task.WaitAny(t1, t2, t3)
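
For anyone trying this on the released .NET Framework 4 API rather than the preview bits used in this post, here is a minimal sketch of the same idea.  It assumes Task.Factory.StartNew in place of Task.Create, and the WaitDemo class and its sleep durations are made up purely for illustration:

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; not part of any library.
public static class WaitDemo
{
    // Starts three tasks, waits for any one of them, then for all of them.
    // Returns the index reported by WaitAny and whether all tasks finished.
    public static (int firstIndex, bool allDone) Run()
    {
        Task t1 = Task.Factory.StartNew(() => Thread.Sleep(100));
        Task t2 = Task.Factory.StartNew(() => Thread.Sleep(10));
        Task t3 = Task.Factory.StartNew(() => Thread.Sleep(100));

        // WaitAny returns the array index of a task that has completed.
        int first = Task.WaitAny(t1, t2, t3);

        // WaitAll blocks until every task in the list has completed.
        Task.WaitAll(t1, t2, t3);

        return (first, t1.IsCompleted && t2.IsCompleted && t3.IsCompleted);
    }
}
```

Which index WaitAny reports depends on scheduling, so the sketch returns it rather than assuming a particular value.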

Moreover, Tasks have implicit parent/child relationships, and a parent Task will wait for all of its child tasks to complete before it completes:

Task p = Task.Create(delegate
{
    Task c1 = Task.Create(…);
    Task c2 = Task.Create(…);
    Task c3 = Task.Create(…);
});

p.Wait(); // will not wake up until c1, c2, and c3 have finished
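
One caveat if you are on the released .NET Framework 4 API instead of the preview: there, child attachment is opt-in, so the relationship has to be requested with TaskCreationOptions.AttachedToParent.  A rough sketch under that assumption (the ParentChildDemo class is hypothetical):

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; not part of any library.
public static class ParentChildDemo
{
    // Returns how many children had finished by the time the parent's Wait returned.
    public static int Run()
    {
        int finished = 0;

        Task parent = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 3; i++)
            {
                // AttachedToParent ties the child's completion to the parent's.
                Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(50);
                    Interlocked.Increment(ref finished);
                }, TaskCreationOptions.AttachedToParent);
            }
        });

        // The parent does not complete until its attached children have.
        parent.Wait();
        return finished;
    }
}
```

Without the AttachedToParent option, the inner tasks in this sketch would be detached and the parent's Wait could return before they finish.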

Sometimes, however, you want to wait for a list of tasks to complete, but you don’t necessarily want to have to hold onto references to all of the relevant Task objects.  Imagine a scenario where I’m creating tasks for each of an unknown number of data elements to be processed:

IEnumerable<Data> data = …;
List<Task> tasks = new List<Task>();
foreach(var item in data) tasks.Add(Task.Create(delegate { Process(item); }));
Task.WaitAll(tasks.ToArray());

The code is straightforward, but it also requires that I hold onto references to all of the Tasks I’ve created so that I can wait for all of them.  It’s interesting to note, however, that a task that has already completed doesn’t really need to be waited on.  As an example, if I create 10 tasks and 9 of them complete, waiting on the remaining 1 task is sufficient to know that all 10 completed.  This means that I only really need to wait on a count of tasks, where each new task increases the count, and where each task completing decreases the count: when the count reaches 0, all tasks have completed. 

Sound familiar? This is the domain of a CountdownEvent, which we can use to codify this approach:

IEnumerable<Data> data = …;
using(CountdownEvent tasksRemaining = new CountdownEvent(1))
{
    foreach(var item in data)
    {
        tasksRemaining.Increment();
        Task.Create(delegate
        {
            try { Process(item); }
            finally { tasksRemaining.Decrement(); } // decrement even if Process throws
        });
    }
    tasksRemaining.Decrement(); // to counteract the initial 1
    tasksRemaining.Wait();
}

The CountdownEvent is initialized to a count of 1 to ensure that the event doesn’t become signaled before I’m done creating all of the tasks; I remove that 1 count when all tasks have been created.  Before creating each Task, I up the count by 1, and when each task finishes, I decrease the count by 1.  Then I just wait on the CountdownEvent, which will be signaled when all of the tasks have completed.
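
For reference, here is a minimal sketch of the same pattern against the released .NET Framework 4 API, where CountdownEvent exposes AddCount and Signal rather than Increment and Decrement.  The CountdownDemo class is hypothetical, and summing integers stands in for the real Process call:

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; summing the items stands in for Process(item).
public static class CountdownDemo
{
    public static int ProcessAll(IEnumerable<int> data)
    {
        int total = 0;
        // Start at 1 so the event cannot become set while tasks are still being created.
        using (var tasksRemaining = new CountdownEvent(1))
        {
            foreach (int item in data)
            {
                tasksRemaining.AddCount();           // one more task outstanding
                Task.Factory.StartNew(() =>
                {
                    try { Interlocked.Add(ref total, item); }
                    finally { tasksRemaining.Signal(); } // one fewer, even on failure
                });
            }
            tasksRemaining.Signal();                 // counteract the initial 1
            tasksRemaining.Wait();                   // set once the count reaches 0
        }
        return total;
    }
}
```

The sketch relies on C# 5 or later, where each foreach iteration gets its own copy of the loop variable; with an older compiler, item would need to be copied into a local before being captured by the lambda.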

Of course, this requires that I incorporate such logic directly into my code, but there’s really no need for that.  I can extract out this logic into a separate class:

public class TaskWaiter
{
    private CountdownEvent _ce = new CountdownEvent(1);
    private bool _doneAdding = false;

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t");
        _ce.Increment();
        t.ContinueWith(ct => _ce.Decrement());
    }

    public void Wait()
    {
        if (!_doneAdding) { _doneAdding = true; _ce.Decrement(); } 
        _ce.Wait();
    }
}

Rewriting the previous example is now almost identical, just substituting TaskWaiter for the List<Task>:

IEnumerable<Data> data = …;
TaskWaiter tasks = new TaskWaiter();
foreach(var item in data) tasks.Add(Task.Create(delegate { Process(item); }));
tasks.Wait();

Unlike the previous implementation that uses a List<Task>, this implementation doesn’t need to store references to any of the tasks.  For just a few tasks, the extra class is probably not warranted.  But if lots of tasks are being created and/or any of the tasks will be long-running, such that holding onto tasks could cause them to be promoted to higher GC generations, this could have a noticeable impact on the application’s performance.

One neat thing to notice about the implementation of TaskWaiter: it uses ContinueWith.  I previously wrote about useful abstractions enabled with ContinueWith, and this is another such abstraction.  ContinueWith is used in order to get a bit of extra code to execute when a Task completes, such that the CountdownEvent is properly decremented.  This allows me to move that Decrement call out of the Task’s body itself (as I had with the original CountdownEvent-based implementation shown earlier), which means this technique can be used with any arbitrary Task you’re handed, rather than one you have to create yourself.
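
To make that point concrete, here is a small sketch showing that a continuation registered with ContinueWith runs whether the antecedent task succeeded or threw, which is what lets the bookkeeping live outside the task body.  It assumes the released .NET Framework 4 API (Task.Factory.StartNew rather than Task.Create), and the ContinuationDemo class is hypothetical:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; not part of any library.
public static class ContinuationDemo
{
    // Returns how many continuations ran and how many saw a faulted antecedent.
    public static (int ran, int faulted) Run()
    {
        int ran = 0, faulted = 0;
        Action fail = () => { throw new InvalidOperationException("boom"); };

        Task ok = Task.Factory.StartNew(() => { });
        Task bad = Task.Factory.StartNew(fail);

        // The same continuation is attached to tasks we did not write the bodies for.
        Action<Task> onDone = t =>
        {
            // Reading Exception also marks the exception as observed.
            if (t.Exception != null) Interlocked.Increment(ref faulted);
            Interlocked.Increment(ref ran);
        };

        // Wait on the continuations themselves; they complete normally in both cases.
        Task.WaitAll(ok.ContinueWith(onDone), bad.ContinueWith(onDone));
        return (ran, faulted);
    }
}
```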

There are some downsides to the TaskWaiter approach, however.  The biggest one that comes to mind with the current implementation is that we lose the ability to propagate any exceptions that may have occurred in tasks tracked by TaskWaiter.  We can fix that by storing references to tasks that complete with unhandled exceptions, and then doing a true wait only on those tasks:

public class TaskWaiter
{
    private CountdownEvent _ce = new CountdownEvent(1);
    private bool _doneAdding = false;
    private ConcurrentQueue<Task> _faulted =
        new ConcurrentQueue<Task>();

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t");
        _ce.Increment();
        t.ContinueWith(ct =>
        { 
            if (ct.Exception != null) _faulted.Enqueue(ct);
            _ce.Decrement();
        });
    }

    public void Wait()
    {
        if (!_doneAdding) { _doneAdding = true; _ce.Decrement(); } 
        _ce.Wait(); 
        if (!_faulted.IsEmpty) Task.WaitAll(_faulted.ToArray());
    }
}

Notice the few changes.  A ConcurrentQueue<Task> was added to track any tasks that completed due to an exception; this collection needs to be thread-safe, as multiple tasks could fail on different threads and attempt to access the queue concurrently.  The delegate passed to ContinueWith has been augmented to check whether the completing Task completed due to an exception, and if it did, the Task is added to the queue.  Then in Wait, after waiting on the CountdownEvent, I use Task.WaitAll to wait on all of the tasks that completed due to exceptions.  By this point, they’ve of course already completed, so I’m using Task.WaitAll simply to aggregate and throw all of the exceptions.

There’s a subtlety about how I made these changes that’s important to keep in mind.  Specifically, it’s crucial that the faulting task be added to the ConcurrentQueue<T> before the CountdownEvent is decremented; if those lines were reversed in the ContinueWith delegate, the CountdownEvent could become set before the Task was tracked in the queue, in which case the exceptions from that Task may be missed in the call to TaskWaiter.Wait, since the Task may not yet have made it into the queue by the time Task.WaitAll is called. 
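
Putting the pieces together, here is a sketch of how the exception-propagating TaskWaiter might look on the released .NET Framework 4 API.  The assumptions: AddCount and Signal replace Increment and Decrement, Task.Factory.StartNew replaces Task.Create, Wait is only ever called from one thread, and the TaskWaiterDemo class exists only to exercise the type:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class TaskWaiter
{
    private readonly CountdownEvent _ce = new CountdownEvent(1);
    private readonly ConcurrentQueue<Task> _faulted = new ConcurrentQueue<Task>();
    private bool _doneAdding;

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t");
        _ce.AddCount();
        t.ContinueWith(ct =>
        {
            // Enqueue first, signal second: the ordering described above.
            if (ct.Exception != null) _faulted.Enqueue(ct);
            _ce.Signal();
        });
    }

    // Assumed to be called from a single thread after all tasks were added.
    public void Wait()
    {
        if (!_doneAdding) { _doneAdding = true; _ce.Signal(); }
        _ce.Wait();
        // The faulted tasks are already complete; WaitAll just rethrows.
        if (!_faulted.IsEmpty) Task.WaitAll(_faulted.ToArray());
    }
}

// Hypothetical demo class; not part of any library.
public static class TaskWaiterDemo
{
    // Returns the number of exceptions surfaced by TaskWaiter.Wait.
    public static int Run()
    {
        Action fail = () => { throw new InvalidOperationException("boom"); };
        var waiter = new TaskWaiter();
        waiter.Add(Task.Factory.StartNew(() => Thread.Sleep(20)));
        waiter.Add(Task.Factory.StartNew(fail));
        try
        {
            waiter.Wait();
            return 0;
        }
        catch (AggregateException ae)
        {
            return ae.Flatten().InnerExceptions.Count;
        }
    }
}
```

In this sketch one of the two tasks throws, so Wait should surface a single exception wrapped in an AggregateException while the successful task is simply counted down and forgotten.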

There are, of course, other optimizations that could be made to an implementation like this one, but we’ll save those for another day.