A few English-to-Rx examples

Thanks to Erik Meijer for teaching me some of these patterns and reviewing the post. Thanks to Mark Simms and Curt Peterson for comments and reviews.

Mark Simms and I have decided to start a new habit: if we find ourselves answering a similar question thrice, we'll blog about it. Without further ado, here are a few "I'd like to compute" statements expressed as Rx queries. They are a subset of a (now old) post. If you find yourself in the presence of data being pushed to you (instead of you pulling the data), you'll find all the tools you need to process it in .NET. Here's a brief overview of observable sequences, along with simple patterns that illustrate how we can use Rx to express computations over them.

Observable Sequences

A sequence of data can be pulled or pushed. Pull-based sequences are very familiar to us because of the years of patterns we've been using: (1) Go read a fixed amount of data from a database, (2) Create a list, (3) Iterate over a collection, etc. In .NET, the IEnumerable<T> interface represents a pull-based sequence. With a push-based sequence, one does not usually control access to the data. Instead of iterating over it, we observe it. In .NET, the IObservable<T> interface represents this notion. Observable sequences are the dual of Enumerable sequences (check out a great explanation by the Rx team). I'd like to share with you a few common patterns to create observable sequences.
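
To make the pull/push contrast concrete, here is a minimal sketch that uses only the IEnumerable<T>, IObservable<T> and IObserver<T> interfaces from the base class library, without Rx. The PushSource, CollectingObserver and PullVsPush names are hypothetical and exist only for this illustration; Rx supplies far richer implementations, so treat this as a toy rather than as how Rx works internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-based source: the producer decides when observers see data.
public sealed class PushSource : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    // The producer calls this whenever a new value arrives.
    public void Push(int value)
    {
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _dispose;
        public Unsubscriber(Action dispose) { _dispose = dispose; }
        public void Dispose() { _dispose(); }
    }
}

// Hypothetical observer that simply records what it is given.
public sealed class CollectingObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class PullVsPush
{
    // Pull: the consumer drives the iteration and asks for each element.
    public static int SumByPulling(IEnumerable<int> source)
    {
        var sum = 0;
        foreach (var x in source) sum += x;
        return sum;
    }

    // Push: the consumer registers once and is called back as values arrive.
    public static List<int> ObserveByPush()
    {
        var source = new PushSource();
        var observer = new CollectingObserver();
        using (source.Subscribe(observer))
        {
            source.Push(10);
            source.Push(20);
        }
        source.Push(30); // not observed: the subscription was disposed
        return observer.Received;
    }
}
```

Calling PullVsPush.SumByPulling(new[] { 1, 2, 3 }) returns 6, and PullVsPush.ObserveByPush() returns a list containing 10 and 20. The value 30 is pushed after the subscription is disposed, so the observer never sees it.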

Creating observable sequences

Sometimes, even when a source can be cast as an enumerable, we may want to iterate over it and observe it as we read it. Notice the use of ReadLineAsync so as not to block processing in what is commonly a slow I/O operation. Also notice how clean it is in C# 5.0 to make the lambda expression in Observable.Create() asynchronous, by just writing async (observer, c) and using await in the asynchronous call (more on Rx and .NET 4.5 here):

    public static IObservable<string> FromFile(string fileName)
    {
        return Observable.Create<string>(async (observer, c) =>
        {
            using (var sr = new StreamReader(path: fileName))
            {
                var line = default(string);

                try
                {
                    while ((line = await sr.ReadLineAsync()) != null
                            && !c.IsCancellationRequested )
                    {
                        observer.OnNext(line);
                    }
                    observer.OnCompleted();
                }
                catch (Exception e)
                {
                    observer.OnError(e);
                }
            }
        });
    }

 
Another common need: periodically polling a data source or periodically producing new data. We can use Observable.Timer() and compose on top of the resulting sequence to create a new sequence:

    public static IObservable<string> PeriodicProduction(TimeSpan startTime,
                                                         TimeSpan period)
    {
        return Observable.Timer(dueTime: startTime, period: period)
               .Select(x => String.Format("A new ({0}) string!", x));
    }    

You can imagine the timer-based pattern doing other interesting things, such as polling from a queue.
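
As a rough sketch of that queue-polling idea, the snippet below drains a queue on every timer tick and pushes whatever it finds downstream. It assumes the System.Reactive package is referenced. The choice of ConcurrentQueue<T> as the queue, and the QueuePolling, Poll and Drain names, are assumptions made for illustration; a real queue client would have its own receive API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class QueuePolling
{
    // On every tick, emit all items currently sitting in the queue.
    public static IObservable<T> Poll<T>(ConcurrentQueue<T> queue, TimeSpan period)
    {
        return Observable.Timer(TimeSpan.Zero, period)
               .SelectMany(_ => Drain(queue));
    }

    // Lazily dequeues until the queue is empty at the time of the tick.
    private static IEnumerable<T> Drain<T>(ConcurrentQueue<T> queue)
    {
        T item;
        while (queue.TryDequeue(out item))
        {
            yield return item;
        }
    }
}
```

A consumer would subscribe as usual, for example QueuePolling.Poll(queue, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine). Items enqueued between ticks are delivered on the next tick, so the period bounds the added latency.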

Merging observable sequences

The Merge operator performs a union of observable sequences and produces a new one. The sequences must be union-compatible, i.e., they must be of the same type. A running example: suppose we want to simulate an observable sequence (based on a timer) of one or more sensors reporting a particular metric periodically. Let's use the following schema:

    public class SensorData
    {
        public DateTime StartTime { get; set; }
        public int SensorId { get; set; }
        public long Value { get; set; }

        public override string ToString()
        {
            return String.Format("StartTime: {0}, SensorId: {1}, Value: {2}"
                               , StartTime, SensorId, Value);
        }
    }

One way to produce the desired sequence is to create a method that returns an observable sequence for a single sensor and merge an array of individual sensor sequences. You can imagine this maps to sensor networks: each sensor produces a sequence, but we really want to query over a sequence of observations from all sensors:

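    public static IObservable<SensorData> CreateSensor(TimeSpan reportingFrequency
                                                     , int sensorId)
    {
        // Create the generator once per sensor. Re-seeding from the clock on every
        // tick can yield near-identical seeds: with a one-second period, the
        // millisecond component barely changes between ticks.
        var random = new Random(Environment.TickCount + sensorId);

        return Observable.Timer(TimeSpan.Zero, reportingFrequency)
               .Select(x => new SensorData()
               {
                   StartTime = DateTime.Now
                             , SensorId = sensorId
                             , Value = random.Next(0, 100)
               });
    }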

    var individualSensors = new[]
    {
        Helpers.CreateSensor(TimeSpan.FromSeconds(1), 1000),
        Helpers.CreateSensor(TimeSpan.FromSeconds(1), 1001),
        Helpers.CreateSensor(TimeSpan.FromSeconds(1), 1002)
    };

    var sensors = individualSensors.Merge();  

Expressing multiple queries over an observable sequence

We are now almost ready to get dangerous. I would like to express many queries over the same sequence (without evaluating the generating function for each query). The .Publish() method on observables produces a connectable observable (IConnectableObservable<T>), which uses a Subject to share a single subscription to the source among all of its subscribers (see Subjects). A connectable observable does not subscribe to its source, and therefore produces no data, until we ask it to via the .Connect() method; from then on it behaves as a hot sequence. This is a very useful construct, since essentially we have a one-to-many pipeline with the generating expression being evaluated only once. Put another way, if you have a cold observable and subscribe to two queries over it, you have two copies of the sequence being generated. If you write two queries over a connectable observable, once you invoke .Connect() on it, there is only one copy of the underlying sequence. Sharing is good :)

Let's put it all together and create an observable sequence of sensor readings and a connectable observable:

    var sensors = individualSensors.Merge().Publish();

We are now ready to write some queries. We'll do Filters and Windowed aggregates next, and when we're ready to start the data flow, we'll write:

    sensors.Connect();
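
To see the "evaluated only once" claim in isolation, here is a small sketch that counts how many times a generating function runs with and without Publish(). It assumes the System.Reactive package is referenced; the PublishDemo class and CountGeneratorRuns method are hypothetical names, and Observable.Defer is used here only as a convenient way to observe each subscription to the source.

```csharp
using System;
using System.Reactive.Linq;

public static class PublishDemo
{
    public static (int WithoutPublish, int WithPublish) CountGeneratorRuns()
    {
        var runs = 0;

        // Defer invokes the factory once per subscription to the source.
        var source = Observable.Defer(() =>
        {
            runs++;
            return Observable.Range(1, 3);
        });

        // Two subscribers on the plain (cold) source: the factory runs twice.
        source.Subscribe(_ => { });
        source.Subscribe(_ => { });
        var withoutPublish = runs;

        // Two subscribers on the published source: nothing runs until Connect(),
        // and then the source is subscribed to exactly once.
        runs = 0;
        var shared = source.Publish();
        shared.Subscribe(_ => { });
        shared.Subscribe(_ => { });
        using (shared.Connect()) { }
        var withPublish = runs;

        return (withoutPublish, withPublish);
    }
}
```

CountGeneratorRuns() returns (2, 1): two runs of the generating function for two direct subscribers, but a single run when both subscribers go through the published sequence.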

Filters

How do I filter a stream, keeping only specific events?

     var q1 = from e in sensors
              where e.Value > 20
              select e;

How do I filter on multiple conditions?

     var q2 = from e in sensors
              where (e.SensorId == 1001) && (e.Value > 70)
              select e;

Windows and aggregations

How can I calculate the average of all events in the last 5 seconds?

      var q3 = from w in sensors.Window(timeSpan: TimeSpan.FromSeconds(5))
              from a in w.Average(_e => _e.Value)
              select new { Time = DateTime.Now, Average = a };

 

How can I calculate the average of all events in the past 5 seconds, every 2 seconds?

      var q4 = from w in sensors.Window(timeSpan: TimeSpan.FromSeconds(5)
                                      , timeShift: TimeSpan.FromSeconds(2))
              from a in w.Average(_e => _e.Value)
              select new { Time = DateTime.Now, Average = a };

How can I calculate the average of the past 5 events?

      var q5 = from w in sensors.Window(count: 5)
              from a in w.Average(_e => _e.Value)
              select new { Time = DateTime.Now, Average = a };
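
Note that Window(count: 5) produces consecutive, non-overlapping windows, so q5 emits one average per five events. If what you want is the average of the latest five events after every new event, one possible variation is a sliding buffer. The sketch below assumes the System.Reactive package is referenced; the SlidingAverage class and OfLast method are hypothetical names, and it uses Buffer with a skip of 1 rather than anything from the queries above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class SlidingAverage
{
    // Emits the average of the most recent `count` values, once per new value,
    // starting when `count` values have been seen.
    public static IObservable<double> OfLast(IObservable<long> values, int count)
    {
        return values.Buffer(count: count, skip: 1)
                     .Where(b => b.Count == count) // drop partial buffers at the end
                     .Select(b => b.Average());
    }
}
```

For the values 1 through 7 and a count of 5, this yields 3, 4 and 5 (the averages of 1..5, 2..6 and 3..7). Against the sensor stream it could be applied as SlidingAverage.OfLast(sensors.Select(e => e.Value), 5).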

 

How do I measure the rate of arriving events every two seconds?

      var q6 = from w in sensors.Window(timeSpan: TimeSpan.FromSeconds(2))
              from a in w.Count()
              select new { Time = DateTime.Now, Count = a };

 

How can I calculate the average of each group of events in the last 10 seconds?

      var q7 = from e in sensors
              group e by e.SensorId into g
              from w in g.Buffer(timeSpan: TimeSpan.FromSeconds(10))
              select new { Time = DateTime.Now
                         , SensorId = g.Key
                         , Average = w.Average(_e => _e.Value) };

How can I calculate the average of each group of events in the last 20 seconds, every 10 seconds?

      var q8 = from e in sensors
              group e by e.SensorId into g
              from w in g.Buffer(timeSpan: TimeSpan.FromSeconds(20)
                               , timeShift: TimeSpan.FromSeconds(10))
              select new { Time = DateTime.Now
                         , SensorId = g.Key
                         , Average = w.Average(_e => _e.Value) };

How can I find the largest value every 10 seconds?

      var q9 = from w in sensors.Window(timeSpan: TimeSpan.FromSeconds(10))
              from a in w.Max(e => e.Value)
              select new { Time = DateTime.Now, Max = a };

 

How can I find the events with the two smallest values in the last 10 seconds, every 5 seconds?

      var q10 = from w in sensors.Buffer(timeSpan: TimeSpan.FromSeconds(10)
                                       , timeShift: TimeSpan.FromSeconds(5))
               from a in w.OrderBy(x => x.Value).Take(2)
               select a;

How can I detect if two events with the same ID value are received within a specific time frame, such as a 5-second window?

      var q11 = from e in sensors
               group e by e.SensorId into g
               from w in g.Buffer(timeSpan: TimeSpan.FromSeconds(5))
               where w.Count >= 2
               select new { Time = DateTime.Now, SensorId = g.Key };

Final remarks

At this point, assuming we have subscribed an observer to each query, all we need to do is call sensors.Connect() and watch our Rx processing come to life. If you're hacking or prototyping and want a quick way to visualize the output, try the .DumpLive() method in LINQPad. I hope these patterns are useful, and let me know if there are other examples you'd like to see. And remember, StreamInsight 2.1 has a tight integration with Rx. Sometimes you want to process observable sequences; other times you want to work over temporal streams (CTIs).

Bonus: The queries and patterns in a self-contained LINQPad file.