Scripting StreamInsight queries

Over the past couple of weeks, a handful of people have asked for help dynamically creating StreamInsight queries. I usually scrawl some boxes and arrows on the whiteboard and say “you could try something like this…” My hand-waving hasn’t been very helpful. I’ll write some code instead…

A StreamInsight query includes a “template” definition that essentially describes the operator tree. Normally, a developer describes a template using a LINQ query (StreamInsight understands a LINQ dialect for streaming/temporal queries). The LINQ query is then translated into an XML document (schema details are available here) which is compiled and executed by the StreamInsight engine. In one sense, you can see that StreamInsight already supports dynamic queries since an application can construct a query operator tree using the XML specification language directly. A visual design surface or DSL could potentially be used to generate the XML. My preferred general-purpose stream query language is LINQ however, so I’ll instead consider what it takes to dynamically generate a LINQ query using StreamInsight.

Anders Hejlsberg’s talk on The Future of C# gives a possible answer. Using the proposed C# eval facility, I could write something like:

// initialize an evaluator

var ev = new CSharpEvaluator

{

    References = { typeof(CepStream).Assembly, typeof(Expression).Assembly, typeof(EventType1).Assembly, },

    Usings = { "MyNamespace", "Microsoft.ComplexEventProcessing.Linq", },

};

// add input streams

ev.Evaluate(@"var input1 = CepStream<EventType1>.Create(""input1"")");

// script query

var queryDefinition = (CepStream<EventType1>)ev.Evaluate("from i in input1 where i.X > 10 select i");

 

Since I don’t want to wait for C# 5.0, I’ll use a more specialized “scripter” tool designed specifically for StreamInsight query templates:

// initialize a scripter

var scripter = new QueryTemplateScripter();

// add input streams

scripter.AddStream("input1", typeof(EventType1));

// script query

var template = scripter.CreateQueryTemplate(app, "MyTemplate", null,

    "from i in input1 where i.X > 10 select i");

 

The scripter allows you to:

  • reference assemblies (the required StreamInsight assemblies are referenced by default);
  • register input streams for your query using the AddStream method;
  • add “using” namespaces (the expected namespaces are included by default), and finally;
  • create a query template based on a LINQ query specification.

The scripter essentially inlines the LINQ query into a program containing any event definitions (in the above example, we are merely referencing an existing type so no definition is required) and a query “context” that passes the LINQ query definition to an existing Application.CreateQueryTemplate method. An example of a generated program is shown below:

//------------------------------------------------------------------------------

// <auto-generated>

// This code was generated by a tool.

// Runtime Version:2.0.50727.4952

//

// Changes to this file may cause incorrect behavior and will be lost if

// the code is regenerated.

// </auto-generated>

//------------------------------------------------------------------------------

namespace @__QueryTemplateScripter

{

    using System;

    using Microsoft.ComplexEventProcessing;

    using Microsoft.ComplexEventProcessing.Linq;

    public class @__QueryTemplateScripterContext

    {

        public static Microsoft.ComplexEventProcessing.QueryTemplate @__CreateQueryTemplate(Microsoft.ComplexEventProcessing.Application application, string name, string description)

        {

            CepStream<ConsoleApplication1.EventType1> input1 = CepStream<ConsoleApplication1.EventType1>.Create("input1");

            var definition =

#line 1 "__LinqQueryDefinition"

 from i in input1 where i.X > 10 select i

#line default

#line hidden

;

            return application.CreateQueryTemplate(name, description, definition);

        }

    }

}

 

Such programs are generated, compiled and the query template creation method is dynamically invoked to register a new query template in StreamInsight. This approach has some limitations:

  • Compiler error reporting may or may not be useful. I use #line pragmas in the generated code to pinpoint the location of any local errors in the user’s query, but the LINQ query definition may not be a well-formed expression. Consider what would happen if you were to call scripter.CreateQueryTemplate(app, “q”, null, “input1; return null }”);
  • Related to the above point, there is an injection risk. The usual common sense security rules apply: do not evaluate code from untrusted sources.
  • Every call to CreateQueryTemplate results in the creation of a new assembly that remains loaded for the lifetime of the app domain.

Full source code for the scripter is attached. The event definition code is intentionally extensible to support specification of stream event types using something other than System.Type. Possible alternatives: CepEventType or some other representation of the event record layout.

ScriptingStreamInsight.zip