I wrote some helper extension methods to log what’s going on in my Rx expressions. These leverage the Materialize method which has this signature:
public static IObservable<Notification<TSource>>
Materialize<TSource>(this IObservable<TSource> source)
Materialize projects a stream of values of T into a stream of Notification<T> values. Notifications have a Kind that tells us whether the original value was an OnNext, OnError or OnCompleted. They give you access to the original Value or Exception as appropriate. You can also pass a Notification on to an observer via its Accept method.
To put it another way, (the way the docs do), it turns implicit notifications into explicit notifications. Implicit because the method called on the observer (OnNext, OnError, OnCompleted) tells us implicitly what kind of notification it is, whereas the Notifications, which are always delivered into OnNext, are explicitly declaring the type via their Kind property.
Incidentally, there is a corresponding Dematerialize method that performs the reverse operation. Finally, Materialized streams send OnCompleted after they send a Notification of Kind OnError or OnCompleted. They don’t OnError themselves.
So why is this useful?
Well if you ever want to process all events on a stream in a uniform manner, without Materialize you’d be stuck writing handlers for each of the three methods. With Materialize, you can do it all in one place.
Here are some practical examples I use when I want to inspect what’s going on with my Rx. The first example is a side effect, like Do(), that can be injected into a stream to write out what’s happening to the Console. It’s an extension method:
public static IObservable<T> LogToConsole<T>(
this IObservable<T> source, string key = null)
{
return Observable.Create<T>(observer =>
source.Materialize().Subscribe(notification =>
{
Console.WriteLine(NotifyToString(notification, key));
notification.Accept(observer);
}));
}
This uses Materialize to get a stream of Notifications which can then be written out to the Console using the helper method NotifyToString, shown later. I use the Accept method to pass the notification on to the observer. The key property optionally allows me to label the output. Here’s an example:
Observable.Range(0, 4).LogToConsole("Range: ").Subscribe();
Which gives the output:
Range: OnNext(0)
Range: OnNext(1)
Range: OnNext(2)
Range: OnNext(3)
Range: OnCompleted()
Note that LogToConsole subscribes “on demand” (as all good Rx operators should) and needs a subscriber itself for anything to happen. It’s a side-effect, so be warned it log for EVERY subscriber.
It’s quite common for me to be experimenting with the output of operators in tools like LINQPad, so I also wrote another extension method to take care of the subscription as well:
public static IDisposable SubscribeConsoleNotifier<T>(
this IObservable<T> source, string key = null)
{
return source.Materialize().Subscribe(
n => Console.WriteLine(NotifyToString(n, key)));
}
And an example of usage putting both together:
Observable.Range(0, 4)
.LogToConsole("Range: ")
.Where(i => i % 2 == 0)
.SubscribeConsoleNotifier("Where: ");
Gives this, notice how the LogToConsole shows the output from Range before the Where filter is applied:
Range: OnNext(0)
Where: OnNext(0)
Range: OnNext(1)
Range: OnNext(2)
Where: OnNext(2)
Range: OnNext(3)
Range: OnCompleted()
Where: OnCompleted()
Obviously, you can code these to write to a logger, or something more generally useful.
To wrap this post up, here’s the implementation of the helper NotifyToConsole:
public static string NotifyToString<T>(
Notification<T> notification, string key = null)
{
if (key != null)
return key + "\t" + notification;
return notification.ToString();
}
It’s all trivial stuff, but I’ve found it quite useful.