StreamInsight: Understanding dynamic query composition

Been tied up with PASS for the past (pun intended) couple of weeks, so it’s time to get a bit caught up on writing.  One of the key technical features of StreamInsight is the ability for one query to consume the output of another, which lets the system avoid processing the same events twice and opens up a new world of flexibility.  The feature that unlocks these capabilities is Dynamic Query Composition (DQC).  That being said, the nuances of how StreamInsight constructs queries and flows events between them are not always obvious.  As this has led me to make a few sub-optimal design choices in the past :), I figured I’d put together a little article demonstrating what happens when using (or not using) DQC.

For more background on DQC, please refer to:

Take the diagram below, illustrating a common query composition pattern.  Given a single input stream (represented by the inputAdapter shape in the diagram), we would like to consume the stream of events from that adapter in both queryOne and queryTwo.

image

At first glance, this query syntax would seem to fit the bill (note that I’m using the new IObservable support for the input adapter, but using the classic adapter syntax to create the queries bound to an output adapter, to explicitly show query creation):

    // Convert the data source into a temporal stream
    var orderStream = ordersSimple.ToPointStream(cepApp, s =>
        PointEvent.CreateInsert(s.StartTime, s),
        AdvanceTimeSettings.IncreasingStartTime);

    // Create a query in two parts
    var queryOne = from e in orderStream where e.OrderID > 50 select e;
    var queryTwo = from e in queryOne.TumblingWindow(
        TimeSpan.FromDays(1), HoppingWindowOutputPolicy.ClipToWindowEnd)
    select new
    {
        OrderCount = e.Count()
    };

    // Convert these templates into queries, bound to an output adapter
    var tracerConfig = new TracerConfig()
    {
        DisplayCtiEvents = false,
        SingleLine = true,
        TracerKind = TracerKind.Console
    };

    var queryOneRun = queryOne.ToQuery(cepApp, "QueryOne", "",
        typeof(TracerFactory), tracerConfig, EventShape.Point,
        StreamEventOrder.FullyOrdered);

    var queryTwoRun = queryTwo.ToQuery(cepApp, "QueryTwo", "",
        typeof(TracerFactory), tracerConfig, EventShape.Interval,
        StreamEventOrder.FullyOrdered);

However, this wouldn’t produce the desired result.  Instead, the StreamInsight engine will construct the query pattern seen in the diagram below: queryTwo reuses the definition of queryOne, but not its runtime stream.  Each query is instantiated on its own, so the engine creates two separate instances of the input adapter.

image
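To build some intuition for why this happens, here is a rough analogy in plain C# (LINQ to Objects, not StreamInsight).  It assumes that a query template behaves a bit like a deferred LINQ query: composing on top of it reuses the definition, and each "run" starts from the source again.  The InputAdapter method and the TemplateCompositionSketch class are hypothetical stand-ins I made up for illustration; they are not part of the StreamInsight API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TemplateCompositionSketch
{
    // Counts how many times the stand-in "input adapter" is started.
    public static int AdapterInstances;

    // Hypothetical stand-in for an input adapter producing order IDs 1..100.
    // The iterator body restarts for every enumeration of the sequence.
    public static IEnumerable<int> InputAdapter()
    {
        AdapterInstances++;
        for (int orderId = 1; orderId <= 100; orderId++)
        {
            yield return orderId;
        }
    }

    // Returns { adapter instances, queryOne result count, queryTwo result count }.
    public static int[] RunBothQueries()
    {
        AdapterInstances = 0;

        // Two deferred definitions; queryTwo is composed on top of queryOne.
        var queryOne = InputAdapter().Where(id => id > 50);
        var queryTwo = queryOne.Select(id => id * 10);

        // "Running" each definition walks the whole pipeline from the source,
        // so the source is started once per run.
        var queryOneResults = queryOne.ToList();
        var queryTwoResults = queryTwo.ToList();

        return new[] { AdapterInstances, queryOneResults.Count, queryTwoResults.Count };
    }
}
```

Calling TemplateCompositionSketch.RunBothQueries() reports two adapter starts even though queryTwo was written in terms of queryOne, which is the same shape of surprise as the two adapter instances in the diagram.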

This can be verified by using the debugger to check the input source of the query.  For details on how to use the debugger, see my blog post here.

  1. Open the Event Flow Debugger, and connect to your StreamInsight instance (walkthrough on using the Event Flow Debugger here).
  2. Navigate to your queryTwo definition, and Show Query.
  3. As seen in the diagram below, if the query's input is an actual input adapter (and not a published stream), you have two independent streams, each with its own input adapter instance.

 

image image

In order to have the two queries compose at runtime, we need to:

  1. Publish the results of the first query as a published stream with a strongly typed output (published schemas cannot use anonymous types).
  2. Consume the published stream as the input source of the second query.
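The strongly typed output from step 1 needs a named class.  The NorthwindOrderResult type used in this article is not listed in the post, so here is a minimal sketch of what it might look like.  The property names match the ones the DQC query in this article assigns; the property types are my assumptions.

```csharp
using System;

// Hypothetical payload type for the published stream. The property names
// follow the query in this article; the property types are assumptions.
public class NorthwindOrderResult
{
    public DateTime StartTime { get; set; }
    public DateTime EndTime { get; set; }
    public int OrderID { get; set; }
    public string ShipRegion { get; set; }
    public string CompanyName { get; set; }
}
```

The point of the named class is simply that the published stream's schema can be referred to by name when the second query binds to it, which an anonymous type would not allow.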

Updating the query syntax to take advantage of DQC looks like:

    // Create a simple data source (OData feed), using a non-anonymous
    // type
    var ordersSimple = from o in northwind.Orders
                        where o.OrderDate != null && o.ShippedDate != null
                        select new NorthwindOrderResult
                        {
                            StartTime = (DateTime)o.OrderDate,
                            EndTime = (DateTime)o.ShippedDate,
                            OrderID = o.OrderID,
                            ShipRegion = o.ShipRegion,
                            CompanyName = o.Customer.CompanyName
                        };

    // Convert the data source into a temporal stream
    var orderStream = ordersSimple.ToPointStream(cepApp, s =>
        PointEvent.CreateInsert(s.StartTime, s),
        AdvanceTimeSettings.IncreasingStartTime);

    // Create a query
    var queryOne = from e in orderStream where e.OrderID > 50 select e;
    var queryOneRun = queryOne.ToQuery(cepApp, "QueryOne", "",
        typeof(TracerFactory), tracerConfig, EventShape.Point,
        StreamEventOrder.FullyOrdered);

    // Convert the query's output into a published stream (with a
    // non-anonymous type)
    var queryOneStream = queryOneRun.ToStream<NorthwindOrderResult>();

    // Bind the second query to the published stream
    var queryTwo = from e in queryOneStream.TumblingWindow(
        TimeSpan.FromDays(1), HoppingWindowOutputPolicy.ClipToWindowEnd)
    select new
    {
        OrderCount = e.Count()
    };

    // Run the second query
    var queryTwoRun = queryTwo.ToQuery(cepApp, "QueryTwo", "",
        typeof(TracerFactory), tracerConfig, EventShape.Interval,
        StreamEventOrder.FullyOrdered);

Now that we’ve updated the queries to use DQC, let’s take another look at the query structure in the debugger.  Note that the input source for the second query is now a published stream (it is bound to a URI source rather than directly to an input adapter).

image image
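As a rough mental model of what the published stream buys us, here is a small plain C# sketch (again, not StreamInsight code).  It assumes a published stream behaves like a simple fan-out point: the first query reads the source once and pushes its results, and anything bound to the stream receives those same events.  PublishedStreamSketch, RuntimeCompositionSketch and InputAdapter are hypothetical names invented for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a published stream: events are pushed once and
// fanned out to every subscriber. This is not a StreamInsight type.
public sealed class PublishedStreamSketch<T>
{
    private readonly List<Action<T>> subscribers = new List<Action<T>>();

    public void Subscribe(Action<T> onEvent)
    {
        subscribers.Add(onEvent);
    }

    public void Publish(T item)
    {
        foreach (var subscriber in subscribers)
        {
            subscriber(item);
        }
    }
}

public static class RuntimeCompositionSketch
{
    // Counts how many times the stand-in "input adapter" is started.
    public static int AdapterInstances;

    // Hypothetical stand-in for an input adapter producing order IDs 1..100.
    public static IEnumerable<int> InputAdapter()
    {
        AdapterInstances++;
        for (int orderId = 1; orderId <= 100; orderId++)
        {
            yield return orderId;
        }
    }

    // Returns { adapter instances, events seen by queryOne's output, count seen by queryTwo }.
    public static int[] RunBothQueries()
    {
        AdapterInstances = 0;
        var queryOneStream = new PublishedStreamSketch<int>();

        // queryOne's output and queryTwo both consume the published stream.
        var queryOneOutput = new List<int>();
        int queryTwoCount = 0;
        queryOneStream.Subscribe(id => queryOneOutput.Add(id));
        queryOneStream.Subscribe(id => queryTwoCount++);

        // queryOne reads the source once, filters, and publishes its results.
        foreach (int orderId in InputAdapter())
        {
            if (orderId > 50)
            {
                queryOneStream.Publish(orderId);
            }
        }

        return new[] { AdapterInstances, queryOneOutput.Count, queryTwoCount };
    }
}
```

Here the source is started only once, and both consumers see the same 50 filtered events, which mirrors the single input adapter feeding a published stream in the debugger view.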

There we go – we can now feed the run-time results of one query into another query using Dynamic Query Composition.