A reactive join example

A good example of where a reactive join is useful came up the other day. Given a real-time price stream, we want to display the most recent price whenever a button is clicked.

A join in Rx answers the question: when do events from a pair of streams coincide?

To reason about whether two events occur at the same time, each event needs a duration as well as a value. That duration could be infinitesimally small (a point event), or it could be longer.

Duration in Rx is expressed with (you might have guessed) an observable. Each event in a source stream has an associated duration stream. The duration of the event is the time it takes for the duration stream to emit an OnNext or OnCompleted.
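To make the duration rule concrete, here is a minimal sketch that calls the Join operator directly. The DurationDemo class and the subject names (left, right, closeLeft) are made up for illustration. The left event stays open until closeLeft emits, and each right event is a point event.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DurationDemo
{
    public static List<string> Run()
    {
        var left = new Subject<string>();
        var right = new Subject<int>();
        // Hypothetical helper stream: emitting on it ends the duration of open left events.
        var closeLeft = new Subject<Unit>();

        var results = new List<string>();

        using (left.Join(
                   right,
                   _ => closeLeft,                 // left duration: open until closeLeft emits
                   _ => Observable.Empty<Unit>(),  // right duration: ends immediately
                   (l, r) => l + r)
               .Subscribe(results.Add))
        {
            left.OnNext("a");
            right.OnNext(1);                 // coincides with "a", so "a1" is produced
            closeLeft.OnNext(Unit.Default);  // the duration of "a" ends here
            right.OnNext(2);                 // no left event is open, so nothing is produced
        }

        return results;
    }
}
```

Run() should return a list containing only "a1": the second right event arrives after the duration of "a" has ended.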

So back to the example. The prices are represented as an IObservable<Price>.

Through the use of Observable.FromEvent we can easily transform click events into an IObservable<Unit>. (Recall Unit is a type representing an event whose value is not interesting.)
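As a rough sketch of that conversion, assuming a button that exposes a plain EventHandler event (FakeButton below is a hypothetical stand-in for whatever control your UI framework provides):

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical stand-in for a UI button; a real control would come from the UI framework.
public sealed class FakeButton
{
    public event EventHandler Click;

    public void RaiseClick() => Click?.Invoke(this, EventArgs.Empty);
}

public static class ClickStreams
{
    // Turns the Click event into a stream of Unit values, discarding sender and args.
    public static IObservable<Unit> Clicks(FakeButton button) =>
        Observable.FromEvent<EventHandler, Unit>(
            onNext => (sender, e) => onNext(Unit.Default), // adapt the observer callback to an EventHandler
            handler => button.Click += handler,            // attach when the stream is subscribed
            handler => button.Click -= handler);           // detach when the subscription is disposed
}
```

Disposing the subscription detaches the handler, so clicks raised afterwards are not observed.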

The result we want will be an IObservable<Price> that emits the most recent Price from the price stream whenever the IObservable<Unit> stream emits a Unit.

The join should tell us when the duration of a price coincides with a click. Here’s a visual representation of the idea:

[Figure: Visualizing SampleOn]

This is quite similar to the built-in Rx Sample extension method that samples a source stream at periodic intervals, so I decided to call this operation SampleOn.

To use a join here we want to give each Price a duration that lasts until the next Price is emitted. A duration that lasts until the next event is quite a common requirement. Happily, we can use a handy trick and take the Price stream itself as the duration observable. In this way, the next Price will terminate the duration of the previous Price.*

The Click event is a point event. To represent this we just need a duration stream that terminates immediately, and Observable.Empty<Unit>() will do nicely.

Here is the code:

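// Requires: using System; using System.Reactive; using System.Reactive.Linq;
// As an extension method, this must be declared inside a static class.
public static IObservable<TSource> SampleOn<TSource, TSample>(
    this IObservable<TSource> source,
    IObservable<TSample> sampler)
{
    return from s in source
           join t in sampler
             // Left duration: each source value lasts until the next one arrives.
             // Right duration: each sample is a point event that ends immediately.
             on source equals Observable.Empty<Unit>()
           select s;
}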
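Here is a small usage sketch. It uses a Subject<decimal> as a stand-in for the price stream and a Subject<Unit> for the clicks; both, along with the class names, are assumptions for illustration only. Subjects are hot, so the footnote about multiple subscriptions does not bite here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SampleOnExtensions
{
    // Same operator as in the post, repeated so this sketch compiles on its own.
    public static IObservable<TSource> SampleOn<TSource, TSample>(
        this IObservable<TSource> source,
        IObservable<TSample> sampler)
    {
        return from s in source
               join t in sampler
                 on source equals Observable.Empty<Unit>()
               select s;
    }
}

public static class SampleOnDemo
{
    public static List<decimal> Run()
    {
        var prices = new Subject<decimal>(); // stand-in for IObservable<Price>
        var clicks = new Subject<Unit>();    // stand-in for the button clicks
        var shown = new List<decimal>();

        using (prices.SampleOn(clicks).Subscribe(shown.Add))
        {
            clicks.OnNext(Unit.Default); // no price yet, nothing shown
            prices.OnNext(1.10m);
            prices.OnNext(1.20m);        // ends the duration of 1.10
            clicks.OnNext(Unit.Default); // shows 1.20
            clicks.OnNext(Unit.Default); // shows 1.20 again
            prices.OnNext(1.30m);
            clicks.OnNext(Unit.Default); // shows 1.30
        }

        return shown;
    }
}
```

Run() should return 1.20, 1.20, 1.30. Note that a click repeats the latest price even if no new price has arrived since the previous click.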

The query syntax for joins uses the following pattern:

from left event in left stream
join right event in right stream
on left duration function equals right duration function
select selector function.
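The C# compiler translates a join clause into a call to a Join method, so the pattern above can also be written with method syntax. A sketch of the same operator in that form (the name SampleOnExplicit is made up here to avoid clashing with the query version):

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class SampleOnExplicitExtensions
{
    public static IObservable<TSource> SampleOnExplicit<TSource, TSample>(
        this IObservable<TSource> source,
        IObservable<TSample> sampler)
    {
        return source.Join(
            sampler,                        // right stream
            s => source,                    // left duration function
            t => Observable.Empty<Unit>(),  // right duration function
            (s, t) => s);                   // selector function
    }
}
```

The two duration functions receive the event they apply to, so a duration can depend on the event's value. SampleOn ignores that argument in both cases.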

* One thing to point out when using a stream as its own duration, as I do in this example, is that the stream will be subscribed to multiple times, so you will want to either Publish()/RefCount() the source stream or ensure that it is hot. I talked about this previously here.
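To see the effect, here is a rough sketch that counts subscriptions. The SharingDemo class and the counting wrapper built with Observable.Create are illustrative assumptions, not part of the original operator.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharingDemo
{
    // Returns how many times the wrapped price source was subscribed to.
    public static int CountSubscriptions(bool share)
    {
        var feed = new Subject<int>();
        var clicks = new Subject<Unit>();
        var subscriptions = 0;

        // Wrapper that counts every subscription made to it.
        IObservable<int> prices = Observable.Create<int>(observer =>
        {
            subscriptions++;
            return feed.Subscribe(observer);
        });

        if (share)
        {
            // Share a single underlying subscription among all subscribers.
            prices = prices.Publish().RefCount();
        }

        // Same join as SampleOn: each price uses the price stream as its duration.
        var sampled = from p in prices
                      join c in clicks
                        on prices equals Observable.Empty<Unit>()
                      select p;

        using (sampled.Subscribe(_ => { }))
        {
            feed.OnNext(1); // the duration of this price subscribes to prices again
            feed.OnNext(2);
        }

        return subscriptions;
    }
}
```

With share set to true the wrapper should be subscribed to once. Without sharing, each price adds another subscription on top of the initial one.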
