StreamInsight: Creating a LINQPad data context

This blog post is for those folks comfortable with creating a data stream from an IEnumerable, IObservable or StreamInsight adapter data source.  If you’d like more details on creating those, have a look at this blog post, which covers creating an IEnumerable ‘canned’ data source then wrapping a LINQPad context around it.

Creating a data context is very straightforward:

  • Create a new C# class library project in Visual Studio 2010 called SampleContext.
    • Ensure that the Framework type is set to .NET Framework 4 (and NOT .NET Framework 4 Client Profile)
    • Add a reference to Microsoft.ComplexEventProcessing.dll
    • Add a reference to Microsoft.ComplexEventProcessing.Observable.dll
      • This is required for the IObservable<T>.ToPointStream extension method.
    • Add a reference to Microsoft.ComplexEventProcessing.Adapters.dll
      • Optional – this is only required if you have an adapter embedded inside of your context
    • Add a reference to System.Reactive.dll
      • Optional – this is only required for creating different types of IObservable streams.
      • Note: System.Reactive.dll is part of the Reactive Extensions framework, which needs to be downloaded separately.
  • Derive a class from StreamInsightContext in the StreamInsightLinqPad.Samples assembly (available as part of the LINQPad driver for StreamInsight download).
  • Expose your stream as a property of the class.

 

3d Glossy Blue Orbs Business Stacked Document Icon Download the sample project here.

That’s it!  Here’s what this looks like in practice.  First we establish the basic class

  1. using System;
  2.     using System.Collections.Generic;
  3.     using System.Linq;
  4.     using System.Text;
  5.     using StreamInsightLinqPad.Samples;
  6.     using Microsoft.ComplexEventProcessing;
  7.     using Microsoft.ComplexEventProcessing.Linq;
  8.     using StreamInsight.Samples.Adapters.DataGenerator;
  9.  
  10.     /// <summary>
  11.     /// Define a sample LINQPad context for StreamInsight
  12.     /// </summary>
  13.     public class SampleContext : StreamInsightContext
  14.     {
  15.         public SampleContext(Server server)
  16.             : base(server)
  17.         { }
  18.     }

Now we’ll go ahead and expose three types of streams.  The first will be one generated from an IEnumerable.

  1. SimpleEvent[] events = new SimpleEvent[]
  2. {
  3.     new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 12, 0, 0), ID = 5, Message = "Test" },
  4.     new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 13, 0, 0), ID = 6, Message = "Test2" },
  5.     new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 14, 0, 0), ID = 7, Message = "Test3" },
  6.     new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 15, 0, 0), ID = 5, Message = "Test4" },
  7. };
  8.     
  9. /// <summary>
  10. /// Expose a stream created from an IEnumerable
  11. /// </summary>
  12. public CepStream<SimpleEvent> SimpleStream
  13. {
  14.     get
  15.     {
  16.         return events.ToPointStream(this.Application, t =>
  17.             PointEvent.CreateInsert<SimpleEvent>(t.Timestamp, t),
  18.             AdvanceTimeSettings.IncreasingStartTime);
  19.     }
  20. }

Next, we’ll use IObservable (created via Observer.Interval) to generate a “live” stream of events:

  1. /// <summary>
  2. /// Expose a stream created from an IObservable
  3. /// </summary>
  4. public CepStream<SimpleEvent> ObservableStream
  5. {
  6.     get
  7.     {
  8.         var rand = new Random();
  9.  
  10.         // Create a simple observable that returns a random event every
  11.         // 250 ms
  12.         var interval = Observable.Interval(TimeSpan.FromMilliseconds(250))
  13.             .Select(i => new SimpleEvent
  14.             {
  15.                 ID = rand.Next(10),
  16.                 Timestamp = DateTime.UtcNow,
  17.                 Message = "Observable message!"
  18.             });
  19.         return interval.ToPointStream(Application, s =>
  20.             PointEvent.CreateInsert(s.Timestamp, s),
  21.             AdvanceTimeSettings.IncreasingStartTime,
  22.             null);
  23.     }
  24. }

Finally, we’ll leverage the DataGenerator sample adapter (from https://streaminsight.codeplex.com/) to demonstrate creating a LINQPad stream from an adapter.

  1. /// <summary>
  2. /// Expose a stream created from an adapter instance
  3. /// </summary>
  4. public CepStream<GeneratedEvent> AdapterStream
  5. {
  6.     get
  7.     {
  8.         var generatorConfig = new GeneratorConfig()
  9.         {
  10.             CtiFrequency = 1,
  11.             DeviceCount = 3,
  12.             EventInterval = 250,
  13.             EventIntervalVariance = 10,
  14.             MaxValue = 100
  15.         };
  16.  
  17.         var inputStream = CepStream<GeneratedEvent>.Create(Application,
  18.             "inputStream", typeof(GeneratorFactory), generatorConfig,
  19.             EventShape.Point);
  20.         return inputStream;
  21.     }
  22. }

To test, we’ll need to configure LINQPad with our assembly as a data context.  To do this:

  1. Add Connection, then select Microsoft StreamInsight from the list of drivers.
  2. Select Custom Context from the Context Kind dropdown.
  3. Click on the ellipsis for Assembly Path (...), then browse to the directory containing SampleContext.dll.
  4. Click OK to finish selecting the new context.

image

The context streams should now be visible in LINQPad, similar to the screenshot below.  Create a new Query, and select C# Statements as the Language, and StreamInsight: SampleContext as the database.

image

Let’s dump our streams, using this LINQ code:

  1. // Dump out the enumerable stream.  Since this is an IEnumerable, it
  2. // will eventually complete and return
  3. SimpleStream.ToEnumerable().Dump("Enumerable stream");
  4.  
  5. // Dump out the observable stream.  As this is an IObservable without
  6. // an end condition, we will have to stop the query (as it will not
  7. // stop on its own)
  8. ObservableStream.ToObservable().Dump("Observable stream");

This will show us our enumerable and observable streams:

image

Finally, let’s dump out some data from our adapter stream:

  1. // Dump out the adapter stream.  As this is an adapter without
  2. // an end condition, we will have to stop the query (as it will not
  3. // stop on its own)
  4. AdapterStream.ToObservable().Dump("Adapter stream");

Resulting in:

image

Firey Orange Jelly Signs Bomb (Bombs) Icon Style 2 Note – I had to make one change to the adapter code to make it work with ToObservable.  The GeneratedEvent definition doesn’t define a default constructor (which is needed by ToObservable).   In GeneratedEvent.cs, on line 23 I added the code:
  1. public GeneratedEvent()
  2. { }

That’s it – a LINQPad data context in a nutshell.