StreamInsight and reference data (lists, databases, etc.)

Using reference data in StreamInsight is a very common scenario. Some examples:

  • Monitoring process control event streams for values that exceed a given threshold (for example, a valve’s pressure exceeding a certain safety threshold).
  • Enriching utility usage information in a smart metering scenario with user and geography information (for example, to allow visualization of power usage by geography).

This reference data is commonly held in some form of repository, such as SQL Server or a process historian’s metadata store (e.g. OSIsoft’s PI Asset Framework), and then needs to be integrated into a StreamInsight query.

In my previous post, I showed how to use the TimeImport capability on streams to synchronize a slow moving reference stream with a fast moving data stream.  I did not, however, walk through the process of creating a reference stream from a relatively static source (such as a database table or file).  I intend to correct that minor, and somewhat deliberate, oversight now.

The project referenced in this blog post, along with the source data files is available here.

Reference data can be integrated into StreamInsight in a couple of different ways, each with its own pros and cons:

Technique: User-defined lookup function
  Pros:
  • Relatively simple to implement
  Cons:
  • Not suitable for large dynamic data sets or data that changes over time
  • Cumbersome to import more than one data element
  • Cannot take advantage of or incorporate stream data

Technique: Reference data stream
  Pros:
  • Seamless integration with StreamInsight query experience
  • Very performant and flexible
  Cons:
  • Additional work required to import and maintain the stream

Let’s look at an example of each technique, step by step. 

The Scenario

For this illustration, let’s consider the following scenario:

  • Stream of data events, each containing a user ID, an activity code and a status (user 7, logged in, success).
  • Set of user metadata, contained in a file (for the purposes of illustration – a database would be a more likely source).  This metadata contains information such as user ID, user name, etc.
  • We want to annotate the data events with the user metadata.

We’ll use the following data sets:

Data Events

Start Time           End Time   User Id   Activity Code   Status
6/25/2009 0:00:00               1         Logon           Success
6/25/2009 0:00:01               2         Logon           Fail
6/25/2009 0:00:05               1         Browse          Success
6/25/2009 0:00:06               2         Logon           Success

Or, in the .csv format that we’ll use:

 StartTime,EndTime,UserId,ActivityCode,Status
 6/25/2009 0:00:00,,1,Logon,Success
 6/25/2009 0:00:01,,2,Logon,Fail
 6/25/2009 0:00:05,,1,Browse,Success
 6/25/2009 0:00:06,,2,Logon,Success

Reference Events

Start Time           End Time   User Id   User Name    Location
6/25/2009 0:00:00               1         Fred Jones   Seattle
6/25/2009 0:00:01               2         Bob Murphy   Portland

Or, in the .csv format that we’ll use:

 StartTime,EndTime,UserId,UserName,Location
 6/24/2009 0:00:00,,1,Fred Jones,Seattle
 6/24/2009 0:00:01,,2,Bob Murphy,Portland

User-Defined Lookup Function

With a user-defined lookup function, we need to load the list of metadata into an associative array (here, a dictionary keyed by user ID), and create a lookup function that allows us to grab user metadata from the dictionary on demand.

Note: Why did I use the hard coded dictionary rather than reading from a file?  I wanted to illustrate the technique of a truly static data source.  If you want to see the code that parses a .csv file into a dictionary object suitable for use in a lookup function, this is covered in the appendix section of the blog.
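For readers who want a feel for the file-based variant without the appendix, here is a minimal sketch in plain C#. It is not the appendix code: the ReferenceLoader class and its Load method are hypothetical names, the ReferenceData shape is assumed from the columns shown earlier, and the naive Split(',') assumes no field contains a quoted comma.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;

// Assumed shape of the reference record; the property names mirror the
// UserId, UserName and Location columns used in this post.
public class ReferenceData
{
    public int UserId { get; set; }
    public string UserName { get; set; }
    public string Location { get; set; }
}

// Hypothetical helper, not part of StreamInsight or the sample project.
public static class ReferenceLoader
{
    // Parses rows of the form "StartTime,EndTime,UserId,UserName,Location"
    // into a dictionary keyed by UserId. The two timestamp columns are
    // ignored, because a dictionary has no notion of time.
    public static Dictionary<int, ReferenceData> Load(TextReader reader)
    {
        var result = new Dictionary<int, ReferenceData>();
        string line = reader.ReadLine();            // skip the header row
        while ((line = reader.ReadLine()) != null)
        {
            if (line.Trim().Length == 0) continue;  // tolerate blank lines
            string[] fields = line.Split(',');
            if (fields.Length < 5) continue;        // skip malformed rows
            var item = new ReferenceData
            {
                UserId = int.Parse(fields[2], CultureInfo.InvariantCulture),
                UserName = fields[3].Trim(),
                Location = fields[4].Trim()
            };
            result[item.UserId] = item;             // last row wins on duplicates
        }
        return result;
    }
}
```

Passing a StreamReader opened on refEvents.csv (or a StringReader holding the same text) to Load would produce a dictionary that the lookup functions can read from.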

Note: Why didn’t I simply join the dictionary with the data stream?  StreamInsight needs to join streams both relationally and temporally.  Since a dictionary isn’t a stream (and has no concept of time), it’s not possible to join the two directly.  Converting a reference source into a stream and joining it is the other technique, covered in the next section.
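To make the temporal half of that requirement concrete, here is a small sketch in plain C#. The Lifetime struct is a hypothetical illustration, not a StreamInsight type: it models an event's lifetime as a half-open interval and treats a point event as lasting a single tick.

```csharp
using System;

// Illustrative model of an event lifetime as a half-open interval
// [Start, End). A sketch for explanation only, not a StreamInsight type.
public struct Lifetime
{
    public DateTime Start;
    public DateTime End;

    public Lifetime(DateTime start, DateTime end)
    {
        Start = start;
        End = end;
    }

    // A point event occupies a single tick.
    public static Lifetime Point(DateTime at)
    {
        return new Lifetime(at, at.AddTicks(1));
    }

    // A point event stretched out to the end of time.
    public static Lifetime FromUntilForever(DateTime at)
    {
        return new Lifetime(at, DateTime.MaxValue);
    }

    // Two events can only join if their lifetimes share at least one instant.
    public bool Overlaps(Lifetime other)
    {
        return Start < other.End && other.Start < End;
    }
}
```

Under this model, a reference point event stamped 6/24/2009 0:00:00 does not overlap a data point event stamped 6/25/2009 0:00:05, so the two would never join, even with matching user IDs. Stretching the reference event with FromUntilForever makes the lifetimes overlap.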

Code Snippet

    // Assumed declaration: the listing uses refData without showing where
    // it is defined, so a static dictionary keyed by user ID is assumed here.
    static Dictionary<int, ReferenceData> refData =
        new Dictionary<int, ReferenceData>();

    static Query CreateUserDefinedFunction(Application cepApp)
    {
        // Pull the data events from the dataEvents.csv file
        var dataStream = CepStream<DataEvent>.Create("dataStream",
            typeof(TextFileReaderFactory), new TextFileReaderConfig()
            {
                InputFileName = "dataEvents.csv",
                CtiFrequency = 1,
                CultureName = CultureInfo.InvariantCulture.Name,
                Delimiter = ','
            }, EventShape.Point);

        // Populate the reference data statically
        refData.Add(1, new ReferenceData()
            { UserId = 1, UserName = "Fred Jones", Location = "Seattle" });
        refData.Add(2, new ReferenceData()
            { UserId = 2, UserName = "Bob Murphy", Location = "Portland" });

        // Create a stream with the user name and location added to each event
        var joinedStream = from e in dataStream
                           select new
                           {
                               UserId = e.UserId,
                               Activity = e.ActivityCode,
                               Status = e.Status,
                               // Use a static lookup function to retrieve the name
                               Name = LookupName(e.UserId),
                               // Use a static lookup function to retrieve the location
                               Location = LookupLocation(e.UserId)
                           };

        // Attach a trace output adapter to the stream
        var query = joinedStream.ToQuery(cepApp, "outputLookup", "",
            typeof(TracerFactory), new TracerConfig()
            {
                DisplayCtiEvents = false,
                SingleLine = false,
                TraceName = "REF",
                TracerKind = TracerKind.Console
            }, EventShape.Point, StreamEventOrder.FullyOrdered);

        return query;
    }

    // Returns the user name for the given ID, or "Unknown" if it is not present
    public static string LookupName(int userId)
    {
        if (refData.ContainsKey(userId))
            return refData[userId].UserName;
        else
            return "Unknown";
    }

    // Returns the location for the given ID, or "Unknown" if it is not present
    public static string LookupLocation(int userId)
    {
        if (refData.ContainsKey(userId))
            return refData[userId].Location;
        else
            return "Unknown";
    }

Upon running this function with the supplied data files, we observe the merged output:

 Press <enter> to close application
 REF,Point,12:00:00.000
         Activity = Logon
         Location = Seattle
         Name = Fred Jones
         Status = Success
         UserId = 1
 REF,Point,12:00:01.000
         Activity = Logon
         Location = Portland
         Name = Bob Murphy
         Status = Fail
         UserId = 2
 REF,Point,12:00:05.000
         Activity = Browse
         Location = Seattle
         Name = Fred Jones
         Status = Success
         UserId = 1
 REF,Point,12:00:06.000
         Activity = Logon
         Location = Portland
         Name = Bob Murphy
         Status = Success
         UserId = 2

Reference Data Stream (static data)

Using the more robust reference data stream approach, we:

  • Use a TextFileReaderFactory to import the contents of the text file as a series of point events.
  • Convert the series of point events into “infinite” signals.
  • Join the reference stream with the data stream.
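The three steps above can be sketched with plain LINQ to Objects, with no StreamInsight dependency, to show why the stretching step matters. Everything here is illustrative: ReferenceJoinSketch and its members are hypothetical names, the file import is replaced by in-memory lists holding the rows from the two .csv files, and a point event is modeled as lasting one tick.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of the import / stretch / join steps.
public static class ReferenceJoinSketch
{
    public class DataEvent
    {
        public DateTime Start;
        public int UserId;
        public string ActivityCode;
        public string Status;
    }

    public class RefEvent
    {
        public DateTime Start;
        public DateTime End;   // exclusive end of the event's lifetime
        public int UserId;
        public string UserName;
        public string Location;
    }

    // Step 1 stand-in: the rows of dataEvents.csv, built in memory.
    public static List<DataEvent> SampleData()
    {
        var day = new DateTime(2009, 6, 25);
        return new List<DataEvent>
        {
            new DataEvent { Start = day, UserId = 1, ActivityCode = "Logon", Status = "Success" },
            new DataEvent { Start = day.AddSeconds(1), UserId = 2, ActivityCode = "Logon", Status = "Fail" },
            new DataEvent { Start = day.AddSeconds(5), UserId = 1, ActivityCode = "Browse", Status = "Success" },
            new DataEvent { Start = day.AddSeconds(6), UserId = 2, ActivityCode = "Logon", Status = "Success" }
        };
    }

    // Step 1 stand-in: the rows of refEvents.csv as one-tick point events.
    public static List<RefEvent> SampleReferencePoints()
    {
        var t1 = new DateTime(2009, 6, 24);
        var t2 = t1.AddSeconds(1);
        return new List<RefEvent>
        {
            new RefEvent { Start = t1, End = t1.AddTicks(1), UserId = 1, UserName = "Fred Jones", Location = "Seattle" },
            new RefEvent { Start = t2, End = t2.AddTicks(1), UserId = 2, UserName = "Bob Murphy", Location = "Portland" }
        };
    }

    // Step 2: stretch each reference event so it stays valid indefinitely.
    public static List<RefEvent> StretchToInfinity(IEnumerable<RefEvent> points)
    {
        return points.Select(p => new RefEvent
        {
            Start = p.Start, End = DateTime.MaxValue,
            UserId = p.UserId, UserName = p.UserName, Location = p.Location
        }).ToList();
    }

    // Step 3: match on UserId and require the data event's timestamp to
    // fall inside the reference event's lifetime.
    public static List<string> Join(IEnumerable<DataEvent> data, IEnumerable<RefEvent> refs)
    {
        return (from d in data
                join r in refs on d.UserId equals r.UserId
                where r.Start <= d.Start && d.Start < r.End
                orderby d.Start
                select string.Format("{0},{1},{2},{3},{4}",
                    d.UserId, d.ActivityCode, d.Status, r.UserName, r.Location))
               .ToList();
    }
}
```

Calling Join(SampleData(), SampleReferencePoints()) yields no rows, because none of the one-tick reference events is alive when a data event arrives. Calling Join(SampleData(), StretchToInfinity(SampleReferencePoints())) yields all four annotated rows.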

Note: this addresses conditions where the reference data DOES NOT CHANGE.  This is not a very real-world scenario – we’ll cover the additions required to incorporate changing data in the next blog post, showing how to pull data from SQL Server. 

Note: I don’t use the CTI import technique in this example.  This is due to the text file input adapter sending an “infinite time” CTI when it reaches the end of the file.  If we were using a continuous or changing reference data source (such as SQL Server), I’d need to employ that technique.

Code Snippet

    static Query CreateThresholdStream(Application cepApp)
    {
        // Pull the data events from the dataEvents.csv file
        var dataStream = CepStream<DataEvent>.Create("dataStream",
            typeof(TextFileReaderFactory), new TextFileReaderConfig()
            {
                InputFileName = "dataEvents.csv",
                CtiFrequency = 1,
                CultureName = CultureInfo.InvariantCulture.Name,
                Delimiter = ','
            }, EventShape.Point);

        // Pull the reference events from the refEvents.csv file
        // (no CTI import is used here; see the note above)
        var refStream = CepStream<ReferenceData>.Create("refStream",
            typeof(TextFileReaderFactory), new TextFileReaderConfig()
            {
                InputFileName = "refEvents.csv",
                CtiFrequency = 1,
                CultureName = CultureInfo.InvariantCulture.Name,
                Delimiter = ','
            }, EventShape.Point);

        // Stretch the ref stream point events out to infinity (a join needs
        // both a relational and a temporal match, and for this example the
        // data doesn't change).
        var referenceEvents = from e in refStream.AlterEventDuration(
                                  e => TimeSpan.MaxValue)
                              select e;

        // Join the two streams
        var joinedStream = from e1 in dataStream
                           join e2 in referenceEvents
                           on e1.UserId equals e2.UserId
                           select new
                           {
                               UserId = e1.UserId,
                               Activity = e1.ActivityCode,
                               Status = e1.Status,
                               // Use the reference stream to retrieve the name
                               Name = e2.UserName,
                               // Use the reference stream to retrieve the location
                               Location = e2.Location
                           };

        // Attach a trace output adapter to the stream
        var query = joinedStream.ToQuery(cepApp, "outputStream", "",
            typeof(TracerFactory), new TracerConfig()
            {
                DisplayCtiEvents = false,
                SingleLine = false,
                TraceName = "STREAM",
                TracerKind = TracerKind.Console
            }, EventShape.Point, StreamEventOrder.FullyOrdered);

        return query;
    }

Upon running this query with the supplied data sets, we observe:

 STREAM,Point,12:00:00.000
         Activity = Logon
         Location = Seattle
         Name = Fred Jones
         Status = Success
         UserId = 1
 STREAM,Point,12:00:01.000
         Activity = Logon
         Location = Portland
         Name = Bob Murphy
         Status = Fail
         UserId = 2
 STREAM,Point,12:00:05.000
         Activity = Browse
         Location = Seattle
         Name = Fred Jones
         Status = Success
         UserId = 1
 STREAM,Point,12:00:06.000
         Activity = Logon
         Location = Portland
         Name = Bob Murphy
         Status = Success
         UserId = 2

Compare and Contrast

For the sake of clarity, let’s put the two techniques side by side.

Lookup

Code Snippet

    // Create a stream with the user name added to the query
    var joinedStream = from e in dataStream
                       select new
                       {
                           UserId = e.UserId,
                           Activity = e.ActivityCode,
                           Status = e.Status,
                           // Use a static lookup function to retrieve the name
                           Name = LookupName(e.UserId),
                           // Use a static lookup function to retrieve the location
                           Location = LookupLocation(e.UserId)
                       };

Reference Stream

Code Snippet

    // Stretch the ref stream point events out to infinity (a join needs
    // both a relational and a temporal match, and for this example the
    // data doesn't change).
    var referenceEvents = from e in refStream.AlterEventDuration(
                              e => TimeSpan.MaxValue)
                          select e;

    // Join the two streams
    var joinedStream = from e1 in dataStream
                       join e2 in referenceEvents
                       on e1.UserId equals e2.UserId
                       select new
                       {
                           UserId = e1.UserId,
                           Activity = e1.ActivityCode,
                           Status = e1.Status,
                           // Use the reference stream to retrieve the name
                           Name = e2.UserName,
                           // Use the reference stream to retrieve the location
                           Location = e2.Location
                       };