Using Subjects to Deploy Queries Dynamically

In the previous blog posting, we showed how to construct and deploy query fragments to a StreamInsight server, and how to re-use them later. In today’s posting we’ll integrate this pattern into a method of dynamically composing a new query with an existing one.

The construct that enables this scenario in StreamInsight V2.1 is a Subject. A Subject lets me create a junction element in an existing query that I can tap into while the query is running.

To set this up as an end-to-end example, let’s first define a stream simulator as our data source:

var generator = myApp.DefineObservable(
    (TimeSpan t) => Observable.Interval(t).Select(_ => new SourcePayload()));

This ‘generator’ is an IObservable that emits a new instance of SourcePayload once every period t (measured in system time). SourcePayload happens to have a property of type double as its payload data.
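The payload class itself is not shown in this post. As a minimal sketch, assume it looks roughly like the following: a double property named Value (the aggregation further down reads e.Value) and a Timestamp (used as e.Timestamp when point events are created). The use of DateTime, the random value, and stamping in the constructor are assumptions for illustration; the class in the attached solution may differ.

```csharp
using System;

// Hypothetical payload type, sketched for illustration only.
public class SourcePayload
{
    // Shared random source for simulated readings (an assumption of this sketch).
    private static readonly Random Rng = new Random();

    public SourcePayload()
    {
        // Stamp the reading with the current system time.
        Timestamp = DateTime.UtcNow;

        // Random is not thread-safe, so guard it in case timer callbacks overlap.
        lock (Rng)
        {
            Value = Rng.NextDouble(); // simulated measurement in [0, 1)
        }
    }

    // Application time of the reading.
    public DateTime Timestamp { get; set; }

    // The payload data that the queries aggregate.
    public double Value { get; set; }
}
```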

Let’s also define a sink for our example—an IObserver of double values that writes to the console:

var console = myApp.DefineObserver(
    (string label) => Observer.Create<double>(e => Console.WriteLine("{0}: {1}", label, e)))
    .Deploy("ConsoleSink");

The observer takes a string parameter that is used as a label on the console, so that we can distinguish the output of different sink instances. Note that we also deploy this observer, so that a different process can retrieve it from the server later.

Remember how we defined the aggregation as an IQStreamable function in the previous article? We will use that as well:

var avg = myApp
    .DefineStreamable((IQStreamable<SourcePayload> s, TimeSpan w) =>
        from win in s.TumblingWindow(w)
        select win.Avg(e => e.Value))
    .Deploy("AverageQuery");
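To make the windowing concrete, here is a plain LINQ-to-Objects sketch of what a tumbling-window average computes: every event falls into exactly one fixed-length, non-overlapping window, and one average comes out per non-empty window. This is not the StreamInsight engine, and it ignores CTIs and event lifetimes. The Reading type, the TumblingAverage helper, and the alignment of windows to tick 0 are assumptions made for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TumblingWindowSketch
{
    // Hypothetical timestamped reading, standing in for SourcePayload.
    public sealed class Reading
    {
        public DateTime Timestamp { get; set; }
        public double Value { get; set; }
    }

    // Groups readings into consecutive windows of length w and averages each one.
    // Returns (window start, average) pairs ordered by window start.
    public static IEnumerable<KeyValuePair<DateTime, double>> TumblingAverage(
        IEnumerable<Reading> readings, TimeSpan w)
    {
        return readings
            // Integer division assigns each timestamp to exactly one window.
            .GroupBy(r => r.Timestamp.Ticks / w.Ticks)
            .OrderBy(g => g.Key)
            .Select(g => new KeyValuePair<DateTime, double>(
                new DateTime(g.Key * w.Ticks, DateTimeKind.Utc),
                g.Average(r => r.Value)));
    }

    public static void Main()
    {
        var t0 = new DateTime(2012, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var readings = new[]
        {
            new Reading { Timestamp = t0.AddSeconds(0), Value = 1.0 },
            new Reading { Timestamp = t0.AddSeconds(2), Value = 3.0 },
            new Reading { Timestamp = t0.AddSeconds(5), Value = 10.0 },
        };

        foreach (var window in TumblingAverage(readings, TimeSpan.FromSeconds(5)))
            Console.WriteLine("{0:o}: {1}", window.Key, window.Value);
    }
}
```

With a 5-second window, the first two readings (1 and 3) share a window and average to 2, while the third reading starts the next window and averages to 10. Passing a different TimeSpan changes only the window length, which is the same idea as the w parameter of the deployed query.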

Then we define the Subject, which acts as an observable sequence as well as an observer. Thus, we can feed a single source into the Subject and have multiple consumers—that can come and go at runtime—on the other side:

var subject = myApp.CreateSubject("Subject", () => new Subject<SourcePayload>());

Subjects are always deployed automatically. Their name is used to retrieve them from a (potentially) different process (see below).
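To illustrate the dual role independently of StreamInsight, here is a minimal hand-written subject built only on the .NET IObservable<T> and IObserver<T> interfaces. It is a sketch for intuition, not the Subject<T> used above: MiniSubject, CollectingObserver, and Demo are hypothetical names, and the sketch leaves out guarantees the real type provides, such as handling subscriptions made after completion.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified subject: it is an observer (receives values)
// and an observable (forwards them to whoever is currently subscribed).
public sealed class MiniSubject<T> : IObserver<T>, IObservable<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // Consumers attach here at any time; disposing the result detaches them.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate) { _observers.Add(observer); }
        return new Unsubscriber(this, observer);
    }

    public void OnNext(T value) { foreach (var o in Snapshot()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in Snapshot()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in Snapshot()) o.OnCompleted(); }

    // Copy the list so observers can subscribe or unsubscribe during delivery.
    private IObserver<T>[] Snapshot()
    {
        lock (_gate) { return _observers.ToArray(); }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly MiniSubject<T> _owner;
        private readonly IObserver<T> _observer;

        public Unsubscriber(MiniSubject<T> owner, IObserver<T> observer)
        {
            _owner = owner;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_owner._gate) { _owner._observers.Remove(_observer); }
        }
    }
}

// Hypothetical observer that records what it receives.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Demo
{
    public static void Main()
    {
        var subject = new MiniSubject<double>();
        var slow = new CollectingObserver<double>();
        var fast = new CollectingObserver<double>();

        subject.Subscribe(slow);            // first consumer
        subject.OnNext(1.0);                // only 'slow' sees this
        var tap = subject.Subscribe(fast);  // second consumer joins at runtime
        subject.OnNext(2.0);                // both see this
        tap.Dispose();                      // second consumer leaves again
        subject.OnNext(3.0);                // only 'slow' sees this

        // prints "slow: 3, fast: 1"
        Console.WriteLine("slow: {0}, fast: {1}", slow.Items.Count, fast.Items.Count);
    }
}
```

The source side only ever calls OnNext on the subject and never needs to know how many consumers are attached, which is what makes the Subject usable as a junction point in a running query.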

Note that the Subject as we defined it here doesn’t know anything about temporal streams. It is merely a sequence of SourcePayloads, without any notion of StreamInsight point events or CTIs. So in order to compose a temporal query on top of the Subject, we need to 'promote' the sequence of SourcePayloads into an IQStreamable of point events, including CTIs:

var stream = subject.ToPointStreamable(
    e => PointEvent.CreateInsert<SourcePayload>(e.Timestamp, e),
    AdvanceTimeSettings.StrictlyIncreasingStartTime);

In a later posting we will show how to use Subjects that have more awareness of time and can be used as a junction between IQStreamables instead of IQbservables.

Having turned the Subject into a temporal stream, we can now define the aggregate on this stream. We will use the IQStreamable entity avg that we defined above:

var longAverages = avg(stream, TimeSpan.FromSeconds(5));

In order to run the query, we need to bind it to a sink, and bind the subject to the source:

var standardQuery = longAverages
    .Bind(console("5sec average"))
    .With(generator(TimeSpan.FromMilliseconds(300)).Bind(subject));

Lastly, we start the process:

standardQuery.Run("StandardProcess");

Now we have a simple query running end-to-end and producing results. The crucial part comes next: tapping into the Subject and adding another query that runs in parallel, using the same query definition (the “AverageQuery”) but with a different window length. We assume that we have connected to the same StreamInsight server from a different process or even a different client, and therefore have to retrieve the previously deployed entities by name:

// simulate the addition of a 'fast' query from a separate server connection,
// by retrieving the aggregation query fragment
// (instead of simply using the 'avg' object)
var averageQuery = myApp
    .GetStreamable<IQStreamable<SourcePayload>, TimeSpan, double>("AverageQuery");

// retrieve the input sequence as a subject
var inputSequence = myApp
    .GetSubject<SourcePayload, SourcePayload>("Subject");

// retrieve the registered sink
var sink = myApp.GetObserver<string, double>("ConsoleSink");

// turn the sequence into a temporal stream
var stream2 = inputSequence.ToPointStreamable(
    e => PointEvent.CreateInsert<SourcePayload>(e.Timestamp, e),
    AdvanceTimeSettings.StrictlyIncreasingStartTime);

// apply the query, now with a different window length
var shortAverages = averageQuery(stream2, TimeSpan.FromSeconds(1));

// bind new sink to query and run it
var fastQuery = shortAverages
    .Bind(sink("1sec average"))
    .Run("FastProcess");

The attached solution demonstrates the sample end-to-end.

Regards,
The StreamInsight Team

QueryComposition.zip