When creating new Rx operators that extend IObservable
Here is an example (somewhat contrived for simplicity). Let’s say you wanted to have a running count of the number of OnNext()s that have occurred in a source stream within the previous 5 seconds. You might write this as follows:
public static IObservable<int> WindowCount<T>(
this IObservable<T> source,
TimeSpan interval)
{
const int onNextEntersWindow = 1;
const int onNextExitsWindow = -1;
return Observable.Create<int>(observer =>
{
return Observable
.Merge(source.Select(_ => onNextEntersWindow),
source.Select(_ => onNextExitsWindow)
.Delay(interval))
.Scan((countInWindow, onNextEvent) =>
countInWindow + onNextEvent)
.Subscribe(observer);
});
}
This code is projecting the source stream to TWO separate streams and merging them. The first is a stream of 1′s, the second a stream of -1′s that is delayed by an interval. These are then added together with Scan for a real time running total. The effect of this is for each event in the source, when get a count of the number of events that have occurred within the preceding interval. This is a useful idea for a number of advanced operators… but I digress.
The key point is that this operation creates TWO subscriptions to the source – one for each stream. The fix is straightforward – you simply publish the source and subscribe to the publication as many times as you need. Effectively you are working with a version of the source stream guaranteed to be hot. The fix looks like this:
public static IObservable<int> WindowCount<T>(
this IObservable<T> source,
TimeSpan interval)
{
const int onNextEntersWindow = 1;
const int onNextExitsWindow = -1;
return Observable.Create<int>(observer =>
{
var hotSource = source.Publish().RefCount();
return Observable
.Merge(hotSource.Select(_ => onNextEntersWindow),
hotSource.Select(_ => onNextExitsWindow)
.Delay(interval))
.Scan((countInWindow, onNextEvent) =>
countInWindow + onNextEvent)
.Subscribe(observer);
});
}
You must be accurate with the Publish. Putting it before the Create would cause a single Subscription across ALL subscriptions to a given invocation. By doing the Publish inside the Create, we get the desired behaviour of one Subscription to source per invocation.