In this blog, I would like to explain the design choices we made with Azure Stream Analytics for:
- ‘When’, in processing time, the results are materialized, and
- ‘How’ the results are materialized.
These design choices reduce implementation complexity and runtime resource usage while meeting customers’ requirements for their scenarios.
Firstly, I would like to point out that these are two very different issues. The ‘when’ part is a user-visible behavior, preferably controllable by the user. The ‘how’ part is purely an implementation detail. However, depending on the user-visible behavior, some implementations are more efficient than others.
In Google Dataflow’s design, the concept of Trigger is introduced to control ‘when’ the results are materialized. This is most intuitively useful for computing windowed aggregates, and perhaps inner joins.
SELECT DeviceId, COUNT(*) FROM Input TIMESTAMP BY EventTime GROUP BY DeviceId, TumblingWindow(minute, 5)
In this case, the full count can only be generated when the high water mark of EventTime moves beyond the end of each 5-minute window, but a partial count may be desirable to display on a dashboard as the count increases. This is especially relevant when events arrive out of order, and in cases where the counts in multiple 5-minute windows can increase as late arriving events are accounted for.
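As a minimal sketch of the full-count behavior (a Python simulation with hypothetical names, not ASA’s implementation), a tumbling-window count that emits a window only once the high water mark of event time passes the window’s end might look like:

```python
from collections import defaultdict

WINDOW = 5 * 60  # tumbling window size in seconds (5 minutes)

def tumbling_counts(events):
    """Count events per (device, window). A window's count is emitted only
    once the high water mark of event time passes the window's end, so
    emitted counts are final and never retracted."""
    counts = defaultdict(int)              # (device_id, window_start) -> count
    high_water_mark = float("-inf")
    emitted = []
    for device_id, event_time in events:   # event_time in seconds
        window_start = event_time - event_time % WINDOW
        counts[(device_id, window_start)] += 1
        high_water_mark = max(high_water_mark, event_time)
        # Emit every window whose end the high water mark has passed.
        for key in sorted(k for k in list(counts) if k[1] + WINDOW <= high_water_mark):
            emitted.append((key[0], key[1], counts.pop(key)))
    return emitted
```

Producing partial results would mean also surfacing the still-open entries in `counts` before their windows close, which is exactly the behavior discussed below.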
For outer joins, where a lack of match may have to be retracted, and for analytic functions, where event ordering is important for pattern matching, early output doesn’t provide as much value.
SELECT I.Id as Id
FROM Impression AS I TIMESTAMP BY ImpressionTime
LEFT OUTER JOIN Click AS C TIMESTAMP BY ClickTime
ON I.Id = C.Id
AND DATEDIFF(minute, I, C) BETWEEN 0 AND 60
WHERE C.Id IS NULL
This query outputs all ad impressions without clicks within a 60-minute interval. Generating output before 60 minutes elapse from the ad impression may result in the need to retract the output when a click finally arrives. When composed with additional query processing downstream, the computation caused by such retractions can become overwhelming.
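The retraction-free alternative can be sketched as follows (a simplified Python simulation with hypothetical names; it assumes events are already sorted by event time): an impression is reported as unmatched only after the high water mark has moved 60 minutes past it, so an emitted result never needs to be taken back.

```python
def unmatched_impressions(events, match_window=3600):
    """Emit impression ids that saw no click within match_window seconds of
    event time. Output is delayed until the high water mark passes
    impression_time + match_window, so nothing ever needs retracting."""
    pending = {}        # impression id -> impression event time
    clicked = set()
    output = []
    for kind, ev_id, ev_time in events:    # sorted by event time
        if kind == "impression":
            pending[ev_id] = ev_time
        elif kind == "click" and ev_id in pending:
            clicked.add(ev_id)
        # High water mark == current event time, since input is in order.
        for imp_id in [i for i, t in pending.items() if t + match_window <= ev_time]:
            if imp_id not in clicked:
                output.append(imp_id)
            del pending[imp_id]
            clicked.discard(imp_id)
    return output
```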
For complex windowed aggregates, e.g. sliding windows, the same applies: late arriving events can alter the results of multiple windows that contain them. If we introduce a session window, a useful window type for certain scenarios, a late arriving event may even combine two session windows into one. This is especially true for alerting scenarios, a primary use case for stream processing, where, in most cases, retracting an alert is much less desirable than waiting to make sure some pattern has indeed occurred and is worthy of an alert. Beyond a single-step windowed aggregate query, composing such behavior with other downstream operators can also become intractable for users to understand. As a result, we have made the design choice to not expose the ability to generate partial results. Results are only generated when the high water mark of event time has moved beyond the closing of the window, or beyond all events that may contribute to the results.
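To make the session window point concrete, here is a minimal sketch (a hypothetical Python helper, not ASA code) of how a single late arriving event can merge two previously separate session windows:

```python
def add_to_sessions(sessions, event_time, gap):
    """Insert an event into a list of (start, end) session windows.
    A late event landing within `gap` of two sessions merges them into one,
    which is why early output for sessions may need to be retracted."""
    touched = [s for s in sessions if s[0] - gap <= event_time <= s[1] + gap]
    untouched = [s for s in sessions if s not in touched]
    start = min([event_time] + [s[0] for s in touched])
    end = max([event_time] + [s[1] for s in touched])
    return sorted(untouched + [(start, end)])
```

If the two sessions had already been emitted, both results would have to be retracted and replaced by the merged one.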
The lack of a need to produce partial results presents an opportunity for us to use in-order processing to compute the results: users specify a reorder window to wait for late arriving events, and the high water mark of the incoming events’ timestamps serves as a punctuation to move the rest of the processing pipeline forward. Events coming out of the reorder window are fully sorted by event time per partition, so subsequent operator implementations can assume that incoming events are already ordered by event time.
For example, in the ad impression and click matching example above, the user may anticipate event delays of up to 5 minutes, so a reorder window of 5 minutes may be specified as part of the query configuration. You can think of the reorder buffer as a priority queue. As the high water mark of incoming events advances, all events with timestamps more than 5 minutes older than the high water mark are pushed out of the queue, ordered by timestamp, and then processed by the join operator. The join operator only needs to buffer 60 minutes of events; as time advances, it can drop events more than 60 minutes old. The same logic applies to the windowed aggregate query shown above.
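The reorder buffer described above can be sketched with a min-heap (a simplified Python illustration; names are hypothetical):

```python
import heapq

def reorder(stream, reorder_window):
    """Buffer incoming events in a min-heap keyed on event time; release an
    event once the high water mark has advanced reorder_window past its
    timestamp. Released events are in event-time order."""
    heap, high_water_mark, released = [], float("-inf"), []
    for event_time, payload in stream:
        heapq.heappush(heap, (event_time, payload))
        high_water_mark = max(high_water_mark, event_time)
        while heap and heap[0][0] <= high_water_mark - reorder_window:
            released.append(heapq.heappop(heap))
    return released
```

Downstream operators consume `released` and can rely on it being sorted by event time, which is what lets them stay simple.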
In contrast, processing events as they arrive without reordering, also known as out-of-order processing, has the advantage of reducing buffered state size for windowed aggregates, because only the aggregated state needs to be kept around. In the windowed aggregate query (the first example above), if events can be out of order by up to 5 minutes, only two counters have to be kept per device: one for the current 5-minute window and one for the previous one. There is no need to buffer all incoming events in a 5-minute reorder buffer, a potentially large memory saving if the event rate is high.
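A sketch of that out-of-order approach (again a hypothetical Python simulation): late events update a still-open counter directly, and a window’s counter is finalized only once no event within the allowed delay can still reach it, so no raw events are buffered.

```python
from collections import defaultdict

WINDOW = 300  # 5-minute tumbling window, in seconds

def out_of_order_counts(events, max_delay=300):
    """Out-of-order processing of the tumbling count: per device, keep
    counters only for windows that late events can still touch, instead of
    buffering raw events in a reorder buffer."""
    counters = defaultdict(lambda: defaultdict(int))  # device -> window_start -> count
    high_water_mark = float("-inf")
    final = []
    for device_id, event_time in events:
        window_start = event_time - event_time % WINDOW
        counters[device_id][window_start] += 1      # late events land here too
        high_water_mark = max(high_water_mark, event_time)
        # A window is final once no event delayed by <= max_delay can reach it.
        for dev, wins in counters.items():
            for ws in [w for w in wins if w + WINDOW + max_delay <= high_water_mark]:
                final.append((dev, ws, wins.pop(ws)))
    return final
```

Note that the state per device is just a couple of integers, but the emit logic is already more involved than the in-order version above; this is the complexity trade-off discussed next.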
However, there is no memory advantage for joins and analytic functions, because the raw events have to be buffered anyway. When multiple operators are composed together, such event buffering needs to happen at each operator, which consumes much more memory than the per-input reorder buffer used by in-order processing. CPU-wise, out-of-order processing does have the advantage of amortizing the computation over time: because the computation is performed as events arrive, CPU usage is less spiky. However, the operator implementation is often much more complex (e.g. multiple counters have to be kept around in the windowed aggregate example above), which in turn increases overall CPU usage, and can prevent other optimization techniques from being applied (e.g. columnar processing, instruction cache and compiler optimizations). The Trill paper (https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/trill-vldb2015.pdf) describes many such techniques. ASA uses Trill as the on-node processing engine.
The drawback of in-order processing is primarily the potentially large reorder buffer when a large reorder window is specified. This happens most commonly in IoT scenarios, where a large number of event senders may produce significantly divergent timelines because of varying network delays and clock skews. However, in such scenarios, the different timelines are logically separated in the query semantics. As a result, we have introduced the concept of a substream query to allow the timelines to be decoupled, so the reorder buffer for each timeline can be kept small, while the time gap between timelines can be large.
SELECT DeviceId, COUNT(*) FROM Input TIMESTAMP BY EventTime OVER DeviceId GROUP BY DeviceId, TumblingWindow(minute, 5)
This query counts events by device id. The devices can have clocks completely out of sync, but for a specific device the events are guaranteed to arrive mostly in order. We can then use a very small reorder window (e.g. a few seconds) to avoid incurring large memory usage for the reorder buffer, and to avoid delaying the output.
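The substream idea can be sketched by keeping one reorder buffer and one high water mark per device (a minimal Python illustration with hypothetical names), so a device whose clock is hours ahead never inflates another device’s buffer or delays its output:

```python
import heapq
from collections import defaultdict

def substream_reorder(stream, reorder_window):
    """Per-device reorder: each device id gets its own min-heap and its own
    high water mark, so divergent device clocks don't interfere."""
    heaps = defaultdict(list)
    hwm = defaultdict(lambda: float("-inf"))
    released = []
    for device_id, event_time in stream:
        heapq.heappush(heaps[device_id], event_time)
        hwm[device_id] = max(hwm[device_id], event_time)
        while heaps[device_id] and heaps[device_id][0] <= hwm[device_id] - reorder_window:
            released.append((device_id, heapq.heappop(heaps[device_id])))
    return released
```

Here device "a" can run ~1000 seconds ahead of device "b" while each keeps only a few seconds of events buffered, which is exactly the decoupling TIMESTAMP BY ... OVER DeviceId provides.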
To conclude, the two design choices, namely 1) not producing partial results, and 2) in-order processing, allow on-node processing in Azure Stream Analytics to be performed very efficiently, with overall lower memory usage as a result of eliminating the need to buffer events at every operator. In the future, there may be opportunities to combine in-order and out-of-order processing techniques to achieve even higher memory and CPU efficiency where appropriate.