Reactive extensions in action

Rx.NET In Action by Tamir Dresher

The reactive extensions have been around for a long time. I remember coming across them in C# something like a decade ago, but I don’t think I’ve seen a book or documentation that covers the whole of the implementation – sure, people spend a lot of time talking about the various combinators and hot and cold observables, but they don’t spend much time talking about schedulers and the threading model that sits below the system.

Part one of the book, which consists of three chapters, gives a basic introduction to Reactive programming, and also covers some of the C# you need to make use of the Rx libraries.

By way of some examples, the first chapter introduces us to the idea of making events first-class objects, the IObservable interface and its duality with IEnumerable, and points out the differences between the push and pull models of event delivery. The author goes on to look at the properties described in the Reactive Manifesto. We are also introduced to marble diagrams, which allow us to visualise the various interactions.
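To make the push/pull duality concrete, here is a minimal sketch of my own (not taken from the book). It assumes the System.Reactive NuGet package is referenced and uses C# top-level statements; the variable names are purely illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Pull: the consumer asks the IEnumerable for each value in turn.
IEnumerable<int> pullSource = new[] { 1, 2, 3 };
var pulled = new List<int>();
foreach (var x in pullSource)
{
    pulled.Add(x);
}

// Push: the IObservable calls the consumer back for each value.
IObservable<int> pushSource = pullSource.ToObservable();
var pushed = new List<int>();
pushSource.Subscribe(
    x => pushed.Add(x),                          // OnNext
    ex => Console.WriteLine($"Error: {ex}"),     // OnError
    () => Console.WriteLine("Completed"));       // OnCompleted

Console.WriteLine(string.Join(",", pulled));
Console.WriteLine(string.Join(",", pushed));
```

The same three values arrive in both cases; the difference is who drives the iteration.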

Chapter two takes us through a “Hello, Rx” application. This time it isn’t Google Suggest, which used to be the canonical example in various write-ups. In this book we look at a stock tracker application instead. This allows the author to show how standard .NET events can easily be converted into event streams, and gives the author a chance to talk a little about threading concerns. I think that’s great, as threading is often hidden under the covers in tutorials, but as soon as you want the events to be processed by a GUI you run into the GUI library’s threading requirements.

Chapter three covers functional thinking in C#. Rx.NET encourages you to build a processing pipeline, with events feeding into the top of the pipeline, various filtering and processing happening in the middle, and subscribers consuming the resulting output of the pipeline. This is a style of composition that functional programming handles very well.
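A pipeline of that shape might look something like the following sketch. It is my own illustration rather than the book's code, and it assumes the System.Reactive package; the numbers and the output format are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var results = new List<string>();

Observable.Range(1, 10)                 // events enter at the top of the pipeline
    .Where(n => n % 2 == 0)             // filtering in the middle
    .Select(n => $"even: {n}")          // processing in the middle
    .Subscribe(s => results.Add(s));    // a subscriber consumes the output

Console.WriteLine(string.Join("; ", results));
```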

The second part of the book has chapters on various Rx.NET concepts.

Chapter four starts with creating observables, which is demonstrated by writing an observer that logs the received events to the console (and we’ll use this observer throughout the book). Of course, writing things yourself gives you a chance to break the protocol of the IObservable, in particular the rule that messages must follow the grammar

OnNext* (OnError | OnCompleted)?

It is therefore often better to use the Rx library’s helpers for defining your own classes, so the author points to the ObservableBase class which makes it easy for you to define your own named types, or better still there are many overloads on Observable.Create to avoid the need to name a new type.

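            // Observable.Create builds an observable from a subscribe function.
            var ob = Observable.Create<int>(observer =>
            {
                Console.WriteLine("Started");
                // Emit one value after two seconds on a thread-pool thread, then complete.
                Task.Run(async () =>
                {
                    await Task.Delay(TimeSpan.FromSeconds(2));
                    observer.OnNext(2);
                    observer.OnCompleted();
                });
                // The returned action runs when the subscription ends (disposal or completion).
                return () => { Console.WriteLine("Finished"); };
            });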

            ob.Subscribe(x => Console.WriteLine(x));
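
As a small illustration of why the library helpers are the safer route, the sketch below (mine, assuming the System.Reactive package) deliberately misbehaves inside Observable.Create by calling OnNext after OnCompleted. As far as I understand the library, the observer handed to the subscribe function is wrapped so that anything after the terminal message is dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var seen = new List<string>();

var misbehaving = Observable.Create<int>(observer =>
{
    observer.OnNext(1);
    observer.OnCompleted();
    observer.OnNext(2);          // breaks the protocol; not forwarded to subscribers
    return Disposable.Empty;     // nothing to clean up in this toy example
});

misbehaving.Subscribe(
    x => seen.Add($"OnNext({x})"),
    () => seen.Add("OnCompleted"));

Console.WriteLine(string.Join(", ", seen));
```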

This chapter also looks at converting the various .NET event styles to observables, and looks at converting from Enumerable to Observable and back again. We also see some of the more primitive observables that handle looping and single values.

            // Generate: initial state, continue condition, state step, result selector.
            var evensBelow50 = Observable.Generate(0, x => x < 50, state => state + 2, v => v);
            var singleValue = Observable.Return(10);
            var neverFinish = Observable.Never<int>();
            var empty = Observable.Empty<int>();
            var _ = Observable.Throw<int>(new Exception("Bang"));
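
For the event and enumerable conversions mentioned above, here is a hedged sketch of my own. It assumes the System.Reactive package and uses the CollectionChanged event of ObservableCollection purely as a convenient standard .NET event to wrap; the book's examples may well use different events.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Reactive.Linq;

var collection = new ObservableCollection<string>();

// Wrap a standard .NET event as an observable of EventPattern values.
var changes = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
    h => collection.CollectionChanged += h,
    h => collection.CollectionChanged -= h);

var actions = new List<NotifyCollectionChangedAction>();
using (changes.Subscribe(e => actions.Add(e.EventArgs.Action)))
{
    collection.Add("a");
    collection.Remove("a");
}
collection.Add("ignored");   // the handler was removed when the subscription was disposed

// Enumerable -> Observable -> Enumerable round trip.
List<int> roundTrip = new[] { 1, 2, 3 }.ToObservable().ToEnumerable().ToList();

Console.WriteLine(string.Join(",", actions));
Console.WriteLine(string.Join(",", roundTrip));
```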

Chapter five covers how you make observables from asynchronous code. It starts by looking at the async-friendly overloads of Observable.Create:

            // The async overload receives a CancellationToken that is cancelled on unsubscribe.
            var ob = Observable.Create<int>(async (observer, ct) =>
            {
                Console.WriteLine("Started");
                ct.Register(() => Console.WriteLine("Finished"));
                await Task.Delay(TimeSpan.FromSeconds(2));
                ct.ThrowIfCancellationRequested();
                observer.OnNext(2);
                ct.ThrowIfCancellationRequested();
                observer.OnCompleted();
            });

The chapter then looks at the conversions between Task and Observable handled by the ToObservable method, and at how SelectMany and Concat can be used to link different computations together.
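
A rough sketch of how these pieces fit together, assuming the System.Reactive package. The lookupAsync function is a hypothetical stand-in for a real asynchronous call, not something from the book.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical async lookup standing in for a real service call.
Func<int, Task<string>> lookupAsync = async id =>
{
    await Task.Delay(10);
    return $"item-{id}";
};

// Task -> Observable: the observable yields the task's result, then completes.
IObservable<string> single = lookupAsync(1).ToObservable();
string one = await single;

// SelectMany: start one async lookup per source value; results arrive as they finish.
IObservable<string> merged = Observable.Range(1, 3).SelectMany(id => lookupAsync(id));
string[] mergedSorted = (await merged.ToList()).OrderBy(s => s).ToArray();

// Concat: the second lookup only starts once the first has completed.
IObservable<string> sequential =
    Observable.Defer(() => lookupAsync(10).ToObservable())
        .Concat(Observable.Defer(() => lookupAsync(20).ToObservable()));
IList<string> concatResults = await sequential.ToList();

Console.WriteLine(one);
Console.WriteLine(string.Join(",", mergedSorted));
Console.WriteLine(string.Join(",", concatResults));
```

The SelectMany results are sorted before printing because their arrival order is not guaranteed, whereas Concat preserves the order of its inputs.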

Chapter six looks at the observer/observable relationship, in particular how to delay and re-subscribe to the observable. We walk through the DelaySubscription method and various other operators like SkipWhile and TakeUntil.

            var ob = Observable.Range(1, 5).Do(x => Console.WriteLine(x));
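
To show how SkipWhile and TakeUntil shape a subscription, here is a small sketch of my own (assuming the System.Reactive package). It uses subjects so that the timing is explicit; the stop subject is a hypothetical signal, playing the role a mouse-up event might play in the drawing application.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var source = new Subject<int>();
var stop = new Subject<string>();     // hypothetical "stop" signal
var received = new List<int>();

source
    .SkipWhile(x => x < 3)            // ignore values until one reaches 3
    .TakeUntil(stop)                  // end the sequence when stop emits
    .Subscribe(x => received.Add(x));

for (var i = 1; i <= 5; i++)
{
    source.OnNext(i);
}
stop.OnNext("enough");
source.OnNext(6);                     // arrives after the stop signal, so it is not delivered

Console.WriteLine(string.Join(",", received));
```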

Some of the ideas are put together in a drawing application, where the code tracks the mouse, and mouse button down and up events start and end the event streams.

Chapter seven looks at controlling the temperature of observables. Observables can be categorised as hot or cold. Here cold means that the observable replays a set of events to each subscriber, whereas a hot observable only plays new events. In the case of the hot observable, if you weren’t subscribed when the event happened, then you don’t get to see it.

We start with an ISubject. Instances of this interface can act as an observer and as an observable, and Rx provides four types that implement this interface – Subject, AsyncSubject, ReplaySubject and BehaviorSubject. The book covers what all of these subjects do, and how they can be used to proxy hot and cold observables to give you something with various interesting behaviours.
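
The differences are easiest to see side by side. The following is my own sketch, assuming the System.Reactive package, with each subscriber collecting what it sees into a list.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Cold: every subscriber gets the full sequence.
var cold = Observable.Range(1, 3);
var coldFirst = new List<int>();
var coldSecond = new List<int>();
cold.Subscribe(x => coldFirst.Add(x));
cold.Subscribe(x => coldSecond.Add(x));

// Subject (hot): a late subscriber misses earlier values.
var subject = new Subject<int>();
var early = new List<int>();
var late = new List<int>();
subject.Subscribe(x => early.Add(x));
subject.OnNext(1);
subject.Subscribe(x => late.Add(x));
subject.OnNext(2);

// ReplaySubject: a late subscriber is sent the values it missed.
var replay = new ReplaySubject<int>();
replay.OnNext(1);
replay.OnNext(2);
var replayed = new List<int>();
replay.Subscribe(x => replayed.Add(x));

// BehaviorSubject: a new subscriber gets the most recent value first.
var behavior = new BehaviorSubject<int>(0);
behavior.OnNext(1);
var latest = new List<int>();
behavior.Subscribe(x => latest.Add(x));
behavior.OnNext(2);

// AsyncSubject: only the final value is published, and only on completion.
var asyncSubject = new AsyncSubject<int>();
var final = new List<int>();
asyncSubject.Subscribe(x => final.Add(x));
asyncSubject.OnNext(1);
asyncSubject.OnNext(2);
asyncSubject.OnCompleted();

Console.WriteLine($"early: {string.Join(",", early)}; late: {string.Join(",", late)}");
```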

Chapters eight and nine go through the many operators, from Max and Count all the way through to operators for partitioning an incoming event stream into a set of windowed buffers.
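
A few of these operators in miniature, as a sketch of my own that assumes the System.Reactive package:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var numbers = Observable.Range(1, 10);

// Aggregates produce a single-element observable, which can be awaited.
int max = await numbers.Max();
int count = await numbers.Count();

// Buffer(3) partitions the stream into lists of up to three elements.
IList<IList<int>> buffers = await numbers.Buffer(3).ToList();

Console.WriteLine($"max={max}, count={count}, buffers={buffers.Count}");
foreach (var buffer in buffers)
{
    Console.WriteLine(string.Join(",", buffer));
}
```

Buffer collects elements into lists; the related Window operator exposes each partition as an observable instead.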

Chapter ten talks about concurrency and synchronisation, and is the best explanation I have read of this side of the Rx world. There are many types of IScheduler that are implemented by the library, ranging from a scheduler that uses threads from the thread pool to schedulers that hijack the current thread and don’t return until a series of actions has finished.
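
As a rough illustration of the two ends of that range, here is a sketch of my own that assumes the System.Reactive package. It records which thread the work ran on; the exact thread ids will vary from run to run.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

var mainThread = Environment.CurrentManagedThreadId;

// ObserveOn moves notifications onto the given scheduler, here the task pool.
var observedOn = -1;
using var done = new ManualResetEventSlim();
Observable.Return(42)
    .ObserveOn(TaskPoolScheduler.Default)
    .Subscribe(
        _ => observedOn = Environment.CurrentManagedThreadId,
        () => done.Set());
done.Wait();

// The immediate scheduler runs the action on the calling thread before returning.
var immediateThread = -1;
Scheduler.Immediate.Schedule(() => immediateThread = Environment.CurrentManagedThreadId);

Console.WriteLine($"main={mainThread}, task pool={observedOn}, immediate={immediateThread}");
```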

The last chapter talks about error handling and recovery, and it also touches on the subject of backpressure. It is also very good and informative.
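
The recovery operators compose much like the rest of the library. In this sketch of my own (assuming the System.Reactive package), flaky is a hypothetical source that fails on its first two subscriptions and then succeeds.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var attempts = 0;

// Hypothetical flaky source: fails on the first two subscriptions, then succeeds.
var flaky = Observable.Defer(() =>
{
    attempts++;
    return attempts < 3
        ? Observable.Throw<string>(new InvalidOperationException($"attempt {attempts} failed"))
        : Observable.Return("ok");
});

// Retry(3) resubscribes on error, allowing at most three subscriptions in total.
var retried = new List<string>();
flaky
    .Retry(3)
    .Finally(() => retried.Add("finally"))
    .Subscribe(x => retried.Add(x), ex => retried.Add("error"));

// Catch swaps a failed sequence for a fallback sequence.
var recovered = new List<string>();
Observable.Throw<string>(new InvalidOperationException("boom"))
    .Catch((Exception ex) => Observable.Return("fallback"))
    .Subscribe(x => recovered.Add(x));

Console.WriteLine(string.Join(",", retried));
Console.WriteLine(string.Join(",", recovered));
```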

It is also worth mentioning that the book has three appendixes – some general coverage of asynchronous programming in .NET, a section on the Disposables that the Rx library offers, and a section on testing Rx, which talks about how you might unit test your code and use test schedulers to control the execution.
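
To give a flavour of the test scheduler idea, here is a minimal sketch of my own. It assumes both the System.Reactive and Microsoft.Reactive.Testing NuGet packages are referenced; the five-second timer is just an example value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// A TestScheduler runs on virtual time, so no real waiting happens.
var scheduler = new TestScheduler();
var fired = new List<long>();

Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
    .Subscribe(x => fired.Add(x));

scheduler.AdvanceBy(TimeSpan.FromSeconds(4).Ticks);   // virtual clock at 4s
var countAfterFourSeconds = fired.Count;

scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);   // virtual clock at 5s
var countAfterFiveSeconds = fired.Count;

Console.WriteLine($"{countAfterFourSeconds} then {countAfterFiveSeconds}");
```

Because the clock only moves when the test tells it to, time-based operators can be checked without sleeping.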

It was really good to have a single place that covered all of this material. Typically you can find some of this in blog posts spread all over the internet, but having it in a consistent story that develops over eleven chapters is brilliant.

I also noticed that the pre-release System.Reactive NuGet package contains code around the IQbservable interface. It will be interesting to see where that goes in the future.
