Time in StreamInsight (III)

Part 3: Synchronizing Streams in Time

This post deals with the problem of timeliness of results for a query with multiple input streams. So far we have seen what considerations need to be made to advance time for a single input stream. The semantics of the binary operators Join and Union are such that they release events only when both branches have seen CTI events. Put more simply, a query with multiple inputs produces output only when all of its inputs advance time.

Let us consider the implications of the above in two cases. Suppose a query joins a stream with very frequent updates (for instance, sensor measurements) with a stream with almost no updates (e.g., reference data containing the sensor’s geographical location). The query may then produce no output even though it sees lots of events in one of its input streams, because the other stream might not update for a long time. Often CTIs are generated relative to the events in the stream, so when there are no events in the stream no CTI is generated either. Hence, the join will issue output only at the pace of the slower of its inputs.

Another situation is the union of two streams with variable input rates, i.e., one input stream with lots of events and the other with only a few. We assume in this case as well that the CTIs are generated relative to the events. The frequency of output will again be dictated by the slower stream.

In the two scenarios above we assume of course that the events do not have pathological timestamps, e.g., that they do not overlap and that the adapters do not emit CTIs on their own that advance time very far into the future. We do see a pattern where the slowest stream in terms of the frequency of the CTIs is dictating the frequency of output, and this is something that users might wish to avoid.
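The pattern can be illustrated with a toy model. The sketch below is not StreamInsight code: the class BinaryOperator and its methods are hypothetical names, and it assumes only what is stated above, namely that a binary operator can release output no further than the smaller of the latest CTIs seen on its two branches.

```python
from dataclasses import dataclass


@dataclass
class BinaryOperator:
    """Toy model of how a Join/Union advances time (hypothetical, not the real API)."""

    cti_left: float = float("-inf")
    cti_right: float = float("-inf")

    def on_cti(self, side: str, timestamp: float) -> None:
        # Record the latest CTI seen on one branch; CTIs never move backwards.
        if side == "left":
            self.cti_left = max(self.cti_left, timestamp)
        elif side == "right":
            self.cti_right = max(self.cti_right, timestamp)
        else:
            raise ValueError(f"unknown side: {side!r}")

    def output_time(self) -> float:
        # Output is released only up to the point both branches have reached.
        return min(self.cti_left, self.cti_right)


op = BinaryOperator()

# The fast branch (e.g., sensor readings) advances time five times.
for t in range(1, 6):
    op.on_cti("left", t)
stalled = op.output_time()   # still -inf: the slow branch has sent no CTI yet

# A single CTI on the slow branch finally lets output through, up to its timestamp.
op.on_cti("right", 2)
released = op.output_time()
```

In this model the fast branch has advanced to time 5, yet output is released only up to time 2, the position of the slow branch.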

StreamInsight offers the possibility for one stream to import its CTIs from another stream. See https://msdn.microsoft.com/en-us/library/ff518502.aspx for a description of the syntax. The picture below shows how this feature should be understood. Let’s say that we have a portion of the query as follows: two input streams, Input1 and Input2, are used as input for a binary operator (e.g., Join or Union). If Input2 declares that it imports the CTIs from Input1, then StreamInsight will ensure that all the CTIs generated or enqueued by Input1 will appear to also be enqueued by Input2.

It is possible to have import cycles of any size, e.g., to also have Input1 import the CTIs of Input2, should there be any. This helps in situations like the second use case, where it is not known in advance which input stream has the fastest data rate.
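To make the import relationship concrete, here is a small simulation. It is only a sketch of the behavior described above and does not use the StreamInsight syntax from the linked page; InputStream, import_ctis_from and enqueue_cti are hypothetical names. It also assumes that a CTI which does not advance a stream's time is simply ignored, which is what keeps a cycle from propagating forever in this model.

```python
class InputStream:
    """Toy input stream that can import CTIs from other streams (hypothetical)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.latest_cti = float("-inf")
        self.importers: list["InputStream"] = []  # streams importing our CTIs

    def import_ctis_from(self, source: "InputStream") -> None:
        # Declare that this stream imports every CTI seen by `source`.
        source.importers.append(self)

    def enqueue_cti(self, timestamp: float) -> None:
        if timestamp <= self.latest_cti:
            return  # no progress; this also ends propagation around a cycle
        self.latest_cti = timestamp
        for stream in self.importers:
            stream.enqueue_cti(timestamp)


input1 = InputStream("Input1")
input2 = InputStream("Input2")

# Mutual import: whichever stream is faster drives time for both.
input2.import_ctis_from(input1)
input1.import_ctis_from(input2)

input1.enqueue_cti(10)   # Input2 now also sees a CTI at 10
input2.enqueue_cti(12)   # Input1 now also sees a CTI at 12

# A binary operator over both inputs can release output up to this time.
joined_time = min(input1.latest_cti, input2.latest_cti)
```

With the mutual import in place both streams end up at time 12, so the operator consuming them is no longer held back by whichever stream happens to be slower.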

There are some things to keep in mind when using this feature. Seen from the perspective of the slower input stream, there are now “arbitrary” CTIs coming from an outside source. Since CTI consistency must be preserved, some events from the slow stream may be dropped if they violate an imported CTI, i.e., if they start before it. This will happen if the two streams are not in order w.r.t. each other. When enqueueing edge events, the user can choose AdvanceTimePolicy.Adjust so that, instead of being dropped, they are adjusted to start at the latest CTI.
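The difference between dropping and adjusting can be sketched as follows. This is a simplified model rather than the engine's actual logic: Policy, Event and admit are hypothetical names, an open edge event is modeled with an infinite end time, and the sketch assumes that adjusting only applies to events whose lifetime still extends past the latest CTI.

```python
from enum import Enum
from typing import NamedTuple, Optional


class Policy(Enum):
    DROP = "drop"
    ADJUST = "adjust"


class Event(NamedTuple):
    start: float
    end: float      # float("inf") models an edge event that is still open
    payload: str


def admit(event: Event, latest_cti: float, policy: Policy) -> Optional[Event]:
    """Return the event as it would enter the stream, or None if it is dropped."""
    if event.start >= latest_cti:
        return event                             # does not violate the CTI
    if policy is Policy.ADJUST and event.end > latest_cti:
        return event._replace(start=latest_cti)  # move the start up to the CTI
    return None                                  # violates the CTI: dropped


latest_cti = 10.0  # imported from the faster stream

# Reference data modeled as an edge event that started "in the past".
location = Event(start=3.0, end=float("inf"), payload="sensor location")

dropped = admit(location, latest_cti, Policy.DROP)
adjusted = admit(location, latest_cti, Policy.ADJUST)

# An event that ended before the CTI cannot be rescued by adjusting.
stale = admit(Event(3.0, 8.0, "old reading"), latest_cti, Policy.ADJUST)
```

Here the reference event is lost under the drop policy, while under the adjust policy it survives with its start moved to time 10.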

Going back to the first user scenario of joining a sensor stream with a reference data stream, it seems natural to model the reference data stream as a set of edge events, and the computation is still correct even if the start edge of an event is adjusted. For the second scenario, where two streams are unioned, one needs to be careful when using this feature, because outlier events in one of the streams can cause time to advance too far, and this will lead to events from the other stream being dropped. So users of this feature should look at the data characteristics of the streams before deciding whether to import CTIs.

Regards,
Ciprian Gerea