Rx Christmas Release 2010 – Introducing Join and GroupJoin operators (and more)


Build 1.0.2838.0 12/24/2010 – Rx Christmas Release 2010

Rx.NET

  • New: support for joins in Rx. This includes two new operators, Join and GroupJoin, which have query expression support too:
  • This work will be covered on Channel 9 in much more detail. We’ll update this post with a link to the video when it’s online. Here’s the sketch of this new functionality.
  • Depending on the use of “into”, either Join or GroupJoin is being called in the following sample (see language specification for more details on the translation into query operator methods):

       from x in xs
       join y in ys on f(x) equals g(y) [into z]
       …

    In the sample above, xs and ys are observable sequences. Furthermore, f(x) and g(y) are functions that return an observable sequence that denotes a duration. Results are produced when those observable sequences overlap. For example, one could monitor people entering and leaving a room and create a joined sequence that produces pairs of people that are in the same room at the same time.

  • Added a Window operator that uses joins underneath.
  • Reimplemented BufferWithTime, BufferWithCount, BufferWithTimeOrCount in terms of those new join and windowing operators.
  • Generalized Publish family of operators (Publish, Prune, Replay) to a single Publish operator that takes in a Subject. Sharing a subscription to an observable sequence can be seen as “shadowing” the observable sequence using a Subject that can enforce different sharing policies (e.g. maintain history or not).
  • Related to this work is the addition of a Multicast operator, which encapsulates a ConnectableObservable.
  • ConnectableObservable is now parameterized in two generic type parameters, reflecting the most general ISubject.
  • Added an overload to If that doesn’t take an “else” case observable sequence (defaulting to Observable.Empty).
  • Fixes:
  • DispatcherScheduler is now maintained on a per-thread basis using ThreadStatic.
  • Refined implementation of Switch.
  • Variance annotations on IAsyncEnumerable and IAsyncEnumerator for .NET 4.

RxJS

  • Fix for scheduling in the Concat, Catch, and OnError operators.
  • If operator can now be used without supplying an “else” case.

Ix.NET

  • Fix to EnumerableEx.Scan.

Download the different release flavors here:

Comments (8)

  1. Joel says:

    Sometime between this release and the version I had been using, BufferWithTime changed from returning an IObservable<IList<T>> to returning an IObservable<IObservable<T>>. Now when I chain a Subscribe after BufferWithTime it seems to never be called. All I want are the values that came through during the time period in question. I assume I can do a ToEnumerable() on the inner IObservable, but again, my code never seems to get called. Any idea what I'm doing wrong? Some documentation on exactly what changed with this method and why would be nice, but I haven't found any at all. Guess I'm rolling back to the old version of the library for now…

  2. Caleb says:

    I have the same problem as Joel.

    this thread indicates that it should be fixed in the upcoming release: social.msdn.microsoft.com/…/5c45ae18-d195-4bfc-8ac1-3182d5883e8b

    However that post was made before this release so I am not sure what is going on with that.

  3. Caleb says:

    I have the same problem as Joel.

    this thread indicates that it should be fixed in the upcoming release: social.msdn.microsoft.com/…/5c45ae18-d195-4bfc-8ac1-3182d5883e8b

    However that post was made before this release so I am not sure what is going on with that.

  4. Wayne Bradney says:

    What happened to IObservable<T>.Replay() in this latest release?

  5. Joel says:

    @Caleb – According to this thread, a fix is expected to be released around the end of the first week in February. Although a response on the blog would have been nice…

    social.msdn.microsoft.com/…/5f43b338-753d-4142-afd1-dd48d271a315

  6. Yves Schelpe says:

    I heard the new Rx would come today? (removing System.CoreEx etc..) Excited about it!

  7. Fyodor Soikin says:

    The version of Publish() that takes just one argument (homogeneously-typed subject) doesn't implement it's supposed semantics. I've looked into the code (with some help from ILSpy), and it turns out that this version of Publish simply subscribes the subject to the source RIGHT AWAY, and then returns subject.AsObservable().

    This has a nasty effect of draining any replayed values (if the source has replay semantics) through the subject at the moment of calling Publish, as opposed to the moment of subscribing to it's result, as it's supposed to be.

    In contrast, the other overload of Publish works just as expected, only subscribing to the source once a subscription comes in from downstream.

    To work around this, I had to replace all calls to "x.Publish(y)" with "x.Publish( y, z => z )". This takes care of the problem, but it cost me quite some time to find out what was wrong.

    Please fix it in the next release.

  8. We will definitely fix that for the next release.