Sequences and StreamInsight: Initiating a Computation

By Colin Meek, Alex Raizman, and Rafael Fernandez Moctezuma

When bridging between temporal streams (CepStream), pull-sequences (IEnumerable), and push sequences (IObservable) using the StreamInsight sequence integration feature, handling of computation lifetime is automatic: consuming results from a query generally kicks off upstream queries*, and disposing or deleting releases all upstream resources as well. While most of the time the policies governing lifetimes “just work”, the resulting behaviors sometimes confuse StreamInsight developers. Consider the following example:

var source = Enumerable.Range(0, 1024);
int count = 0;
using (source.ToObservable().Subscribe(_ => count++))
{
    Console.WriteLine(count); // 1024

Example 1: Pull-to-push adapter

The call to Subscribe() in this example returns after all elements of the source sequence have been consumed. 

Consider the following example that bridges from a push-sequence to a temporal stream:

int count = 0;
var source = Observable.Range(0, 1024).Do(_ => count++);
var startTime = new DateTimeOffset(new DateTime(2010, 1, 1), TimeSpan.Zero);
var stream = source.ToPointStream(Application, i => PointEvent.CreateInsert(startTime.AddMinutes(i), i));
var query = stream.ToQuery("MyQuery", null, EventShape.Point, StreamEventOrder.FullyOrdered);
query.Start();
Console.WriteLine(count); // 1024

Example 2: Observable range generator and push-to-stream adapter

Similar to the previous example, you’ll notice that the call to query.Start() does not return until all of the input has been consumed.

The following calls are used to initiate a computation in StreamInsight: Query.Start() , IEnumerable<>.GetEnumerator() , and IObservable<>.Subscribe() . As the examples 1 and 2 suggest, we’re particularly interested in when these calls return. First, we need to understand that the Reactive Framework embraces the “least concurrency” principle in its design, which can be roughly summarized as “don’t introduce concurrency if you don’t need to”. The MSDN article on “Using Schedulers” gives additional background; you may also consult the Rx Workshop: Schedulers on Channel 9 for an in-depth look.

What does this principle imply about the above examples?

Let’s first examine the ToObservable() method. By default, it uses the current thread scheduler so the work of consuming the source iterator happens on the “subscribe” thread! By the time the subscribe call returns, all 1024 source elements have already been processed.

What is the effect of the temporal query sandwiched between the observable source and published stream output in example 2?

When query.Start()  or another ‘initiating’ method like GetEnumerator()  or Subscribe()  is called, the method doesn’t return until all inputs to the query have been initiated. This behavior allows the caller to know when a temporal query is primed to accept input. It also means that -- depending on the nature of the sources feeding your query -- you may need to wait a long time for the call to return!

How can you prevent generators like Observable.Range()  and Observable. ToObservable() from blocking initiation requests?

We can manually introduce the desired concurrency by passing in a different scheduler! For instance, we could modify example 1 so that events are enqueued on thread pool threads rather than the current thread:

using (source.ToObservable(Scheduler.ThreadPool).Subscribe(_ => count++))
{
    Console.WriteLine(count); // ~0
}

Example 3: Introducing concurrency using a non-default scheduler

Let’s look at an example where the call to Subscribe()  can never return because the source “ Never() ” calls OnCompleted or OnError:

using (Observable.Never<int>().ToEnumerable().ToObservable().Subscribe())
{
    Console.WriteLine("Inconceivable!");
}

Example 4: Push-to-pull-to-push

Fortunately, the above example isn’t terribly interesting! Just keep in mind that xs.ToEnumerable().ToObservable() is not equivalent to xs:

using (Observable.Never<int>().Subscribe())
{
    Console.WriteLine("Conceivable!");

Example 5: Absent the push-pull-push bridges, subscribe no longer blocks

Apart from modifying the behavior of Subscribe() ,the intervening buffer used by ToEnumerable() also makes calls to On* asynchronous.

In the pull-sequence space, propagation of initiation requests looks a little bit different. Because LINQ to Objects mostly follows yield return semantics, calls to GetEnumerator() do not in fact propagate immediately! It is not until the first call to MoveNext() on the iterator object that the compiler-generated iterator state machine advances beyond the GetEnumerator() calls for inputs. This behavior contrasts with those of Reactive and StreamInsight operators whose inputs are (generally) spun up immediately. There’s even an exception to the exception: for LINQ to SQL and LINQ to Entities queries, calls to GetEnumerator() immediately result in ExecuteReader() calls. Our hope is that the exact phase of the initiation is not as important for pull-based sources as it is for potentially hot push-based sources. In the latter case, it may be important to ensure that a downstream consumer is active before the upstream producer begins generating output**.

 Let me know if you’d like to see a follow-up post on end-of-life concerns for computations in StreamInsight (auto-detach, synchronization, etc.)!

* Initiation requests do not propagate automatically across published stream and subject boundaries.
** Hot and cold, see Wes’ talk on Channel 9: Rx API in depth: Hot and Cold observables.