On your Next action, please follow these guidelines

I recently got around to reading the Rx Design Guidelines. This is a fairly small document that outlines advice for using the Rx library.

The early documents on Rx didn’t seem to offer much advice on the scheduling of the actions that are to be carried out when the methods of IObservable are called. When subscribing to a stream of events, the context is often important if you want to interact with thread bound items such as Windows controls.

This document and the later releases of Rx cover the concurrency aspect in much more detail. There is now an IScheduler interface that allows the user to control when and in which context the actions are executed (in addition to the ObserveOn and Synchronize methods), and the user has a fair amount of control over the scheduling using one of many pre-defined Schedulers. There is also a Scheduler that uses a concept of virtual time, which makes it easy to write unit tests where events happen at particular instants.

I was also interested to see that a form of the Join calculus is supported by the Rx library. We can use Subject objects to represent the channels of the Join calculus and can then define patterns that look very much the usual Counter example.

var counter = new Subject<int>();
var inc = new Subject<int>();
var get = new Subject<IObserver<int>>();

var channelObserver =
    Observable.Join(
        counter.And(inc).Then((current, result) =>
        {
            counter.OnNext(current+result);
            return new Unit();
        }),
        counter.And(get).Then((current, channel) =>
            {
                channel.OnNext(current);
                counter.OnNext(current);
                return new Unit();
            }));

This observer needs to have a subscription to make it active, and then we can send it the various messages as in the standard example. If we were doing this for real, we would, of course, need to dispose the observers at the relevant points.

channelObserver.Subscribe();
counter.OnNext(5);
inc.OnNext(3);
inc.OnNext(5);

var counterValue = new Subject<int>();
counterValue.Subscribe(x => Console.WriteLine(x));
get.OnNext(counterValue);

inc.OnNext(3);
inc.OnNext(5);
get.OnNext(counterValue);

It’s quite exciting to see how rapidly this library is developing. The concurrency side of things is really different from the FRP (Functional Reactive Programming) examples of old, where everything was single threaded and hence context safe, but the concurrency side of things is being worked into the Rx library. This library is certainly worth watching as it develops in the future.

Advertisements
This entry was posted in Computers and Internet. Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s