Go…go…go… but only at the right time

Rich Hickey has gone and done it again! He’s borrowed some ideas from other languages, reorganised them and brought them to Clojure, this time in the form of the new, experimental core.async library.

Clojure has a number of interesting constructs for dealing with concurrency. These include Agents (loosely comparable to actors) and software transactional memory, which, together with Atoms, manage access to the stateful, non-functional part of the language; STM coordinates changes to state containers called Refs. This new library uses channels, an idea that dates back at least as far as Hoare’s CSP, and inversion of control, the technique behind the code generated for C#’s async methods. Together these let the user write linear code that doesn’t tie up a thread unless it is making forward progress.

At the simplest level, user code allocates a channel and then issues blocking writes and reads on it using the >!! and <!! functions respectively. In the following code we create a channel, use future to push a value to it from another thread (a write to an unbuffered channel blocks until a reader takes the value), and then read the value in the third expression, which returns 11.

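(require '[clojure.core.async :refer [chan >!! <!!]])

(def c (chan))        ; an unbuffered channel
(future (>!! c 11))   ; the write blocks, so perform it on another thread
(<!! c)               ; blocks until the value arrives; returns 11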
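core.async’s channels and go blocks are modelled on Go’s channels and goroutines, so for comparison here is a minimal Go sketch of the same rendezvous. It is an analogue in another language rather than core.async code, and the function name rendezvous is illustrative.

```go
package main

import "fmt"

// rendezvous mirrors the Clojure snippet: an unbuffered channel, a write
// performed on another goroutine (the role future plays), and a blocking read.
func rendezvous() int {
	c := make(chan int)     // unbuffered, like (chan)
	go func() { c <- 11 }() // this send blocks until the read below happens
	return <-c              // blocks until the value arrives
}

func main() {
	fmt.Println(rendezvous()) // prints 11
}
```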

Channels come in a number of varieties. By default a channel is unbuffered: the writer waits until a reader takes the value, so values are handed over one at a time. The library also offers buffered channels, including sliding-window buffers that drop the oldest value when they are full.
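Go’s built-in channels offer fixed-size buffers but no sliding buffer, so the sketch below emulates one with a non-blocking select: when the buffer is full, the oldest value is discarded to make room. slidingPut and drain are hypothetical helpers written for this illustration, and the emulation assumes a single writer.

```go
package main

import "fmt"

// slidingPut emulates a sliding buffer: if the buffered channel is full,
// drop the oldest value and try again. Assumes a single writer.
func slidingPut(c chan int, v int) {
	for {
		select {
		case c <- v:
			return
		default:
			select {
			case <-c: // discard the oldest buffered value
			default:
			}
		}
	}
}

// drain returns whatever is currently buffered, without blocking.
func drain(c chan int) []int {
	var out []int
	for {
		select {
		case v := <-c:
			out = append(out, v)
		default:
			return out
		}
	}
}

func main() {
	fixed := make(chan int, 2) // fixed buffer: two writes succeed with no reader
	fixed <- 1
	fixed <- 2
	fmt.Println(drain(fixed)) // prints [1 2]

	sliding := make(chan int, 2)
	for i := 1; i <= 5; i++ {
		slidingPut(sliding, i)
	}
	fmt.Println(drain(sliding)) // prints [4 5]: only the newest two survive
}
```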

The library goes further, though. In the example above, the current thread was blocked between the point at which we asked for a value and the point at which the background thread wrote the value to the channel. To use our threads more efficiently, we’d like to return the thread to the thread pool until a value is available and there is more work to be done.
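Go’s runtime performs this kind of parking for goroutines: a goroutine blocked on a channel is set aside without occupying an OS thread, so thousands can wait at once on a handful of threads. Go blocks aim for a similar effect on top of a JVM thread pool. The sketch below is illustrative only, and parkMany is a hypothetical name.

```go
package main

import (
	"fmt"
	"runtime"
)

// parkMany starts n goroutines that each wait on a shared channel. While a
// goroutine is blocked it is parked by the Go scheduler and does not occupy
// an OS thread, so n can be far larger than the number of threads.
func parkMany(n int) int {
	in := make(chan int)
	out := make(chan int)
	for i := 0; i < n; i++ {
		go func() {
			v := <-in // parked here until a value arrives
			out <- v * 2
		}()
	}
	for i := 1; i <= n; i++ {
		in <- i
	}
	sum := 0
	for i := 0; i < n; i++ {
		sum += <-out
	}
	return sum
}

func main() {
	runtime.GOMAXPROCS(2)        // at most two threads execute Go code at once
	fmt.Println(parkMany(10000)) // prints 100010000
}
```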

Consider the following example, which shows a session running in LightTable. LightTable lets you run an interactive REPL that displays the values of the expressions in the session but, best of all, it takes care of setting up a Clojure environment with the required project already loaded using Leiningen. To play with core.async I simply had to clone the Git repository and load the examples file into LightTable. Running a REPL on the buffer containing the file caused LightTable to load the relevant project and all of its dependencies. LightTable adds the purple highlights so that you can see values as the functions in your application are called.


The example has a Go block. Go blocks set up a context in which a thread is borrowed from the thread pool only while the contained code moves from one blocking state (such as waiting to read from a channel) to the next. The blocking operations are <! (read), >! (write) and alts!, which waits on a number of channels and returns the value from whichever channel is ready first, together with that channel. Clojure has macros, which make it very easy to write transformations on the abstract syntax tree, and Go blocks are implemented by such a macro in macros.clj. The transformation converts the contained code into SSA form and then generates a state machine whose states are separated by the blocking points.
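alts! plays roughly the role of Go’s select statement. The sketch below waits on two channels and, like alts!, reports both the value and which channel supplied it. firstReady is an illustrative name. When several channels are ready at once, Go picks one at random, as alts! does unless it is given its :priority option.

```go
package main

import "fmt"

// firstReady waits on two channels and returns the first value available,
// together with the name of the channel that supplied it.
func firstReady(a, b <-chan string) (string, string) {
	select {
	case v := <-a:
		return v, "a"
	case v := <-b:
		return v, "b"
	}
}

func main() {
	a := make(chan string)    // nothing is ever sent on a in this example
	b := make(chan string, 1) // b already holds a value
	b <- "hello"
	v, from := firstReady(a, b)
	fmt.Println(v, from) // prints "hello b"
}
```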

The Go block actually returns a channel which will contain the result of the block. In the code above, which I simplified from other code, we do not use the channel that is returned.
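A Go function can mimic this by starting a goroutine and handing back a channel of capacity one that receives the result, much as the expansion below allocates (chan 1). goBlock is a hypothetical helper. Its body reads from input1 and input2 and adds the values, which is what the expansion suggests the original block does, simplified to return the sum instead of writing it to a result channel.

```go
package main

import "fmt"

// goBlock runs body on a new goroutine and returns a channel that will
// receive its result. The buffer of one lets the goroutine finish even if
// nobody reads the result; the channel is closed afterwards.
func goBlock(body func() int) <-chan int {
	out := make(chan int, 1)
	go func() {
		out <- body()
		close(out)
	}()
	return out
}

func main() {
	input1 := make(chan int)
	input2 := make(chan int)
	// Receives are evaluated left to right: input1 first, then input2.
	done := goBlock(func() int { return <-input1 + <-input2 })
	input1 <- 4
	input2 <- 7
	fmt.Println(<-done) // prints 11
}
```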

Using macroexpand we can see the result of the transformation on the Go block in the example, which I have included below. I have tried to comment the code to show the purpose of the various sections, and have removed some of the namespace information and reordered the case expression.

(let* [c__6067__auto__ (chan 1)
       captured-bindings__6068__auto__ (getThreadBindingFrame)]
  ;; Run the following on a thread borrowed from the thread pool
  (dispatch/run
    (fn []
      (let [f__6069__auto__
            ;; The state machine function
            (fn state-machine__5938__auto__
              ;; With no arguments: initialise the state machine. The state is
              ;; stored as an array of six elements. The state machine function
              ;; goes in slot 0, the current state in slot 1, the value from the
              ;; last read in slot 2 and the thread bindings in slot 3
              ;; (see ioc_helpers.clj for other slot offsets).
              ([] (ioc-macros/aset-all!
                    (java.util.concurrent.atomic.AtomicReferenceArray. 6)
                    0 state-machine__5938__auto__ 1 1))
              ;; With the state array: carry out state transitions.
              ([state_6908]
               (let [old-frame__5939__auto__ (getThreadBindingFrame)]
                 (try
                   ;; Reset the thread bindings
                   (resetThreadBindingFrame (ioc-macros/aget-object state_6908 3))
                   (loop []
                     (let [result__5940__auto__
                           ;; Get the current state, and carry out a state transition.
                           (case (int (ioc-macros/aget-object state_6908 1))
                             ;; Initial state, which waits for input on the channel
                             ;; bound to input1 and will use state 3 on restart
                             1 (let [state_6908 state_6908]
                                 (ioc-macros/take! state_6908 3 input1))
                             ;; Get the return value out of slot 2 of the state array
                             ;; (used to pass in the next value) and store it in
                             ;; slot 5. Read from the input2 channel, setting state 4
                             ;; as the return state.
                             3 (let [inst_6901 (ioc-macros/aget-object state_6908 2)
                                     state_6908 (ioc-macros/aset-all! state_6908 5 inst_6901)]
                                 (ioc-macros/take! state_6908 4 input2))
                             ;; Add the value we read earlier (stored in slot 5) and
                             ;; the value we read last (communicated in slot 2).
                             ;; Write the value out, setting state 2 as the state
                             ;; after the write finishes.
                             4 (let [inst_6901 (ioc-macros/aget-object state_6908 5)
                                     inst_6903 (ioc-macros/aget-object state_6908 2)
                                     inst_6904 (+ inst_6901 inst_6903)
                                     state_6908 state_6908]
                                 (ioc-macros/put! state_6908 2 result inst_6904))
                             ;; The write has happened. Finish the state machine.
                             2 (let [inst_6906 (ioc-macros/aget-object state_6908 2)
                                     state_6908 state_6908]
                                 (ioc-macros/return-chan state_6908 inst_6906)))]
                       ;; Keep stepping while a state asks to recur
                       (if (identical? result__5940__auto__ :recur)
                         (recur)
                         result__5940__auto__)))
                   (finally (resetThreadBindingFrame old-frame__5939__auto__))))))
            ;; Set up the initial state and then move on one state
            state__6070__auto__
            (-> (f__6069__auto__)
                (ioc-macros/aset-all! ioc-macros/USER-START-IDX c__6067__auto__
                                      ioc-macros/BINDINGS-IDX captured-bindings__6068__auto__))]
        (ioc-macros/run-state-machine state__6070__auto__))))
  ;; The Go block itself returns the channel that will receive its result
  c__6067__auto__)
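To make the shape of this state machine easier to see, here is a hand-written Go analogue. It is a simplification, not what core.async generates: the struct fields stand in for slots 1, 2 and 5 of the state array, step runs to the next blocking point and reports what it is waiting for, and a toy driver (run, with pre-filled inputs) plays the part of the channels and the thread pool. All names here are illustrative.

```go
package main

import "fmt"

// machine holds the state that survives between blocking points,
// standing in for slots of the AtomicReferenceArray in the expansion.
type machine struct {
	state   int // next state to run (slot 1)
	lastVal int // value delivered by the most recent take (slot 2)
	saved   int // first input, kept across the second take (slot 5)
}

// op describes the blocking operation the machine is parked on.
type op struct {
	kind string // "take", "put" or "done"
	ch   string // channel name
	val  int    // value to put
}

// step runs from the current state to the next blocking point.
func (m *machine) step() op {
	switch m.state {
	case 1: // wait for input1, resume in state 3
		m.state = 3
		return op{kind: "take", ch: "input1"}
	case 3: // save the first value, wait for input2, resume in state 4
		m.saved = m.lastVal
		m.state = 4
		return op{kind: "take", ch: "input2"}
	case 4: // write the sum to result, resume in state 2
		m.state = 2
		return op{kind: "put", ch: "result", val: m.saved + m.lastVal}
	default: // state 2: the write has completed, so the machine is finished
		return op{kind: "done"}
	}
}

// run is a toy driver: instead of parking, it satisfies each take
// immediately from pre-filled inputs and records each put.
func run(m *machine, inputs map[string][]int) map[string][]int {
	outputs := map[string][]int{}
	for {
		o := m.step()
		switch o.kind {
		case "take":
			m.lastVal = inputs[o.ch][0]
			inputs[o.ch] = inputs[o.ch][1:]
		case "put":
			outputs[o.ch] = append(outputs[o.ch], o.val)
		case "done":
			return outputs
		}
	}
}

func main() {
	out := run(&machine{state: 1}, map[string][]int{"input1": {4}, "input2": {7}})
	fmt.Println(out["result"]) // prints [11]
}
```

In the real expansion a take or put that cannot complete simply returns, and the channel later calls the state machine function again once a value is available; the synchronous driver above skips that step.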


Unlike the C# async keyword, which required a new version of the C# compiler, Clojure’s macros mean that all of this can be implemented in a library, core.async, which makes it very easy to experiment with different implementations. The library isn’t yet part of the core language, though it may be at some point in the future.

Rich Hickey talks about the core.async library in an episode of the ThinkRelevance podcast, and this post has some more discussion of how these constructs can be used.

One feature that seems to be missing is a means of cancelling running tasks, along the lines of the Task Parallel Library’s cancellation tokens. There are a couple of good posts on cancellation here and here. They emphasise the need for cooperative cancellation, and I don’t yet know of a way of doing this kind of thing in Clojure.
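One cooperative approach that fits the channel model is to give each task a cancel channel, have it check that channel alongside its work, and close the channel to signal every listener at once. The sketch below shows the idea in Go. In core.async a similar effect could presumably be achieved by passing a cancel channel to alts!, but that is an assumption rather than a documented library feature. worker is an illustrative name.

```go
package main

import "fmt"

// worker processes jobs until the jobs channel is closed or cancel is
// closed. Closing a channel wakes every receiver, so one close can signal
// any number of workers.
func worker(jobs <-chan int, cancel <-chan struct{}) int {
	processed := 0
	for {
		select {
		case <-cancel:
			return processed
		case _, ok := <-jobs:
			if !ok {
				return processed
			}
			processed++ // real work on the job would go here
		}
	}
}

func main() {
	jobs := make(chan int)
	cancel := make(chan struct{})
	result := make(chan int)
	go func() { result <- worker(jobs, cancel) }()
	for i := 0; i < 3; i++ {
		jobs <- i // each send completes only once the worker has taken the job
	}
	close(cancel)         // request cancellation; the worker notices at its next select
	fmt.Println(<-result) // prints 3
}
```

The task stays in control: it only stops at a point where it checks the cancel channel, which is the cooperative style the cancellation posts recommend.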

