Here’s an interesting problem that came up on the Rx forums. How do you compare the current and previous items in an observable stream?
We can create a general-purpose method to pair up the current and previous items, which makes it easy to tack on whatever comparison you like with a Where or a Select. In other words, each item that arrives from the source is emitted together with the item that came before it.
Observable.Scan gives us an excellent starting point for this.
We’ll call the extension PairWithPrevious. Here’s an example of how you might use it to write out the deltas in a stream of integers:
var source = new Subject<int>();

// Subscribe returns the subscription (an IDisposable), not the deltas themselves.
var subscription = source.PairWithPrevious()
    .Select(pair => pair.Item2 - pair.Item1)
    .Subscribe(Console.WriteLine);

source.OnNext(1);
source.OnNext(5);
source.OnNext(3);
This gives the following output (note the first delta value is from the default integer value of 0 to the initial value of 1):
1
4
-2
Here’s the code:
public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}
The first argument to Scan specifies a “seed” that initializes the “accumulator”. The seed itself is never emitted; it only gives the accumulator function its initial value. As each item arrives, the current accumulator is passed to the accumulator function along with the item, and the function produces the next accumulator value. That new value is what gets output to observers.
So Observable.Scan is similar to Observable.Aggregate, except it emits the value of the aggregate as it goes instead of only when the source stream completes.
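To make the difference concrete, here is a minimal sketch that pushes the same three values through both operators and records what each one emits. It assumes the System.Reactive package is referenced; the ScanVersusAggregate class and its Collect, RunningTotals and FinalTotal members are names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, not part of Rx.
public static class ScanVersusAggregate
{
    // Pushes 1, 2, 3 through the given operator and records what comes out.
    private static List<int> Collect(Func<IObservable<int>, IObservable<int>> op)
    {
        var seen = new List<int>();
        var source = new Subject<int>();
        using (op(source).Subscribe(seen.Add))
        {
            source.OnNext(1);
            source.OnNext(2);
            source.OnNext(3);
            source.OnCompleted();
        }
        return seen;
    }

    // Scan emits the accumulator after every item: 1, 3, 6.
    public static List<int> RunningTotals()
    {
        return Collect(xs => xs.Scan(0, (acc, x) => acc + x));
    }

    // Aggregate emits only once, when the source completes: 6.
    public static List<int> FinalTotal()
    {
        return Collect(xs => xs.Aggregate(0, (acc, x) => acc + x));
    }
}
```

RunningTotals yields three values because Scan reports every intermediate sum, while FinalTotal yields a single value that only appears after OnCompleted.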
I’m using the built-in Tuple type in PairWithPrevious to produce the resulting pairs. This has an advantage over anonymous types because you can return a Tuple from a method. If you have specific needs, you can consider a custom type. Tuple provides a way of grouping values and has a set of ItemX properties, one for each value, called Item1, Item2 and so on.
The seed value is a Tuple pair of defaults for the source type. On each iteration, we create a new Tuple, copying the old Item2 to Item1 and setting Item2 to the current value. We create a new Tuple because Tuples are immutable. If you are using a custom type, you could update it in place, but I actually think immutability is the safer way to go in Rx in general, unless you are especially worried about memory and GC performance.
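If you did want a custom type, one possible shape is sketched below. It assumes the System.Reactive package is referenced; Pair<T>, PairWithPreviousNamed and the Increases demo method are hypothetical names invented for this sketch. The type stays immutable and simply swaps Item1 and Item2 for named properties, and the demo shows a Where hanging off the pairs.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical immutable pair type; the name and members are illustrative.
public sealed class Pair<T>
{
    public Pair(T previous, T current)
    {
        Previous = previous;
        Current = current;
    }

    public T Previous { get; }
    public T Current { get; }
}

public static class PairExtensions
{
    // Same Scan-based technique as PairWithPrevious, with named members
    // instead of Item1 and Item2.
    public static IObservable<Pair<TSource>> PairWithPreviousNamed<TSource>(
        this IObservable<TSource> source)
    {
        return source.Scan(
            new Pair<TSource>(default(TSource), default(TSource)),
            (acc, current) => new Pair<TSource>(acc.Current, current));
    }
}

public static class PairDemo
{
    // Returns the values that are larger than the value before them.
    public static List<int> Increases(params int[] values)
    {
        var results = new List<int>();
        var source = new Subject<int>();
        using (source.PairWithPreviousNamed()
                     .Where(pair => pair.Current > pair.Previous)
                     .Select(pair => pair.Current)
                     .Subscribe(results.Add))
        {
            foreach (var value in values)
            {
                source.OnNext(value);
            }
        }
        return results;
    }
}
```

Calling PairDemo.Increases(1, 5, 3) gives 1 and 5. The leading 1 is included because the first comparison is against the default value of 0, the same seed behaviour seen in the delta example.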
Finally, here’s a unit test I wrote for this function. I used the NuGet packages Rx-Testing and NUnit:
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

public class PairWithPreviousTests : ReactiveTest
{
    [Test]
    public void Works()
    {
        var testScheduler = new TestScheduler();
        var source = Observable.Range(1, 3);

        var results = testScheduler.Start(
            () => source.PairWithPrevious());

        // Range emits synchronously on subscription, so every message
        // carries the Subscribed timestamp.
        results.Messages.AssertEqual(
            OnNext(Subscribed, Tuple.Create(0, 1)),
            OnNext(Subscribed, Tuple.Create(1, 2)),
            OnNext(Subscribed, Tuple.Create(2, 3)),
            OnCompleted<Tuple<int, int>>(Subscribed));
    }
}
This post was inspired in part by an old question on StackOverflow with an answer that suffers from a double subscription to the source. If you like the approach here, I’d be dead pleased if you could up-vote my answer on SO. Thanks!
I liked your extension, but I think you could make the signature better. If you make the signature
public static IObservable<TResult> WithPrevious<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TSource, TResult> projection)
then you can hide the tuple. You just need to change the code to be
return source.Scan(new Tuple<TSource, TSource>(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
    .Select(t => projection(t.Item1, t.Item2));
Now there is no tuple exposed and the projection is handled for you. It makes using it a lot nicer, because you just pass in a Func that takes the previous and current values and does whatever it needs to do.
But using Scan is a really good solution.
Nice idea. I never liked exposing Tuple but wanted to keep the concept count low for the post. This is a good compromise between flexibility and ease of use, I like it.
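For reference, the projection-based variant discussed in these comments can be put together roughly as follows. This is a sketch under assumptions rather than a definitive version: it assumes the System.Reactive package is referenced and that the generic parameters are TSource and TResult as implied by the comment; the WithPreviousExtensions and WithPreviousDemo class names and the Deltas method are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class WithPreviousExtensions
{
    // Pairs each item with its predecessor via Scan, then hands both
    // values to the caller's projection so the Tuple never leaks out.
    public static IObservable<TResult> WithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TSource, TResult> projection)
    {
        return source
            .Scan(
                Tuple.Create(default(TSource), default(TSource)),
                (previous, current) => Tuple.Create(previous.Item2, current))
            .Select(pair => projection(pair.Item1, pair.Item2));
    }
}

public static class WithPreviousDemo
{
    // Returns the difference between each value and the one before it.
    public static List<int> Deltas(params int[] values)
    {
        var results = new List<int>();
        var source = new Subject<int>();
        using (source.WithPrevious((previous, current) => current - previous)
                     .Subscribe(results.Add))
        {
            foreach (var value in values)
            {
                source.OnNext(value);
            }
        }
        return results;
    }
}
```

WithPreviousDemo.Deltas(1, 5, 3) produces 1, 4, -2, matching the output of the original delta example, with the first delta again measured from the default value of 0.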