Calm – don’t react!

I’ve found lazy event streams fascinating ever since I came across the idea in the mid-80s, when I started playing with Lazy Lispkit Lisp. There was some work at the Programming Research Group in Oxford on implementing an operating system in a functional language, with incoming events represented as lazy lists (sequences). The Reactive Extensions, which Microsoft has been releasing as a DevLabs project, share many of the ideas behind this concept.

This whole technology is based around the interfaces IObservable<T> and IObserver<T>, the duals of the IEnumerable<T> and IEnumerator<T> interfaces. Observers subscribe to observables, which push messages to them using the OnNext method, signal completion using OnCompleted and report errors using OnError. The power comes from the pre-supplied set of observables, the utility methods for converting things such as WinForms events into Observables, and the rich set of combinators for combining Observables. The combinators are designed to be easy to use via LINQ.
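To make the push model concrete, here is a minimal sketch that uses only the IObservable<T> and IObserver<T> interfaces from the base class library, without Rx itself. LambdaObserver, CountObservable, NoopDisposable and DualityDemo are hypothetical names for illustration; LambdaObserver plays roughly the role of Rx's AnonymousObserver, but it is not Rx's implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, loosely analogous to Rx's AnonymousObserver:
// each IObserver<T> callback is forwarded to a delegate.
sealed class LambdaObserver<T> : IObserver<T>
{
    readonly Action<T> m_OnNext;
    readonly Action<Exception> m_OnError;
    readonly Action m_OnCompleted;

    public LambdaObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        m_OnNext = onNext;
        m_OnError = onError;
        m_OnCompleted = onCompleted;
    }

    public void OnNext(T value) => m_OnNext(value);
    public void OnError(Exception error) => m_OnError(error);
    public void OnCompleted() => m_OnCompleted();
}

// Hypothetical observable that synchronously pushes 0..count-1 and then completes.
sealed class CountObservable : IObservable<int>
{
    readonly int m_Count;

    public CountObservable(int count) { m_Count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // The producer drives the consumer: the dual of a consumer
        // pulling values with IEnumerator<T>.MoveNext and Current.
        for (int i = 0; i < m_Count; i++)
        {
            observer.OnNext(i);
        }
        observer.OnCompleted();
        return new NoopDisposable(); // nothing is left to cancel
    }
}

sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

static class DualityDemo
{
    // Subscribes a logging observer and returns the callbacks it received, in order.
    public static List<string> Run(int count)
    {
        var log = new List<string>();
        new CountObservable(count).Subscribe(new LambdaObserver<int>(
            value => log.Add("next " + value),
            error => log.Add("error " + error.Message),
            () => log.Add("completed")));
        return log;
    }
}
```

DualityDemo.Run(3) returns the callbacks in the order next 0, next 1, next 2, completed: the observable calls into the observer, rather than the consumer asking for the next element as it would with IEnumerator<T>.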

For example, we could write some code that prints the time in tenths of a second.

var ticktock = Observable.Interval(TimeSpan.FromSeconds(1));

var clock =
    from tick in ticktock
    from tenths in Observable.Interval(TimeSpan.FromSeconds(0.1)).TakeUntil(ticktock)
    select tick + "." + tenths;

clock.Subscribe(x => Console.WriteLine(x));

Notice that we subscribe to the same observable twice in order to show the use of the TakeUntil method. This method acts a bit like a switch: it lets us consume the output of the Observable that ticks every tenth of a second until the Observable that fires every second fires again. That piece of LINQ syntax translates into something of the form:

var clock =
    ticktock.SelectMany(
      tick => Observable.Interval(TimeSpan.FromSeconds(0.1)).TakeUntil(ticktock),
      (tick, tenths) => tick + "." + tenths);
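The switching behaviour of TakeUntil can be sketched by hand, again using only the base class library interfaces. This is a simplified, single-threaded illustration, not Rx's implementation: TakeUntilObservable, ManualSubject, DelegateObserver, ActionDisposable and TakeUntilDemo are hypothetical names, and ManualSubject stands in for the two timers so that we can fire them by hand.

```csharp
using System;
using System.Collections.Generic;

// Forwards each IObserver<T> callback to a delegate.
sealed class DelegateObserver<T> : IObserver<T>
{
    readonly Action<T> m_OnNext; readonly Action<Exception> m_OnError; readonly Action m_OnCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        m_OnNext = onNext; m_OnError = onError; m_OnCompleted = onCompleted;
    }
    public void OnNext(T value) => m_OnNext(value);
    public void OnError(Exception error) => m_OnError(error);
    public void OnCompleted() => m_OnCompleted();
}

// Runs its action at most once, on the first Dispose.
sealed class ActionDisposable : IDisposable
{
    Action m_Action;
    public ActionDisposable(Action action) { m_Action = action; }
    public void Dispose() { var a = m_Action; m_Action = null; a?.Invoke(); }
}

// Hypothetical hot source that we fire by hand (single-threaded use only).
sealed class ManualSubject<T> : IObservable<T>
{
    readonly List<IObserver<T>> m_Observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        m_Observers.Add(observer);
        return new ActionDisposable(() => m_Observers.Remove(observer));
    }
    // Iterate over a copy so that observers may unsubscribe inside OnNext.
    public void Fire(T value) { foreach (var o in m_Observers.ToArray()) o.OnNext(value); }
}

// Hypothetical TakeUntil sketch: forwards 'source' until 'other' produces a value,
// then completes the observer and drops both subscriptions. Not thread-safe.
sealed class TakeUntilObservable<T, TOther> : IObservable<T>
{
    readonly IObservable<T> m_Source;
    readonly IObservable<TOther> m_Other;
    public TakeUntilObservable(IObservable<T> source, IObservable<TOther> other) { m_Source = source; m_Other = other; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        bool stopped = false;
        IDisposable sourceSub = null, otherSub = null;
        Action stop = () =>
        {
            stopped = true;
            var s = sourceSub; sourceSub = null; s?.Dispose();
            var o = otherSub; otherSub = null; o?.Dispose();
        };

        // Subscribe to the "switch" first so that it can cut the source off.
        otherSub = m_Other.Subscribe(new DelegateObserver<TOther>(
            _ => { if (!stopped) { stop(); observer.OnCompleted(); } },
            e => { if (!stopped) { stop(); observer.OnError(e); } },
            () => { })); // 'other' completing without a value leaves the source running
        if (!stopped)
            sourceSub = m_Source.Subscribe(new DelegateObserver<T>(
                v => { if (!stopped) observer.OnNext(v); },
                e => { if (!stopped) { stop(); observer.OnError(e); } },
                () => { if (!stopped) { stop(); observer.OnCompleted(); } }));
        if (stopped) stop(); // clean up if something fired during Subscribe
        return new ActionDisposable(stop);
    }
}

static class TakeUntilDemo
{
    // Plays out one "second" of the clock example by hand.
    public static List<string> Run()
    {
        var tenths = new ManualSubject<long>();
        var ticktock = new ManualSubject<long>();
        var log = new List<string>();
        new TakeUntilObservable<long, long>(tenths, ticktock).Subscribe(new DelegateObserver<long>(
            v => log.Add(v.ToString()), e => log.Add("error"), () => log.Add("done")));
        tenths.Fire(0);
        tenths.Fire(1);
        ticktock.Fire(1); // the switch flips: the output completes
        tenths.Fire(2);   // ignored: the subscription to tenths is gone
        return log;
    }
}
```

TakeUntilDemo.Run() returns 0, 1, done. With real timers the two sources fire on their own schedules, so the equivalent of the last two calls can arrive in either order.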

Running this under .NET 3.5 on my laptop gives the following output, which might not be quite what you expect: 0.9 never appears, although 1.9 does.

0.0
0.1
0.2
0.3
0.4
0.5
0.6
0.7
0.8
1.0
1.1
1.2
1.3
1.4
1.5
1.6
1.7
1.8
1.9
2.0
2.1
2.2
2.3

Using Observables is a little different from the lazy functional way of using streams of events. First, in most of the functional models everything is single-threaded, whereas in Rx the user can take some control of the threading model used for responding to the events (using ObserveOn). Second, in the functional model the two references to ticktock in the above example would share a single list of events, whereas in Rx the timer is a cold observable, so the two subscriptions are to two different timers. These two timers are independent and may not be synchronised, which is what leads to the “strange” output above.
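The cold semantics can be illustrated with a small synchronous sketch. ColdObservable is a hypothetical name for an observable built from a subscribe function; it runs its producer afresh for every subscriber, just as each subscription to Observable.Interval starts its own timer. Pushing three values synchronously instead of using a real timer is an assumption made to keep the example deterministic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical cold observable: every Subscribe runs the producer again, so
// each subscriber gets its own independent sequence.
sealed class ColdObservable<T> : IObservable<T>
{
    readonly Func<IObserver<T>, IDisposable> m_Producer;
    public ColdObservable(Func<IObserver<T>, IDisposable> producer) { m_Producer = producer; }
    public IDisposable Subscribe(IObserver<T> observer) => m_Producer(observer);
}

sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

// Records everything it is sent.
sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public Exception Error;
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => Error = error;
    public void OnCompleted() => Completed = true;
}

static class ColdDemo
{
    // Subscribes both observers to one cold source and returns how many
    // producers (standing in for timers) were started.
    public static int Run(IObserver<long> first, IObserver<long> second)
    {
        int producersStarted = 0;
        var ticks = new ColdObservable<long>(observer =>
        {
            producersStarted++; // a fresh "timer" for this subscription
            for (long i = 0; i < 3; i++) observer.OnNext(i);
            observer.OnCompleted();
            return new NoopDisposable();
        });
        ticks.Subscribe(first);
        ticks.Subscribe(second);
        return producersStarted;
    }
}
```

ColdDemo.Run returns 2: two subscriptions, two producers, and each observer receives its own 0, 1, 2. Rx also provides operators such as Publish for sharing a single underlying subscription between several observers, which is closer to the shared list of the functional model.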

We can study what is going on in more detail by defining our own Observable and using it instead of the timer.

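using System;
using System.Collections.Generic;

// A hand-written observable that we can fire explicitly: Fire pushes a value
// to every observer that is currently subscribed.
class MyObservable : IObservable<long>
{
    readonly List<IObserver<long>> m_Targets = new List<IObserver<long>>();
    readonly object m_Lock = new object();

    public IDisposable Subscribe(IObserver<long> observer)
    {
        lock (m_Lock)
        {
            m_Targets.Add(observer);
        }
        // Disposing the returned object removes this subscription again.
        return new UnSubscribe(this, observer);
    }

    class UnSubscribe : IDisposable
    {
        readonly MyObservable m_Target;
        readonly IObserver<long> m_Observer;

        public UnSubscribe(MyObservable target, IObserver<long> observer)
        {
            m_Target = target;
            m_Observer = observer;
        }

        public void Dispose()
        {
            lock (m_Target.m_Lock)
            {
                m_Target.m_Targets.Remove(m_Observer);
            }
        }
    }

    public void Fire(long value)
    {
        // Snapshot the subscribers under the lock, then call them outside it,
        // so observers may subscribe or unsubscribe from inside OnNext
        // without invalidating the iteration.
        List<IObserver<long>> targets;
        lock (m_Lock)
        {
            targets = new List<IObserver<long>>(m_Targets);
        }
        foreach (var target in targets)
        {
            target.OnNext(value);
        }
    }
}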

We can now change the example to the following code.

var ticktock = new MyObservable();

var clock =
    from tick in ticktock
    from tenths in Observable.Interval(TimeSpan.FromSeconds(0.1)).TakeUntil(ticktock)
    select tick + "." + tenths;

clock.Subscribe(x => Console.WriteLine(x));

If we run this code, we find that afterwards there is a single instance of System.Collections.Generic.AnonymousObserver<long> subscribed to our ticktock observable. If we follow the OnNext method calls, we eventually (after several steps) arrive at the code representing the “x => Console.WriteLine(x)” lambda expression.

We might wonder where the methods that handle the “from tenths …” part of the LINQ expression are. They are set up on demand, so nothing happens until ticktock fires for the first time.

If we then execute

ticktock.Fire(1);

we see that there is now a second subscription to the ticktock observable. If we follow the chain of OnNext calls, we find an Observable that takes input from two sources: the Interval Observable that fires every tenth of a second, and ticktock. This observable is rigged to stop the data flow once ticktock fires again.
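The on-demand wiring can be mimicked with a hand-written SelectMany. In this sketch (hypothetical names again, single-threaded, completion not propagated, and TakeUntil left out for brevity) the inner observable is only subscribed to when the outer one fires, and each such subscription adds another edge to the network.

```csharp
using System;
using System.Collections.Generic;

sealed class DelegateObserver<T> : IObserver<T>
{
    readonly Action<T> m_OnNext; readonly Action<Exception> m_OnError; readonly Action m_OnCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        m_OnNext = onNext; m_OnError = onError; m_OnCompleted = onCompleted;
    }
    public void OnNext(T value) => m_OnNext(value);
    public void OnError(Exception error) => m_OnError(error);
    public void OnCompleted() => m_OnCompleted();
}

sealed class ActionDisposable : IDisposable
{
    Action m_Action;
    public ActionDisposable(Action action) { m_Action = action; }
    public void Dispose() { var a = m_Action; m_Action = null; a?.Invoke(); }
}

// Hypothetical hot source fired by hand, exposing its subscriber count.
sealed class ManualSubject<T> : IObservable<T>
{
    readonly List<IObserver<T>> m_Observers = new List<IObserver<T>>();
    public int SubscriberCount => m_Observers.Count;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        m_Observers.Add(observer);
        return new ActionDisposable(() => m_Observers.Remove(observer));
    }
    public void Fire(T value) { foreach (var o in m_Observers.ToArray()) o.OnNext(value); }
}

// Hypothetical SelectMany sketch: for every outer value, subscribe to a new inner
// observable and pair each inner value with that outer value.
sealed class SelectManyObservable<TOuter, TInner, TResult> : IObservable<TResult>
{
    readonly IObservable<TOuter> m_Outer;
    readonly Func<TOuter, IObservable<TInner>> m_Selector;
    readonly Func<TOuter, TInner, TResult> m_ResultSelector;

    public SelectManyObservable(IObservable<TOuter> outer,
        Func<TOuter, IObservable<TInner>> selector, Func<TOuter, TInner, TResult> resultSelector)
    {
        m_Outer = outer; m_Selector = selector; m_ResultSelector = resultSelector;
    }

    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        var subscriptions = new List<IDisposable>();
        subscriptions.Add(m_Outer.Subscribe(new DelegateObserver<TOuter>(
            outerValue =>
            {
                // The network grows here: nothing subscribes to the inner
                // observable until the outer one fires.
                var inner = m_Selector(outerValue);
                subscriptions.Add(inner.Subscribe(new DelegateObserver<TInner>(
                    innerValue => observer.OnNext(m_ResultSelector(outerValue, innerValue)),
                    observer.OnError,
                    () => { })));
            },
            observer.OnError,
            () => { })));
        return new ActionDisposable(() => subscriptions.ForEach(s => s.Dispose()));
    }
}

static class SelectManyDemo
{
    // Mirrors the clock query, minus TakeUntil, with both sources fired by hand.
    public static List<string> Run(ManualSubject<long> ticktock, ManualSubject<long> tenths)
    {
        var log = new List<string>();
        var clock = new SelectManyObservable<long, long, string>(
            ticktock, tick => tenths, (tick, t) => tick + "." + t);
        clock.Subscribe(new DelegateObserver<string>(log.Add, e => { }, () => { }));
        return log;
    }
}
```

Each ticktock.Fire adds a subscription to tenths. Without TakeUntil the earlier subscription is never dropped, so after ticktock.Fire(1), tenths.Fire(5), ticktock.Fire(2), tenths.Fire(6) the log reads 1.5, 1.6, 2.6.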

If we look only at the OnNext methods, the objects in memory form a dataflow network that changes dynamically as the various events fire. It is this dynamic rewiring of the network that makes it necessary to understand the threading model used under the covers. This is particularly important in the above example, where the events from the two timers race: one timer drives the network to drop its connection to the existing tenths timer and connect to a new one, while at the same time the tenths timer is trying to push a value through to the select. The data race isn’t too hard to see in this example, but in a large network it might be tricky to determine whether a source has multiple subscriptions, and the ordering of the values pushed from such a multiply connected source might not be well defined.
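One partial remedy is to serialize the calls that reach an observer. The sketch below is a hypothetical SynchronizedObserver wrapper, not Rx code (later Rx releases provide a Synchronize operator with a similar purpose). It guarantees that two racing sources never run the downstream OnNext at the same time, but it does not decide which of the racing values arrives first, so it removes the overlap rather than the race itself. OverlapDetector and SyncDemo are hypothetical test helpers, with two tasks standing in for the two timers.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: every call into the inner observer happens under one
// lock, so concurrent producers cannot overlap inside OnNext. Whichever thread
// takes the lock first delivers its value first.
sealed class SynchronizedObserver<T> : IObserver<T>
{
    readonly IObserver<T> m_Inner;
    readonly object m_Gate = new object();

    public SynchronizedObserver(IObserver<T> inner) { m_Inner = inner; }

    public void OnNext(T value) { lock (m_Gate) m_Inner.OnNext(value); }
    public void OnError(Exception error) { lock (m_Gate) m_Inner.OnError(error); }
    public void OnCompleted() { lock (m_Gate) m_Inner.OnCompleted(); }
}

// Hypothetical test observer that notices if two OnNext calls ever overlap.
sealed class OverlapDetector : IObserver<int>
{
    int m_Inside;
    public int Received;      // plain increment: safe only if calls are serialized
    public bool OverlapSeen;

    public void OnNext(int value)
    {
        if (Interlocked.Increment(ref m_Inside) != 1) OverlapSeen = true;
        Received++;
        Thread.SpinWait(50);  // widen the window in which an overlap could show up
        Interlocked.Decrement(ref m_Inside);
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class SyncDemo
{
    // Two tasks play the part of the two racing timers.
    public static OverlapDetector Run(int valuesPerProducer)
    {
        var detector = new OverlapDetector();
        var observer = new SynchronizedObserver<int>(detector);
        Action produce = () =>
        {
            for (int i = 0; i < valuesPerProducer; i++) observer.OnNext(i);
        };
        Task.WaitAll(Task.Run(produce), Task.Run(produce));
        return detector;
    }
}
```

Removing the SynchronizedObserver and passing the detector directly to the producers would typically let OverlapSeen become true and Received fall short, which is the kind of interference the locking prevents.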

In a recent down tools week at Red Gate (a week in which the developers get to work on a project of their choosing), I spent a little time seeing whether our memory profiler technology could be used to analyse the state of the dataflow network associated with an Rx expression. We could imagine using this technology to view a dataflow network and perhaps answer questions about multiple subscriptions to the same source. Unfortunately, I didn’t have enough time to get very far with the prototype, but I see some kind of visualization support as a vital debugging tool if this declarative style of specifying concurrency becomes prevalent.
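As a much cruder alternative to a memory profiler, the question of multiple subscriptions can be answered by instrumenting sources directly. The hypothetical CountingObservable below wraps a source and counts its live subscriptions; a debugging tool could walk such wrappers and report any source, such as ticktock in the example above, that is subscribed to more than once. NeverObservable and NullObserver are placeholder helpers for the example.

```csharp
using System;
using System.Threading;

// Hypothetical instrumentation wrapper that counts live subscriptions to a source.
sealed class CountingObservable<T> : IObservable<T>
{
    readonly IObservable<T> m_Source;
    int m_Live;

    public CountingObservable(string name, IObservable<T> source) { Name = name; m_Source = source; }

    public string Name { get; }   // label a tool could use when reporting
    public int LiveSubscriptions => Volatile.Read(ref m_Live);

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var inner = m_Source.Subscribe(observer);
        Interlocked.Increment(ref m_Live);
        return new Unsubscriber(this, inner);
    }

    sealed class Unsubscriber : IDisposable
    {
        CountingObservable<T> m_Owner;
        readonly IDisposable m_Inner;

        public Unsubscriber(CountingObservable<T> owner, IDisposable inner) { m_Owner = owner; m_Inner = inner; }

        public void Dispose()
        {
            // Exchange makes a second Dispose a no-op, so the count stays accurate.
            var owner = Interlocked.Exchange(ref m_Owner, null);
            if (owner == null) return;
            m_Inner.Dispose();
            Interlocked.Decrement(ref owner.m_Live);
        }
    }
}

// A source that never fires, standing in for a timer in this example.
sealed class NeverObservable<T> : IObservable<T>
{
    public IDisposable Subscribe(IObserver<T> observer) => new NoopDisposable();
}

sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

sealed class NullObserver<T> : IObserver<T>
{
    public void OnNext(T value) { }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Subscribing twice to a wrapped ticktock gives LiveSubscriptions == 2; disposing one subscription, even twice, brings it back to 1.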

This entry was posted in Computers and Internet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s