Let’s say there are several agents (devices, for example) producing temporal streams. It may be useful to merge these sequences into a single stream that StreamInsight can process. The “union” operator lets you merge a fixed number of inputs, but what happens when inputs come and go over time? Because dynamic inputs may disagree on how time is progressing, some policy is needed to reconcile them. A strictly “correct” policy, one that never propagates a CTI that might later be violated, is no good: a new input can always come online with an earlier CTI value, so such a policy never lets any CTI through. A “loose” policy that propagates every CTI is equally useless, because it assumes that all inputs progress in lockstep, which is implausible given network latency, clock drift, etc.
Temporal stream: a sequence of timestamped events. An event may be a Current Time Increment (CTI) event. A CTI is a promise that no subsequent events will have timestamps lower than that of the CTI.
A possible Goldilocks policy: allow for some maximum deviation between the most and least advanced inputs (let’s call this timespan the delay). Whenever a CTI event from any input is processed, and that CTI is greater than any seen so far, subtract the delay before propagating it, so that the CTI doesn’t disqualify events from a less advanced input. If an incoming event still violates the delayed CTI, drop or adjust the event. For example, with a delay of 5, an input CTI at time 8 is propagated as a CTI at time 3, and a later event with timestamp 1 is dropped.
The ISubject<,> contract allows us to encapsulate the policy described above. Multiple producers can feed the subject (via calls to On*). Each producer can come and go at its own pace. How can we implement such a subject? Let’s start with a helper factory method that allows us to create a “transform subject”. This subject applies an arbitrary function to an input observable, where that function may represent a stateful computation.
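A minimal sketch of what such a factory could look like, assuming Rx.NET (the System.Reactive package). Only the name TransformSubject.Create appears in this post; the generic TransformSubject<TIn, TOut> class and its member names are assumptions made for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TransformSubject
{
    // Wraps an arbitrary (possibly stateful) transform in a subject.
    public static TransformSubject<TIn, TOut> Create<TIn, TOut>(
        Func<IObservable<TIn>, IObservable<TOut>> transform)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        return new TransformSubject<TIn, TOut>(transform);
    }
}

// Hypothetical implementation type: a subject that can also be disposed.
public sealed class TransformSubject<TIn, TOut> : ISubject<TIn, TOut>, IDisposable
{
    // Input side: a plain subject that producers push events into.
    private readonly Subject<TIn> _input = new Subject<TIn>();

    // Output side: the transformed stream, shared by all consumers.
    private readonly IConnectableObservable<TOut> _output;
    private readonly IDisposable _connection;

    internal TransformSubject(Func<IObservable<TIn>, IObservable<TOut>> transform)
    {
        _output = transform(_input).Publish();
        // Connect immediately so the transform runs once, independently of
        // how many consumers subscribe. Events pushed before the first
        // subscription are not replayed.
        _connection = _output.Connect();
    }

    public void OnNext(TIn value) => _input.OnNext(value);
    public void OnError(Exception error) => _input.OnError(error);
    public void OnCompleted() => _input.OnCompleted();

    public IDisposable Subscribe(IObserver<TOut> observer) => _output.Subscribe(observer);

    public void Dispose()
    {
        // Release the output connection first, then the input subject.
        _connection.Dispose();
        _input.Dispose();
    }
}
```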
Notice that this subject encapsulates two other subjects. One represents the input, a plain old Subject<> that allows input events to be passed to the transform logic. The other represents the output, a connectable observable that allows multiple consumers to read the output of the transform. When the subject is disposed, both the input subject and the output connection are released. A simple example of a transform subject that adds one to every incoming integer:
TransformSubject.Create((IObservable<int> xs) => xs.Select(x => x + 1))
CTI Synchronizing Subject (Round 1)
We can create a “CTI synchronizing” subject by constructing a transform subject where the transform logic tracks the highest CTI seen so far and modifies CTIs and drops events as needed:
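A minimal sketch of such transform logic, assuming Rx.NET. StreamEvent<T>, EventKind, CtiSynchronizer.Apply, the integer timestamps and the delay parameter are hypothetical stand-ins chosen for illustration; they are not StreamInsight types. The method has the shape a transform subject expects (an observable in, an observable out), so it could be supplied as xs => CtiSynchronizer.Apply(xs, delay).

```csharp
using System;
using System.Reactive.Linq;

public enum EventKind { Insert, Cti }

// Hypothetical stand-in for a temporal stream event.
public sealed class StreamEvent<T>
{
    private StreamEvent(EventKind kind, long time, T payload)
    {
        Kind = kind;
        Time = time;
        Payload = payload;
    }

    public EventKind Kind { get; }
    public long Time { get; }
    public T Payload { get; }

    public static StreamEvent<T> CreateInsert(long time, T payload) =>
        new StreamEvent<T>(EventKind.Insert, time, payload);

    public static StreamEvent<T> CreateCti(long time) =>
        new StreamEvent<T>(EventKind.Cti, time, default(T));

    public override string ToString() =>
        Kind == EventKind.Cti
            ? $"CTI: time {Time}"
            : $"Insert: time {Time}, payload {Payload}";
}

public static class CtiSynchronizer
{
    public static IObservable<StreamEvent<T>> Apply<T>(
        IObservable<StreamEvent<T>> input, long delay)
    {
        // State: the highest delayed CTI emitted so far (null until the first
        // CTI) and the event to emit for the current step (null = emit nothing).
        var seed = (Cti: (long?)null, Output: (StreamEvent<T>)null);

        return input
            .Synchronize() // serialize events arriving from different threads
            .Scan(seed, (state, e) =>
            {
                if (e.Kind == EventKind.Cti)
                {
                    long delayed = e.Time - delay;
                    if (state.Cti == null || delayed > state.Cti)
                    {
                        // The CTI advances time: emit it, shifted back by the delay.
                        return ((long?)delayed, StreamEvent<T>.CreateCti(delayed));
                    }
                    // The CTI does not advance time: swallow it.
                    return (state.Cti, (StreamEvent<T>)null);
                }

                // An insert that precedes the current CTI would violate it: drop it.
                bool violates = state.Cti != null && e.Time < state.Cti;
                return (state.Cti, violates ? null : e);
            })
            .Where(state => state.Output != null)
            .Select(state => state.Output);
    }
}
```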
In case you haven’t encountered them before, a couple of Rx operators are worth calling out. First, we apply the Synchronize operator, which serializes all incoming events and so avoids race conditions caused by inputs running on different threads. Second, the Scan operator allows us to define an accumulator that tracks the highest CTI seen so far. If an event has a timestamp that precedes the highest CTI seen so far, it must be dropped to avoid a CTI violation.
Let’s look at how the subject behaves given interleaved events from two simulated input sources:
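One way to drive such an experiment is a small harness that pushes a fixed, interleaved sequence of events into a subject and logs everything going in and coming out. The sketch below assumes Rx.NET. InterleavedInputDemo.Run and its parameters are hypothetical, and the harness is generic over the event type, so the caller supplies the functions that build and describe events.

```csharp
using System;
using System.Reactive.Subjects;

public static class InterleavedInputDemo
{
    // Pushes interleaved events from two simulated inputs, "A" and "B",
    // into the given subject and logs each input and output event.
    public static void Run<TEvent>(
        ISubject<TEvent, TEvent> subject,
        Func<long, int, TEvent> insert,   // (time, payload) -> insert event
        Func<long, TEvent> cti,           // time -> CTI event
        Func<TEvent, string> describe,    // event -> text for the log
        Action<string> log)
    {
        using (subject.Subscribe(e => log("OUTPUT " + describe(e))))
        {
            void Insert(string source, long time, int payload)
            {
                log($"INPUT Insert: input {source}, time {time}, payload {payload}");
                subject.OnNext(insert(time, payload));
            }

            void Cti(string source, long time)
            {
                log($"INPUT CTI: input {source}, time {time}");
                subject.OnNext(cti(time));
            }

            // Input A runs ahead of input B.
            Insert("A", 5, 1);
            Insert("B", 0, 2);
            Cti("A", 5);
            Cti("B", 0);
            Insert("A", 7, 3);
            Cti("A", 8);
            Insert("B", 1, 4);
        }
    }
}
```

What the harness logs depends on the subject passed in. The CTI values in the listing that follows are consistent with a CTI synchronizing subject configured with a delay of 5 time units.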
The following output is produced:
INPUT Insert: input A, time 5, payload 1
OUTPUT Insert: time 5, payload 1
INPUT Insert: input B, time 0, payload 2
OUTPUT Insert: time 0, payload 2
INPUT CTI: input A, time 5
OUTPUT CTI: time 0
INPUT CTI: input B, time 0
INPUT Insert: input A, time 7, payload 3
OUTPUT Insert: time 7, payload 3
INPUT CTI: input A, time 8
OUTPUT CTI: time 3
INPUT Insert: input B, time 1, payload 4
CTI Synchronizing Subject (Round 2)
Here is a different implementation of the subject, written in a more imperative style. Instead of using the Scan operator to maintain state, it uses a custom accumulator class:
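A minimal sketch of what such an accumulator could look like. The class name CtiAccumulator, its members and the integer timestamps are assumptions made for illustration, not the original listing. The class holds only the mutable state and the policy decisions; it is not thread-safe on its own, so it is meant to be called from behind the Synchronize operator.

```csharp
using System;

// Hypothetical accumulator: tracks the highest delayed CTI emitted so far.
public sealed class CtiAccumulator
{
    private readonly long _delay;
    private long? _currentCti; // null until the first CTI has been emitted

    public CtiAccumulator(long delay)
    {
        if (delay < 0) throw new ArgumentOutOfRangeException(nameof(delay));
        _delay = delay;
    }

    // Returns the delayed CTI to emit, or null when the incoming CTI
    // does not advance time and should be swallowed.
    public long? OnCti(long time)
    {
        long delayed = time - _delay;
        if (_currentCti.HasValue && delayed <= _currentCti.Value)
        {
            return null;
        }
        _currentCti = delayed;
        return delayed;
    }

    // Returns true when an insert with this timestamp can be forwarded,
    // false when it precedes the current CTI and must be dropped.
    public bool AcceptInsert(long time) =>
        !_currentCti.HasValue || time >= _currentCti.Value;
}
```

Inside the transform, each incoming event would be routed to OnCti or AcceptInsert depending on its kind, and the result decides whether anything is forwarded to the output.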