StreamInsight 2.1, meet LINQ

Someone recently called LINQ “magic” in my hearing. I leapt to LINQ’s defense immediately. Turns out some people don’t realize “magic” is can be a pejorative term. I thought LINQ needed demystification. Here’s your best demystification resource: https://blogs.msdn.com/b/mattwar/archive/2008/11/18/linq-links.aspx. I won’t repeat much of what Matt Warren says in his excellent series, but will talk about some core ideas and how they affect the 2.1 release of StreamInsight.

Let’s tell the story of a LINQ query.

Compile time

It begins with some code:

IQueryable<Product> products = ...;
var query = from p in products
            where p.Name == "Widget"
            select p.ProductID;
foreach (int id in query)
{
    ...

When the code is compiled, the C# compiler (among other things) de-sugars the query expression (see C# spec section 7.16):

...
var query = products.Where(p => p.Name == "Widget").Select(p => p.ProductID);
...

Overload resolution subsequently binds the Queryable.Where<Product> and Queryable.Select<Product, int> extension methods (see C# spec sections 7.5 and 7.6.5). After overload resolution, the compiler knows something interesting about the anonymous functions (lambda syntax) in the de-sugared code: they must be converted to expression trees, i.e.,“an object structure that represents the structure of the anonymous function itself” (see C# spec section 6.5). The conversion is equivalent to the following rewrite:

...
var prm1 = Expression.Parameter(typeof(Product), "p");
var prm2 = Expression.Parameter(typeof(Product), "p");
var query = Queryable.Select<Product, int>(
    Queryable.Where<Product>(
        products,
        Expression.Lambda<Func<Product, bool>>(Expression.Property(prm1, "Name"), prm1)),
        Expression.Lambda<Func<Product, int>>(Expression.Property(prm2, "ProductID"), prm2));
...

If the “products” expression had type IEnumerable<Product> , the compiler would have chosen the Enumerable.Where and Enumerable.Select extension methodsinstead, in which case the anonymous functions would have been converted to delegates.

At this point, we’ve reduced the LINQ query to familiar code that will compile in C# 2.0. (Note that I’m using C# snippets to illustrate transformations that occur in the compiler, not to suggest a viable compiler design!)

Runtime

When the above program is executed, the Queryable.Where method is invoked. It takes two arguments. The first is an IQueryable<> instance that exposes an Expression property and a Provider property. The second is an expression tree. The Queryable.Where method implementation looks something like this:

public static IQueryable<T> Where<T>(this IQueryable<T> source, Expression<Func<T, bool>> predicate)
{
    return source.Provider.CreateQuery<T>(
    Expression.Call(this method, source.Expression, Expression.Quote(predicate)));
}

Notice that the method is really just composing a new expression tree that calls itself with arguments derived from the source and predicate arguments. Also notice that the query object returned from the method is associated with the same provider as the source query.

By invoking operator methods, we’re constructing an expression tree that describes a query. Interestingly, the compiler and operator methods are colluding to construct a query expression tree. The important takeaway is that expression trees are built in one of two ways: (1) by the compiler when it sees an anonymous function that needs to be converted to an expression tree, and; (2) by a query operator method that constructs a new queryable object with an expression tree rooted in a call to the operator method (self-referential).

Next we hit the foreach block. At this point, the power of LINQ queries becomes apparent. The provider is able to determine how the query expression tree is evaluated! The code that began our story was intentionally vague about the definition of the “products” collection. Maybe it is a queryable in-memory collection of products:

var products = new[]
    { new Product { Name = "Widget", ProductID = 1 } }.AsQueryable();

The in-memory LINQ provider works by rewriting Queryable method calls to Enumerable method calls in the query expression tree. It then compiles the expression tree and evaluates it. It should be mentioned that the provider does not blindly rewrite all Queryable calls. It only rewrites a call when its arguments have been rewritten in a way that introduces a type mismatch, e.g. the first argument to Queryable.Where<Product> being rewritten as an expression of type IEnumerable<Product> from IQueryable<Product> . The type mismatch is triggered initially by a “leaf” expression like the one associated with the AsQueryable query: when the provider recognizes one of its own leaf expressions, it replaces the expression with the original IEnumerable<> constant expression. I like to think of this rewrite process as “type irritation” because the rewritten leaf expression is like a foreign body that triggers an immune response (further rewrites) in the tree. The technique ensures that only those portions of the expression tree constructed by a particular provider are rewritten by that provider: no type irritation, no rewrite.

Let’s consider the behavior of an alternative LINQ provider. If “products” is a collection created by a LINQ to SQL provider:

var products = new NorthwindDataContext().Products;

the provider rewrites the expression tree as a SQL query that is then evaluated by your favorite RDBMS. The predicate may ultimately be evaluated using an index! In this example, the expression associated with the Products property is the “leaf” expression.

StreamInsight 2.1

For the in-memory LINQ to Objects provider, a leaf is an in-memory collection. For LINQ to SQL, a leaf is a table or view.

When defining a “process” in StreamInsight 2.1, what is a leaf? To StreamInsight a leaf is logic: an adapter, a sequence, or even a query targeting an entirely different LINQ provider!

How do we represent the logic? Remember that a standing query may outlive the client that provisioned it. A reference to a sequence object in the client application is therefore not terribly useful. But if we instead represent the code constructing the sequence as an expression, we can host the sequence in the server:

using (var server = Server.Connect(...))
{
    var app = server.Applications["my application"];
    var source = app.DefineObservable(() => Observable.Range(0, 10, Scheduler.NewThread));
    var query = from i in source where i % 2 == 0 select i;
}

Example 1: defining a source and composing a query

Let’s look in more detail at what’s happening in example 1. We first connect to the remote server and retrieve an existing app. Next, we define a simple Reactive sequence using the Observable.Range method. Notice that the call to the Range method is in the body of an anonymous function. This is important because it means the source sequence definition is in the form of an expression, rather than simply an opaque reference to an IObservable<int> object. The variation in Example 2 fails. Although it looks similar, the sequence is now a reference to an in-memory observable collection:

var local = Observable.Range(0, 10, Scheduler.NewThread);
var source = app.DefineObservable(() => local); // can’t serialize ‘local’!

Example 2: error referencing unserializable local object

The Define* methods support definitions of operator tree leaves that target the StreamInsight server. These methods all have the same basic structure. The definition argument is a lambda expression taking between 0 and 16 arguments and returning a source or sink. The method returns a proxy for the source or sink that can then be used for the usual style of LINQ query composition. The “define” methods exploit the compile-time C# feature that converts anonymous functions into translatable expression trees!

Query composition exploits the runtime pattern that allows expression trees to be constructed by operators taking queryable and expression (Expression<> ) arguments. The practical upshot: once you’ve Defined a source, you can compose LINQ queries in the familiar way using query expressions and operator combinators. Notably, queries can be composed using pull-sequences (LINQ to Objects IQueryable<> inputs), push sequences (Reactive IQbservable<> inputs), and temporal sequences (StreamInsight IQStreamable<> inputs). You can even construct processes that span these three domains using “bridge” method overloads (ToEnumerable, ToObservable and To*Streamable).

Finally, the targeted rewrite via type irritation pattern is used to ensure that StreamInsight computations can leverage other LINQ providers as well. Consider the following example (this example depends on Interactive Extensions):

var source = app.DefineEnumerable((int id) =>
    EnumerableEx.Using(() =>
        new NorthwindDataContext(), context =>
            from p in context.Products
            where p.ProductID == id
            select p.ProductName));

Within the definition, StreamInsight has no reason to suspect that it ‘owns’ the Queryable.Where and Queryable.Select calls, and it can therefore defer to LINQ to SQL! Let’s use this source in the context of a StreamInsight process:

var sink = app.DefineObserver(() => Observer.Create<string>(Console.WriteLine));

var query = from name in source(1).ToObservable()
            where name == "Widget"
            select name;

using (query.Bind(sink).Run("process"))
{
    ...
}

When we run the binding, the source portion which filters on product ID and projects the product name is evaluated by SQL Server. Outside of the definition, responsibility for evaluation shifts to the StreamInsight server where we create a bridge to the Reactive Framework (using ToObservable) and evaluate an additional predicate.

It’s incredibly easy to define computations that span multiple domains using these new features in StreamInsight 2.1!

Regards,
The StreamInsight Team