Useful Abstractions Enabled with ContinueWith

Stephen Toub - MSFT

In the June 2008 CTP of Parallel Extensions to the .NET Framework, we introduced the ContinueWith method on both Task and Future<T>.  ContinueWith is, in effect, a callback, very much like events in .NET.  With events, a causal action results in the event being raised, which by default triggers all of the delegates registered with the event to be invoked.  ContinueWith supports this, but rather than just registering the callback, it also provides back a Task or Future<T> that represents the callback; that returned Task or Future<T>’s lifecycle is tied specifically to the callback and won’t be marked as IsCompleted until that callback completes.  While at its core this could be considered a relatively simple idea and is useful for general dataflow in applications, it also enables some important patterns and can serve as the building block for a whole host of larger abstractions.

For example, the June 2008 CTP doesn’t provide support for mulit-item continuations (is this something you think we should provide in a future release?): you can create a continuation Task or Future<T> for when one Task or Future<T> finishes, but out-of-the-box you can’t create one for when all of multiple Task or Future<T>’s complete.  Nevertheless, you can use the basic ContinueWith support to implement this additional support.  Here’s an example of a ContinueWith method you could implement, where you provide it with multiple Task instances, and the resulting Task will only be scheduled and executed when all of the provided tasks complete (I’ve ommitted parameter validation and the like to simplify the code):

static Task ContinueWhenAll(
    Action<Task[]> continuation, params Task[] tasks)
{
    var starter = Future<bool>.Create();
    var task = starter.ContinueWith(o => continuation(tasks));

    CountdownEvent ce = new CountdownEvent(tasks.Length);
    Action<Task> whenComplete = delegate {
        if (ce.Decrement()) starter.Value = true;
    };
    foreach (var t in tasks)
        t.ContinueWith(whenComplete, TaskContinuationKind.OnAny,
            TaskCreationOptions.None, true);

    return task;
}

This ContinueWhenAll method tasks two parameters: an Action<Task[]> to execute when all of the provided tasks have completed (supplied with those tasks), and the params array of the Tasks to monitor for completion.  It then uses a workaround to create a Task that can be started at an arbitrary time in the future (in the June 2008 CTP, we don’t directly support this capability, hence the workaround, but we’re planning to in a future release as that capability is useful for a bunch of scenarios, including this one).  A delegate is created that will count down from the number of provided tasks every time it’s invoked, and when the count reaches 0, it will start the task.  ContinueWith is then used to register that whenComplete action as a continuation for all of the parameter tasks.  And voila, we now have a ContinueWhenAll method, which can be used like:

Task t1 = …, t2 = …;
Task t3 = ContinueWhenAll(
    delegate { Console.WriteLine(“t1 and t2 finished”); }, t1, t2);

Similarly, a method could be implemented for creating a task continuation for when any of a set of tasks completes (rather than when they all complete):

static Task ContinueWhenAny(
    Action<Task> continuation, params Task[] tasks)
{
    WriteOnce<Task> theCompletedTask = new WriteOnce<Task>();
    var starter = Future<bool>.Create();
    var task = starter.ContinueWith(o =>
        continuation(theCompletedTask.Value));

    Action<Task> whenComplete = t => {
        if (theCompletedTask.TrySetValue(t)) starter.Value = true;
    };
    foreach (var t in tasks)
         t.ContinueWith(whenComplete, TaskContinuationKind.OnAny,
            TaskCreationOptions.None, true);

    return task;
}

This implementation is very similar to ContinueWhenAll.  However, the action that’s registered as the continuation for each task uses a WriteOnce<T> to store the first Task to complete and to schedule the continuation task when it does.  ContinueWhenAny can be used like:

Task t1 = …, t2 = …;
Task t3 = ContinueWhenAny(
    delegate { Console.WriteLine(“t1 or t2 finished”); }, t1, t2);

There are other neat patterns enabled with ContinueWith.  One pattern that’s starting to become popular with frameworks like the CCR or AsyncEnumerator is to take advantage of C#’s iterator support to make writing code that uses asynchronous operations a bit more like sequential code.  While not the primary focus of the Task Parallel Library, ContinueWith can be used to implement such patterns in at least a limited capacity.  Consider the following method:

static void RunAsync(IEnumerable<Task> iterator)
{
    var enumerator = iterator.GetEnumerator();
    Action a = null;
    a = delegate
    {
        if (enumerator.MoveNext())
        { 
            enumerator.Current.ContinueWith(delegate { a(); });
        }
        else enumerator.Dispose();
    };
    a();
}

This method accepts an IEnumerable<Task>.  It retrieves an enumerator from the enumerable and uses that enumerator in a delegate.  The delegate moves the enumerator to its next element, and if there is a next element, retrieves it and uses ContinueWith to schedule the delegate (recursively, in a sense) for execution when that current Task completes.  When the enumerator reaches the end, it’s disposed.  With that delegate created, RunAsync simply executes the delegate to get the execution started.

Now imagine I wanted a method that would asynchronously read a file, processing it as it’s read in.  I can implement such a method as follows:

static IEnumerable<Task> ReadFile(string path)
{
    using (FileStream fs = new FileStream(
         path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
    {
        var enc = new UTF8Encoding();
        byte[] data = new byte[0x1000];
        while (true)
        {
            var pendingRead = CreateFutureFromApm<int>(ac =>
                fs.BeginRead(data, 0, data.Length, ac, null), fs.EndRead);
            yield return pendingRead;
            int bytesRead = pendingRead.Value;
            if (bytesRead <= 0) break;

            Console.WriteLine(enc.GetString(data, 0, bytesRead));
        }
    }
}

ReadFile opens a file stream and continually reads from it asynchronously.  The code wraps usage of the FileStream’s BeginRead/EndRead methods with a Future<int> (using the example method from https://blogs.msdn.com/8272833.aspx); that Future<int> is yielded from the ReadFile method (which returns an enumerator).  We’ll execute ReadFile by passing it to RunAsync.  When the Future<int> completes, its continuation will be executed, which will cause the RunAsync iterator to MoveNext, thus ending up back in the ReadFile method just after the yield location.  The ContinueWhenAll method can be used to allow more complicated asynchronous methods to be written, by wrapping multiple asynchronous invocations and yielding a continuation for when they all complete.

There are a myriad of such interesting patterns that ContinueWith enables.  We’d love to hear about any useful ones you come up with.

0 comments

Discussion is closed.

Feedback usabilla icon