Mind your subscriptions in your IObservable extensions

When creating new Rx operators that extend IObservable, it’s a good best practice to make sure you only subscribe once to the source. Users of your operator have a right to expect that subscriptions to your operator will only in turn cause a single subscription to the source. This could become crucial if the source is cold – multiple subscriptions might cause unintended side-effects.

Here is an example (somewhat contrived for simplicity). Let’s say you wanted to have a running count of the number of OnNext()s that have occurred in a source stream within the previous 5 seconds. You might write this as follows:

public static IObservable<int> WindowCount<T>(
  this IObservable<T> source,
  TimeSpan interval)
{
    const int onNextEntersWindow = 1;
    const int onNextExitsWindow = -1;

    return Observable.Create<int>(observer =>
        {
            return Observable
              .Merge(source.Select(_ => onNextEntersWindow),
                     source.Select(_ => onNextExitsWindow)
                           .Delay(interval))
              .Scan((countInWindow, onNextEvent) =>
                countInWindow + onNextEvent)
              .Subscribe(observer);
        });
}

This code is projecting the source stream to TWO separate streams and merging them. The first is a stream of 1′s, the second a stream of -1′s that is delayed by an interval. These are then added together with Scan for a real time running total. The effect of this is for each event in the source, when get a count of the number of events that have occurred within the preceding interval. This is a useful idea for a number of advanced operators… but I digress.

The key point is that this operation creates TWO subscriptions to the source – one for each stream. The fix is straightforward – you simply publish the source and subscribe to the publication as many times as you need. Effectively you are working with a version of the source stream guaranteed to be hot. The fix looks like this:

public static IObservable<int> WindowCount<T>(
  this IObservable<T> source,
  TimeSpan interval)
{
    const int onNextEntersWindow = 1;
    const int onNextExitsWindow = -1;

    return Observable.Create<int>(observer =>
        {
            var hotSource = source.Publish().RefCount();

            return Observable
              .Merge(hotSource.Select(_ => onNextEntersWindow),
                     hotSource.Select(_ => onNextExitsWindow)
                              .Delay(interval))
              .Scan((countInWindow, onNextEvent) =>
                countInWindow + onNextEvent)
              .Subscribe(observer);
        });
}

You must be accurate with the Publish. Putting it before the Create would cause a single Subscription across ALL subscriptions to a given invocation. By doing the Publish inside the Create, we get the desired behaviour of one Subscription to source per invocation.

One thought on “Mind your subscriptions in your IObservable extensions

  1. Pingback: A reactive join example | Zero Bug Build