LINQ “Macros” in StreamInsight 1.2: Left Outer Join

In an earlier post, I discussed how to implement custom query operators that combine existing built-in operators. In StreamInsight 1.2, we have made some changes that simplify implementing such operators. In the running example from that post, I showed how to construct by hand an expression tree representing a Left Anti Semi Join (LASJ), which took a fair amount of code in StreamInsight 1.1:

public static CepStream<TLeft> LeftAntiSemiJoin<TLeft, TRight>(
    this CepStream<TLeft> left, 
    CepStream<TRight> right,
    Expression<Func<TLeft, TRight, bool>> predicate)
{
    …
 
    // l => right.Where(r => predicate(l, r)).IsEmpty()
    var leftPredicate = Expression.Lambda<Func<TLeft, bool>>(
        Expression.Call(isEmptyMethod,
            Expression.Call(whereMethod,
                Expression.Constant(right),
                Expression.Lambda<Func<TRight, bool>>(predicate.Body, rightPrm))),
        leftPrm);
 
    return left.Where(leftPredicate);
}

Code fragment 1: LASJ in StreamInsight 1.1
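Code fragment 1 elides how isEmptyMethod, whereMethod, leftPrm and rightPrm are obtained. The following is a minimal sketch that builds the same tree shape over LINQ to Objects so it can run without StreamInsight. Enumerable.Where plays the role of the stream Where, negated Enumerable.Any stands in for IsEmpty, and ManualLasj and BuildLeftPredicate are hypothetical names, not StreamInsight code. One detail carries over directly: predicate.Body refers to the predicate's own ParameterExpression objects, so the new lambdas must reuse predicate.Parameters rather than create fresh parameters.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;

public static class ManualLasj
{
    // Builds  l => !right.Where(r => predicate(l, r)).Any()  by hand.
    // LINQ to Objects has no IsEmpty, so !Any() stands in for it (an assumption of this sketch).
    public static Expression<Func<TLeft, bool>> BuildLeftPredicate<TLeft, TRight>(
        IEnumerable<TRight> right,
        Expression<Func<TLeft, TRight, bool>> predicate)
    {
        // predicate.Body refers to these exact parameter objects, so reuse them.
        ParameterExpression leftPrm = predicate.Parameters[0];
        ParameterExpression rightPrm = predicate.Parameters[1];

        // Enumerable.Where<TRight>(IEnumerable<TRight>, Func<TRight, bool>)
        MethodInfo whereMethod = typeof(Enumerable).GetMethods()
            .First(m => m.Name == nameof(Enumerable.Where)
                     && m.GetParameters()[1].ParameterType.GetGenericArguments().Length == 2)
            .MakeGenericMethod(typeof(TRight));

        // Enumerable.Any<TRight>(IEnumerable<TRight>)
        MethodInfo anyMethod = typeof(Enumerable).GetMethods()
            .First(m => m.Name == nameof(Enumerable.Any) && m.GetParameters().Length == 1)
            .MakeGenericMethod(typeof(TRight));

        Expression body = Expression.Not(
            Expression.Call(anyMethod,
                Expression.Call(whereMethod,
                    Expression.Constant(right, typeof(IEnumerable<TRight>)),
                    Expression.Lambda<Func<TRight, bool>>(predicate.Body, rightPrm))));

        return Expression.Lambda<Func<TLeft, bool>>(body, leftPrm);
    }
}
```

Compiling the result and passing it to Enumerable.Where on the left sequence keeps exactly the left elements that have no match on the right.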

In StreamInsight 1.2, we have added support for invocation expressions. Take a look at Joe and Ben Albahari’s article on Dynamically Composing Expression Predicates for an idea of how to use this feature. Code fragment 1 can be rewritten as:

public static CepStream<TLeft> LeftAntiSemiJoin<TLeft, TRight>(
    this CepStream<TLeft> left,
    CepStream<TRight> right,
    Expression<Func<TLeft, TRight, bool>> predicate)
{
    return from l in left
            where (from r in right
                   where predicate.Compile()(l, r)
                   select r).IsEmpty()
            select l;
}

Code fragment 2: LASJ in StreamInsight 1.2

The invocation expression in this query is predicate.Compile()(l, r). Notice that we call Compile on the predicate expression. What does this call represent in a StreamInsight query? StreamInsight recognizes the Compile method, but instead of compiling the predicate to a CLR delegate (an opaque delegate that cannot be evaluated remotely), it “compiles” a StreamInsight query in which the predicate expression has been inlined.

Now let’s take our LASJ operator and use it to construct a Left Outer Join (LOJ) operator. For most LINQ providers, there are several ways of describing LOJ, usually involving the DefaultIfEmpty operator. StreamInsight does not have a DefaultIfEmpty operator, and asking whether an infinite stream is empty is potentially dangerous*, so we’ll try something different. LOJ can also be written as the union of an inner join and LASJ, as in the following example:

public static CepStream<TResult> LeftOuterJoin<TLeft, TRight, TResult>(
    this CepStream<TLeft> left,
    CepStream<TRight> right,
    Expression<Func<TLeft, TRight, bool>> predicate,
    Expression<Func<TLeft, TResult>> outerSelector,
    Expression<Func<TLeft, TRight, TResult>> innerSelector)
{
    // left elements with no matching right elements:
    var outer = from l in left.LeftAntiSemiJoin(right, predicate)
                select outerSelector.Compile().Invoke(l);
 
    // left elements with matching right elements:
    var inner = from l in left
                from r in right
                where predicate.Compile().Invoke(l, r)
                select innerSelector.Compile().Invoke(l, r);
 
    return outer.Union(inner);        
}

Code fragment 3: LOJ in StreamInsight 1.2
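The same decomposition can be tried outside StreamInsight. The following is a minimal LINQ to Objects sketch that uses ordinary delegates instead of expressions and has no temporal semantics, so it only illustrates the set logic of "LOJ = LASJ plus inner join". JoinSketches and its method names are hypothetical. For comparison, it also includes a DefaultIfEmpty formulation of the kind most LINQ providers support.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class JoinSketches
{
    // Left anti semi join: left elements with no matching right element.
    public static IEnumerable<TLeft> LeftAntiSemiJoin<TLeft, TRight>(
        this IEnumerable<TLeft> left,
        IEnumerable<TRight> right,
        Func<TLeft, TRight, bool> predicate) =>
        from l in left
        where !right.Any(r => predicate(l, r))
        select l;

    // Left outer join as LASJ plus inner join, mirroring the shape of Code fragment 3.
    public static IEnumerable<TResult> LeftOuterJoin<TLeft, TRight, TResult>(
        this IEnumerable<TLeft> left,
        IEnumerable<TRight> right,
        Func<TLeft, TRight, bool> predicate,
        Func<TLeft, TResult> outerSelector,
        Func<TLeft, TRight, TResult> innerSelector)
    {
        // left elements with no matching right elements
        var outer = from l in left.LeftAntiSemiJoin(right, predicate)
                    select outerSelector(l);

        // left elements with matching right elements
        var inner = from l in left
                    from r in right
                    where predicate(l, r)
                    select innerSelector(l, r);

        // Concat, not Enumerable.Union: Union would drop duplicate result rows.
        return outer.Concat(inner);
    }

    // Conventional DefaultIfEmpty formulation, for comparison only.
    public static IEnumerable<TResult> LeftOuterJoinWithDefaultIfEmpty<TLeft, TRight, TResult>(
        this IEnumerable<TLeft> left,
        IEnumerable<TRight> right,
        Func<TLeft, TRight, bool> predicate,
        Func<TLeft, TResult> outerSelector,
        Func<TLeft, TRight, TResult> innerSelector) =>
        from l in left
        from m in right.Where(x => predicate(l, x))
                       .Select(x => (Found: true, Value: x))
                       .DefaultIfEmpty() // yields (false, default) when nothing matches
        select m.Found ? innerSelector(l, m.Value) : outerSelector(l);
}
```

The two versions return the same rows, but in a different order: the union-based one emits all unmatched rows first. Concat is used because Enumerable.Union removes duplicates, and an inner join can legitimately produce duplicate rows.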

Notice that you can use either the delegate “Invoke” method as in the LeftOuterJoin example:

predicate.Compile().Invoke(l, r)

or the more traditional invocation syntax, as in the LeftAntiSemiJoin example:

predicate.Compile()(l, r)
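The post does not show how StreamInsight performs the inlining, but the idea can be sketched with System.Linq.Expressions. Find a Compile() call that is being invoked, evaluate its receiver to obtain the lambda, and substitute the call's arguments for the lambda's parameters. The following is a minimal sketch under those assumptions, not StreamInsight's implementation. CompileInliner is a hypothetical name. The sketch accepts both an InvocationExpression and an explicit call to the delegate's Invoke method, so it does not depend on which tree shape the compiler emits for each syntax. It also assumes that the Compile() receiver (for example, a captured predicate variable) does not refer to the enclosing lambda's parameters.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq.Expressions;

// Hypothetical sketch: rewrites predicate.Compile()(args) and
// predicate.Compile().Invoke(args) into predicate's body with args substituted.
public sealed class CompileInliner : ExpressionVisitor
{
    protected override Expression VisitInvocation(InvocationExpression node) =>
        TryInline(node.Expression, node.Arguments) ?? base.VisitInvocation(node);

    protected override Expression VisitMethodCall(MethodCallExpression node) =>
        (node.Method.Name == "Invoke" && node.Object != null
            ? TryInline(node.Object, node.Arguments)
            : null) ?? base.VisitMethodCall(node);

    private Expression? TryInline(Expression target, IReadOnlyList<Expression> args)
    {
        // Only handle targets of the form <lambda-valued expr>.Compile().
        if (target is not MethodCallExpression call
            || call.Method.Name != "Compile"
            || call.Arguments.Count != 0
            || call.Object == null
            || !typeof(LambdaExpression).IsAssignableFrom(call.Object.Type))
            return null;

        // Evaluate the receiver (e.g. a captured 'predicate') to get the lambda itself.
        var lambda = (LambdaExpression)Expression.Lambda(call.Object).Compile().DynamicInvoke()!;

        var map = new Dictionary<ParameterExpression, Expression>();
        for (int i = 0; i < lambda.Parameters.Count; i++)
            map[lambda.Parameters[i]] = Visit(args[i]);

        // Substitute, then visit again so nested Compile() invocations are inlined too.
        return Visit(new ParameterReplacer(map).Visit(lambda.Body));
    }

    private sealed class ParameterReplacer : ExpressionVisitor
    {
        private readonly Dictionary<ParameterExpression, Expression> map;

        public ParameterReplacer(Dictionary<ParameterExpression, Expression> map) => this.map = map;

        protected override Expression VisitParameter(ParameterExpression node) =>
            map.TryGetValue(node, out var replacement) ? replacement : node;
    }
}
```

For example, visiting (a, b) => predicate.Compile()(a, b + 1) with predicate = (l, r) => l == r yields a lambda whose body is the equality a == (b + 1), with no delegate left in the tree.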

The two selector expressions must produce results of the same type: innerSelector handles the case where the left-hand event has a matching event on the right-hand side, and outerSelector handles the case where it does not. For example:

using (var server = Server.Create("Default"))
{
    var app = server.CreateApplication("loj");

    // create input data (createInsert is a helper not shown in this post)
    var left = new[]
    {
        createInsert(0, 2, 1),
        createInsert(1, 1.5, 2),
        createInsert(3, 4, 3),
    }.ToStream(app, AdvanceTimeSettings.IncreasingStartTime);
 
    var right = new[]
    {
        createInsert(0.5, 1.5, 1),
        createInsert(1.25, 1.5, 2),
        createInsert(3.5, 4.5, 3),
    }.ToStream(app, AdvanceTimeSettings.IncreasingStartTime);
 
    // left outer join
    var query = left.LeftOuterJoin(
        right,
        (l, r) => l == r,
        l => new { Left = l, Right = (int?)null },
        (l, r) => new { Left = l, Right = (int?)r });
}

Code fragment 4: Using LOJ operator within a StreamInsight query.
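Because both selectors return the same TResult, the two anonymous types in Code fragment 4 must line up exactly. C# treats anonymous-object initializers with the same property names, types and order (within one assembly) as a single type, and that is what lets the compiler infer TResult. The (int?) casts matter: Right = null on its own does not compile in an anonymous type, and Right = r would produce an int property, which would make the two types differ. The following minimal sketch checks this with a hypothetical ApplyBoth method that has the same selector shape as LeftOuterJoin but no StreamInsight dependency.

```csharp
using System;

public static class SelectorTypes
{
    // Hypothetical stand-in for LeftOuterJoin's selector parameters:
    // one TResult shared by the outer and inner selectors.
    public static (TResult Outer, TResult Inner) ApplyBoth<TResult>(
        Func<int, TResult> outerSelector,
        Func<int, int, TResult> innerSelector) =>
        (outerSelector(1), innerSelector(1, 1));

    public static bool Demo()
    {
        // TResult is inferred only because both lambdas return the same anonymous type.
        var (outer, inner) = ApplyBoth(
            l => new { Left = l, Right = (int?)null },
            (l, r) => new { Left = l, Right = (int?)r });

        // Same property names, types and order => same anonymous type.
        return outer.GetType() == inner.GetType()
            && outer.Right == null
            && inner.Right == 1;
    }
}
```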

I mentioned a limitation on macro use within lambda bodies in my previous post. This limitation no longer exists: when StreamInsight encounters an unrecognized method (e.g. LeftOuterJoin in the above example) returning a stream definition, it will invoke the method to see what it returns, on the grounds that doing so may be better than failing.
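The behavior described above can be approximated over plain expression trees. When a visitor reaches a call it does not recognize, and that call returns a sequence and references no lambda parameters, the visitor invokes it and splices the result back into the tree. The following is a minimal sketch under those assumptions. UnrecognizedCallEvaluator and Sources.Evens are hypothetical names, System.Linq.Enumerable is the only "recognized" type, and the parameter check is deliberately conservative. A real provider such as StreamInsight would presumably splice in the returned stream's definition; this sketch embeds the returned sequence as a constant instead.

```csharp
#nullable enable
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical sketch: evaluates unrecognized, parameter-free calls that return a
// sequence, and replaces them with their result.
public sealed class UnrecognizedCallEvaluator : ExpressionVisitor
{
    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        // Visit children first so nested calls are handled bottom-up.
        var visited = (MethodCallExpression)base.VisitMethodCall(node);

        bool recognized = visited.Method.DeclaringType == typeof(Enumerable);
        bool returnsSequence = typeof(IEnumerable).IsAssignableFrom(visited.Type)
                               && visited.Type != typeof(string);

        if (recognized || !returnsSequence || ParameterFinder.Uses(visited))
            return visited;

        // "Invoke the method to see what it returns."
        object? value = Expression.Lambda(visited).Compile().DynamicInvoke();
        return Expression.Constant(value, visited.Type);
    }

    // Conservative check: any parameter reference makes the call non-evaluable.
    private sealed class ParameterFinder : ExpressionVisitor
    {
        private bool found;

        public static bool Uses(Expression e)
        {
            var finder = new ParameterFinder();
            finder.Visit(e);
            return finder.found;
        }

        protected override Expression VisitParameter(ParameterExpression node)
        {
            found = true;
            return node;
        }
    }
}

// Hypothetical "macro" that the evaluator does not recognize.
public static class Sources
{
    public static IEnumerable<int> Evens(int count) =>
        Enumerable.Range(0, count).Select(i => 2 * i);
}
```

With x => Sources.Evens(5).Contains(x), the Evens(5) call is replaced by a constant holding the sequence 0, 2, 4, 6, 8, while the recognized Enumerable.Contains call and its use of x are left in place.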

The examples in this post are included in our updated LINQPad samples. Give them a try!

* Yes, there’s something odd about StreamInsight’s IsEmpty operator. It is permitted only in the context of the LASJ pattern, where it asks not whether a stream is empty in its entirety but when the stream is empty. Appropriate for a query language with a temporal bias!
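To make "when the stream is empty" concrete, the following is a minimal sketch over intervals. It assumes half-open lifetimes [Start, End), and it assumes that a right event matches a left event while their lifetimes overlap and the predicate holds. Interval<T> and TemporalLasj are hypothetical types, not StreamInsight's. For each left event, the sketch returns the parts of its lifetime during which no matching right event is active.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical interval event: Payload is valid over the half-open range [Start, End).
public readonly record struct Interval<T>(double Start, double End, T Payload);

public static class TemporalLasj
{
    // For each left event, yield the fragments of its lifetime during which
    // no matching right event is active ("when is the match set empty?").
    public static IEnumerable<Interval<TLeft>> LeftAntiSemiJoin<TLeft, TRight>(
        IEnumerable<Interval<TLeft>> left,
        IEnumerable<Interval<TRight>> right,
        Func<TLeft, TRight, bool> predicate)
    {
        var rightList = right.ToList();
        foreach (var l in left)
        {
            // Matching right lifetimes that overlap l, clipped to l and sorted by start.
            var covers = rightList
                .Where(r => predicate(l.Payload, r.Payload) && r.Start < l.End && l.Start < r.End)
                .Select(r => (Start: Math.Max(r.Start, l.Start), End: Math.Min(r.End, l.End)))
                .OrderBy(c => c.Start);

            // Sweep left to right, emitting the gaps between covered ranges.
            double cursor = l.Start;
            foreach (var c in covers)
            {
                if (c.Start > cursor)
                    yield return new Interval<TLeft>(cursor, c.Start, l.Payload);
                cursor = Math.Max(cursor, c.End);
            }
            if (cursor < l.End)
                yield return new Interval<TLeft>(cursor, l.End, l.Payload);
        }
    }
}
```

This sketch can be fed the numbers from Code fragment 4, under the further assumption that createInsert takes (start, end, payload) and that the join predicate is equality. The left event with payload 1 and lifetime [0, 2) then comes back as two fragments, [0, 0.5) and [1.5, 2), because its match on the right is active only during [0.5, 1.5).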