StreamInsight: Synchronizing slow-moving reference streams with fast-moving data streams (time import)

A common task in StreamInsight is to use a reference stream to integrate metadata or reference data from a relatively static source (such as a SQL Server table; a walkthrough of this technique will be described in an upcoming blog post).  One of the challenges in integrating the reference stream has to do with liveliness, that is, how quickly results are produced when one input advances much more slowly than the other:

image

In this diagram we see two streams (data and reference) joined together to create a joined stream, with a LINQ syntax such as:

CepStream<SensorReading> dataStream = ...;
CepStream<SensorMetadata> metadataStream = ...;

var joinedQuery = from e1 in dataStream
                  join e2 in metadataStream
                  on e1.SensorId equals e2.SensorId
                  select e1;

From the diagram above, we see that we’ll only get two output events, for (1) and (2).  This is a result of how the StreamInsight engine produces output: it is driven by application time, which advances only when CTIs (current time increments) arrive.  A CTI is how the engine knows it has received all of the data required to calculate results up to that point in time.  In this case the engine sees:

  • I have a time of T5 on the data stream
  • I have a time of T0 (ish) on the reference stream
  • Since I have joined these two streams together I can only produce output when I have received all of the events for a given time period.  Since the reference stream lags behind the data stream, I can only output events at the speed of the reference stream.
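The engine's reasoning in the list above can be modelled in a few lines. This is only a conceptual sketch in plain C#, not the StreamInsight API: the `CtiFrontier` class and `OutputFrontier` method are hypothetical names, and the only assumption is that a join can finalize output up to the earliest CTI seen across its inputs.

```csharp
using System;
using System.Linq;

// Conceptual model only: plain C#, not the StreamInsight API.
public static class CtiFrontier
{
    // A join can only finalize output up to the earliest CTI
    // it has seen across all of its inputs.
    public static DateTime OutputFrontier(params DateTime[] latestCtiPerInput)
    {
        return latestCtiPerInput.Min();
    }

    public static void Main()
    {
        var t0 = new DateTime(2009, 6, 25, 0, 0, 0, DateTimeKind.Utc);
        var dataCti = t0.AddSeconds(5); // data stream has advanced to T5
        var refCti = t0;                // reference stream is still at T0

        // The result equals refCti: output is held back to the slower stream.
        Console.WriteLine(OutputFrontier(dataCti, refCti).ToString("o"));
    }
}
```

However far the data stream runs ahead, the frontier stays pinned to the reference stream's last CTI.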

In general, this is absolutely the correct behavior.  However, we know something about the reference stream that the StreamInsight engine does not – that the reference stream is slowly changing, and we don’t want to wait for it to catch up.  That is to say, we want to produce output at the pace of the data stream.  How do we do this?

By importing the CTIs from the data stream into the reference stream:

image

This operation is performed by one of the overloads on the CepStream<>.Create() method, as per the syntax below (reading from this csv file as the sample data source, and this csv file as the sample reference source).  The entire project can be downloaded from here (look in the SimpleJoin.cs class).

////////////////////////////////////////////////////////////////////
// Create a time import settings definition that states that a stream
// will import its CTI settings from dataStream
var timeImportSettings = new AdvanceTimeSettings(null,
    new AdvanceTimeImportSettings("dataStream"),
    AdvanceTimePolicy.Adjust);

////////////////////////////////////////////////////////////////////
// Create a reference data stream from the refStream.csv file; use
// the CTIs from the dataStream as defined by the timeImportSettings
// object
CepStream<SensorMetadata> metadataStream = CepStream<SensorMetadata>
    .Create("refStream", typeof(TextFileReaderFactory),
        new TextFileReaderConfig()
        {
            CtiFrequency = 1,
            CultureName = CultureInfo.CurrentCulture.Name,
            Delimiter = ',',
            InputFileName = "refStream.csv"
        }, EventShape.Point, timeImportSettings);

Note that this syntax assumes that a stream named “dataStream” exists.  At this point, we should be able to join the two streams and see a steady flow of output.  However, suppose we simply want to look at the raw output of the metadata stream on its own:

var rawData = metadataStream.ToQuery(cepApp, "MetadataStream", "",
    typeof(TracerFactory), traceConfig, EventShape.Interval,
    StreamEventOrder.FullyOrdered);

This returns the following error:

 Error in query: Microsoft.ComplexEventProcessing.ManagementException: Advance time import stream 'dataStream' does not exist. --->         Microsoft.ComplexEventProcessing.Compiler.CompilerException: Advance time import stream 'dataStream' does not exist.

Huh?  What do you mean the time import stream does not exist?  I already defined it!  What’s actually happening here is that the import stream has not yet been physically joined with the metadata stream, so a query built from metadataStream alone contains no stream named “dataStream” to import CTIs from.  To resolve this issue, you need to join the two streams before binding an output adapter (i.e. creating a query).

////////////////////////////////////////////////////////////////////
// Create a join of the two streams, and bind the output to the
// console
var joinedQuery = from e1 in dataStream
                  join e2 in metadataStream
                  on e1.SensorId equals e2.SensorId
                  select e1;

var query = joinedQuery.ToQuery(cepApp, "JoinedOutput", "",
    typeof(TracerFactory), traceConfig, EventShape.Interval,
    StreamEventOrder.FullyOrdered);

Ok, now let’s look at the output:

REF,Interval from 06/25/2009 00:00:00 +00:00 to 06/25/2009 00:00:00 +00:00:,MySensor_1001,1001,14
REF: CTI at 06/25/2009 00:00:00 +00:00
REF: CTI at 06/25/2009 00:00:09 +00:00
REF: CTI at 12/31/9999 23:59:59 +00:00

Huh?  Where’s all of my output?  Look at the data sources in question: the reference data is a sequence of point events.  A point event is only valid at its own timestamp, so the join only matches the one reading that shares that timestamp.  If we want to truly use it as a reference stream, we need to convert the series of point events into edge events (i.e. this value is good until replaced by a new value).  To do this, we apply the AlterEventDuration and ClipEventDuration operators:

// Convert the point events from the reference stream into edge events
var edgeEvents = from e in metadataStream
                     .AlterEventDuration(e => TimeSpan.MaxValue)
                     .ClipEventDuration(metadataStream, (e1, e2) => (e1.SensorId == e2.SensorId))
                 select e;

What this bit of code does is:

  • Stretch each point event out to infinity (i.e. these reference values are good “forever”)
  • Clip each stretched event at the arrival of the next event with the same sensor ID.  For example, given a value of (1001, SensorId_1001), if another event later arrives with the values (1001, MySensor), the initial event is clipped off at that point and the value from then on is MySensor
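To make the stretch-and-clip behaviour concrete, here is a rough model of it over an in-memory list. It is plain C#, not the StreamInsight operators: `ReferenceClip`, `PointEvent`, `IntervalEvent` and `ToClippedIntervals` are hypothetical names, and the timestamps in `Main` are made up for illustration. It assumes events for a sensor have distinct timestamps.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual model only: plain C# collections, not StreamInsight operators.
public static class ReferenceClip
{
    public sealed class PointEvent
    {
        public DateTime Time;
        public int SensorId;
        public string Name;
    }

    public sealed class IntervalEvent
    {
        public DateTime Start;
        public DateTime End;
        public int SensorId;
        public string Name;
    }

    // Stretch each point event to "forever", then clip it at the start
    // time of the next event that has the same SensorId.
    public static List<IntervalEvent> ToClippedIntervals(IEnumerable<PointEvent> points)
    {
        var result = new List<IntervalEvent>();
        foreach (var group in points.GroupBy(p => p.SensorId))
        {
            var ordered = group.OrderBy(p => p.Time).ToList();
            for (int i = 0; i < ordered.Count; i++)
            {
                // No later event for this sensor: the value stays valid forever.
                DateTime end = (i + 1 < ordered.Count)
                    ? ordered[i + 1].Time
                    : DateTime.MaxValue;
                result.Add(new IntervalEvent
                {
                    Start = ordered[i].Time,
                    End = end,
                    SensorId = ordered[i].SensorId,
                    Name = ordered[i].Name
                });
            }
        }
        return result;
    }

    public static void Main()
    {
        var t0 = new DateTime(2009, 6, 25, 0, 0, 0, DateTimeKind.Utc);
        var points = new List<PointEvent>
        {
            new PointEvent { Time = t0, SensorId = 1001, Name = "SensorId_1001" },
            new PointEvent { Time = t0.AddSeconds(5), SensorId = 1001, Name = "MySensor" }
        };

        foreach (var iv in ToClippedIntervals(points))
        {
            Console.WriteLine(iv.Name + ": " + iv.Start.ToString("o") + " -> " + iv.End.ToString("o"));
        }
    }
}
```

In this model the first value covers the span up to the arrival of the second, and the second runs to `DateTime.MaxValue` because nothing replaces it.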

image

Then, putting the whole thing together:

// Convert the point events from the reference stream into edge events
var edgeEvents = from e in metadataStream
                     .AlterEventDuration(e => TimeSpan.MaxValue)
                     .ClipEventDuration(metadataStream, (e1, e2) => (e1.SensorId == e2.SensorId))
                 select e;

////////////////////////////////////////////////////////////////////
// Create a join of the two streams, and bind the output to the
// console
var joinedQuery = from e1 in dataStream
                  join e2 in edgeEvents
                  on e1.SensorId equals e2.SensorId
                  select new
                  {
                      SensorId = e1.SensorId,
                      Name = e2.Name,
                      Value = e1.Value
                  };

Results in:

 

REF,Interval,12:00:00.000,12:00:00.000,,MySensor_1001,1001,14
REF: CTI at 06/25/2009 00:00:00 +00:00
REF,Interval,12:00:01.000,12:00:01.000,,MySensor_1001,1001,4
REF,Interval,12:00:02.000,12:00:02.000,,MySensor_1001,1001,77
REF,Interval,12:00:03.000,12:00:03.000,,MySensor_1001,1001,44
REF,Interval,12:00:04.000,12:00:04.000,,MySensor_1001,1001,22
REF,Interval,12:00:05.000,12:00:05.000,,MySensor_1001,1001,51
REF,Interval,12:00:06.000,12:00:06.000,,MySensor_1001,1001,46
REF,Interval,12:00:07.000,12:00:07.000,,MySensor_1001,1001,71
REF,Interval,12:00:08.000,12:00:08.000,,MySensor_1001,1001,37
REF,Interval,12:00:09.000,12:00:09.000,,MySensor_1001,1001,45
REF: CTI at 12/31/9999 23:59:59 +00:00
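
For a feel of why the join now emits one row per reading, the sketch below models a temporal join over in-memory lists. Again this is plain LINQ to Objects rather than StreamInsight: `TemporalJoinSketch` and its nested types are hypothetical, the one-second spacing of the readings is an assumption, and a point event is modelled as an interval one tick long.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual model only: LINQ to Objects, not the StreamInsight engine.
public static class TemporalJoinSketch
{
    public sealed class Reading { public DateTime Time; public int SensorId; public int Value; }
    public sealed class Metadata { public DateTime Start; public DateTime End; public int SensorId; public string Name; }
    public sealed class Joined { public DateTime Time; public int SensorId; public string Name; public int Value; }

    // A reading matches a metadata event when the keys are equal and the
    // reading's timestamp falls inside the metadata's [Start, End) lifetime.
    public static List<Joined> Join(IEnumerable<Reading> readings, IEnumerable<Metadata> metadata)
    {
        var query = from r in readings
                    join m in metadata on r.SensorId equals m.SensorId
                    where m.Start <= r.Time && r.Time < m.End
                    select new Joined { Time = r.Time, SensorId = r.SensorId, Name = m.Name, Value = r.Value };
        return query.ToList();
    }

    public static void Main()
    {
        var t0 = new DateTime(2009, 6, 25, 0, 0, 0, DateTimeKind.Utc);
        var readings = new List<Reading>
        {
            new Reading { Time = t0, SensorId = 1001, Value = 14 },
            new Reading { Time = t0.AddSeconds(1), SensorId = 1001, Value = 4 },
            new Reading { Time = t0.AddSeconds(2), SensorId = 1001, Value = 77 }
        };

        // Reference data as a point event: lifetime of a single tick.
        var asPoint = new List<Metadata>
        {
            new Metadata { Start = t0, End = t0.AddTicks(1), SensorId = 1001, Name = "MySensor_1001" }
        };
        // The same reference data stretched to "forever".
        var stretched = new List<Metadata>
        {
            new Metadata { Start = t0, End = DateTime.MaxValue, SensorId = 1001, Name = "MySensor_1001" }
        };

        Console.WriteLine(Join(readings, asPoint).Count);   // 1
        Console.WriteLine(Join(readings, stretched).Count); // 3
    }
}
```

With the point-shaped reference event only the reading at the same instant matches; once the event is stretched, every later reading for that sensor finds its metadata.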