Today we’ll look at a fairly common server-side scenario. In this case, a server installs clients on remote machines and clients report back through a socket.
We want to create a “hold-timer” – each client will periodically send a heartbeat with a unique id back to the server to indicate they are alive. If the server doesn’t receive a heartbeat from a client within a specified limit then the client is removed.
Here’s a simple snippet that demonstrates how you can do this with reactive extensions. If you want to run it, paste the code into a console app – and don’t forget to add Nuget package Rx-Main :
var clientHeartbeats = new Subject<int>();
var timeToHold = TimeSpan.FromSeconds(5);
var expiredClients = clientHeartbeats
.Sychronize()
.GroupBy(key => key)
.SelectMany(grp => grp.Throttle(timeToHold));
var subscription = expiredClients.Subscribe(
// in here add your disconnect action
i => Console.WriteLine("Disconnect Client: " + i));
while(true)
{
var num = Console.ReadLine();
if (num == "q")
{
break;
}
// call OnNext with the client id each time they send hello
clientHeartbeats.OnNext(int.Parse(num));
}
// to tear down the monitor
subscription.Dispose();
So let’s break this down. We will create a Subject to which the client heartbeats will be posted. Every time a client heartbeat arrives, we call OnNext on the clientHeartbeats Subject, passing the client id. This gives us an IObservable<int> stream of client ids.
Let’s gloss over the Synchronize() call for now – I’ll come back to this later.
The first thing we want to do is separate out each client’s events into seperate streams for independent consideration. We can use Observable.GroupBy to do this. GroupBy needs a key selector which will identify each group. In this case we can use the identity function (key => key) and group by the id itself.
The output of GroupBy is a stream of streams – each stream emitted is an IGroupedObservable<int> that is just like an IObservable<int> but that also has a Key property to indicate which group it is. So the return type of GroupBy is an eye-watering IObservable<IGroupedObservable<TSource, TKey>>.
So now we have individual streams for each client id to work with. Let’s consider the goal – we want to emit a signal whenever a client doesn’t notify us for a specified duration.
Observable.Throttle is what we need. This extension method will suppress an event if another event appears within a specified duration. Perfect! We apply a Throttle to each group and we will only get an event if a client doesn’t say hello in time.
The situation so far is illustrated below:
Now we need to pull all the separate streams back into one result stream. For this, we can use Observable.SelectMany. One of its many (no pun intended!) overloads will do this for us, accepting a stream of streams and using a function pull out elements to be sent to the result stream. The selector function takes each group stream and we can apply the Throttle behaviour described above.
Now, there’s a problem here. The GroupBy operator has a memory leak! As written, it’s groups will never terminate. Even worse, if you try to terminate a group subscription (with say a Take(1) after our throttle), then the group will end, but won’t ever restart! I wouldn’t get too carried away worrying about this unless you have very large numbers of clients and never restart your service but…
You can address this problem by using Observable.GroupByUntil which takes a duration function for each group, and allows groups to restart. Here’s the code – I’ll leave it as an exercise for you to figure out the detail of how it works and why I’ve written it this way. :
var expiredClients = clientHeartbeats
.GroupByUntil(key => key, grp => grp.Throttle(timeToHold))
.SelectMany(grp => grp.TakeLast(1));
One final note… about that Observable.Synchronize. This function ensures that the IObservable<T> it is applied to behaves itself! It enforces the Rx grammar of OnNext*(OnComplete|OnError)? and it makes OnNext calls thread safe. In Rx 2.x, although Subject enforces correct grammar, OnNext calls are not thread safe. This was a decision made for performance reasons. It’s quite possible that in production server code you will be wanting to call OnNext from different threads. Synchronize will help prevent any nasty race conditions by putting a gate around OnNext calls.
Hello !
Really nice, something I have to understand…. [RX is too complicated to mee ].
In your exit decision [after "q"], I just inserted this:
clientHeartbeats.OnCompleted();
and in the subscription registration, I added a handler for OnComplete. This works, for the first view.
But:After I enter “q” and the timeout was NOT reached,
I am getting “disconnected” messages for each client.
I feel bad with this and I do not understand why.
Would be nice, if this can be described.
Thanks anyway and
best regards,
++mabra
There isn’t really a meaningful use for OnCompleted in this scenario – it’s designed as a non-terminating stream. In this sample application the while loop is just there to keep things running and feed the query for demo purposes; in a production situation you would create the subscription in start-up code an then just let it run, and make the OnNext calls from whatever service code is handling client calls (e.g. perhaps some web service logic). The Disposable of the subscription is all you need to stop monitoring clients – and it’s not even essential to call that unless you want to stop monitoring separately from shutting down your application.
Pretty well explained, great work over there! You could perhaps make a second part explaining your thoughs on how to handle the lifecycle of the heartbeat itself in the client side. Keep it up!