ParallelExtensionsExtras Tour – #2 – Task.ToObservable

Stephen Toub - MSFT

(The full set of ParallelExtensionsExtras Tour posts is available here.)  

In our previous ParallelExtensionsExtras tour post, we discussed a custom implementation of the LINQ operators, in particular for working with Task<TResult> instances in an asynchronous manner. There is already an impressive implementation of the LINQ operators and more for working with asynchronous operations: Reactive Extensions (Rx).  Rx provides a full LINQ implementation based on the IObservable<T> interface that was added to the .NET Framework 4. As it turns out, while Task<TResult> does not currently implement IObservable<T>, it’s actually quite a good fit to do so.  An observable represents any number of values terminated by either an end of the stream or by an exception.  A Task<TResult> fits this description: it completes with either a single value or with an exception.  While it’s possible that a future version of the .NET Framework will see Task<TResult> implement IObservable<T>, we can get the relevant behavior now by implementing it as an extension method for Task<TResult>; in fact, Rx includes just such an extension method, as does ParallelExtensionsExtras. 

Here we’ll take a look at the implementation available in ParallelExtensionsExtras, as part of the TaskExtrasExtensions.cs file.  I’ve omitted parameter validation for the sake of conciseness.   With such an extension method, we can rewrite the LINQ query from our last post to instead be based on observables, e.g.

IObservable<string> result = from x in Task.Factory.StartNew(

                                 () => ProduceInt()).ToObservable()

                             from y in Task.Factory.StartNew(

                                 () => Process(x)).ToObservable()

                             select y.ToString();

 

First, we need our ToObservable extension method itself, which should accept a Task<TResult> and return an IObservable<TResult>.  To do that, we need a type that implements IObservable<TResult> and wraps the Task<TResult>, so that we can work with the Task<TResult> when Subscribe is called on the IObservable<TResult>.

public static IObservable<TResult> ToObservable<TResult>(

    this Task<TResult> task)

{

    return new TaskObservable<TResult> { _task = task };

}

 

Now we just need to implement TaskObservable<TResult>, which implements IObservable<TResult> and its one method: Subscribe.  When Subscribe is called to register an IObserver<T>, we’ll take advantage of ContinueWith to get a callback when the task completes.  If the task completed successfully, we’ll pass along its Result to the observer’s OnNext, and then notify the observer through OnCompleted that the observable will not be sending out any more values.  If the task failed, we’ll pass along its Exception the observer through its OnError method.  And if the task was canceled, we’ll pass along a TaskCanceledException (which derives from OperationCanceledException).

That’s the bulk of the implementation, shown here:

private sealed class TaskObservable<TResult> : IObservable<TResult>

{

    internal Task<TResult> _task;

 

    public IDisposable Subscribe(IObserver<TResult> observer)

    {

        var cts = new CancellationTokenSource();

 

        _task.ContinueWith(t =>

        {

            switch (t.Status)

            {

                case TaskStatus.RanToCompletion:

                    observer.OnNext(_task.Result);

                    observer.OnCompleted();

                    break;

 

                case TaskStatus.Faulted:

                    observer.OnError(_task.Exception);

                    break;

 

                case TaskStatus.Canceled:

                    observer.OnError(new TaskCanceledException(t));

                    break;

            }

        }, cts.Token);

 

        return new CancelOnDispose { Source = cts };

    }

}

 

There’s one piece I have not described, and that’s dealing with unsubscription.  The Subscribe call returns an IDisposable that can be used to cancel the observer’s subscription, effectively unsubscribing the observer.  We handle that with task cancellation.  The IDisposable we return from Subscribe is just a simple wrapper around a CancellationTokenSource, such that the source will have cancellation requested when the object is disposed (e.g. Dispose() { _source.Cancel(); }).  This source’s CancellationToken is provided to the ContinueWith method, such that if the subscription is canceled, so too is the continuation.

0 comments

Discussion is closed.

Feedback usabilla icon