Call my Agent, will you?

Clojure contains some interesting concurrency mechanisms aside from the usual Java locking primitives. My favourite has got to be the bounded version of software transactional memory, which we'll get to in a later post. There's a good retrospective by Joe Duffy on some early work on getting software transactional memory into the CLR; that post details how they opted for an unbounded implementation (all objects are affected by transactions), which contrasts with the Clojure version, where a specific object type interacts with the transactions (a bounded implementation).

In order to see some code, we'll consider a data structure that is a vector of two integers. Pretend they are bank balances if you will; we need to maintain the invariant that the total of the two integers doesn't change when we transfer one unit from the first element to the second. Vectors in Clojure are immutable, so we can't simply change the first element when we start the transfer; instead we have to generate a new vector. That leaves us with the problem of where to keep the vector representing the current state. Clojure offers us a type called an Atom for handling this.
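As a quick illustration of that immutability, "updating" a vector returns a new vector and leaves the original untouched:
user=> (def v [1000 0])
#'user/v
user=> (assoc v 0 999)
[999 0]
user=> v
[1000 0]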

An Atom guards a value.
user=> (def balances (atom [1000, 0]))
#'user/balances
user=> balances
#<Atom@1286b10: [1000 0]>
user=> @balances
[1000 0]

More importantly, it supports a swap! operation. This takes the current value, applies a function to it to get a potential new value, and then tries to atomically assign this new value into the Atom. If the value has changed between the read and the assignment, then the system will try again, going around the read/generate/assign cycle until the new value is successfully set. Because of this behaviour, you’d usually want the function to be pure (with no side-effects).
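For example, here is a minimal swap! on a throwaway Atom (so we don't disturb balances): the function receives the current value and returns the candidate new value, which is installed atomically.
user=> (def counter (atom 0))
#'user/counter
user=> (swap! counter inc)
1
user=> @counter
1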

Let’s define a couple of operations, action1 and action2 that we’ll run concurrently. The functions that generate new values, update1 and update2, output a message when they are being called. I’ve added some sleep statements to simulate long running actions and to try to ensure that we need to re-execute one of the updates.
(defn update1 [balances]
  (println "update1")
  (let [[x y] balances]
    (Thread/sleep 1000)
    [(- x 1) (+ y 1)]))

(defn update2 [balances]
  (println "update2")
  (let [[x y] balances]
    [(- x 1) (+ y 1)]))

(defn action1 []
  (swap! balances update1))

(defn action2 []
  (Thread/sleep 30)
  (swap! balances update2))

We can use pcalls to run these two actions simultaneously.
user=> (pcalls action1 action2)
update1
update2
update1
([998 2] [999 1])

We see that the first update function starts before the second, but the second is quicker and assigns its result to the Atom first, forcing the first update to be re-run against the new value. Note how the invariant on the vector is maintained.

Internally the Atom is implemented as an AtomicReference
final AtomicReference state;

public Atom(Object state){
    this.state = new AtomicReference(state);
}

The deref method is used to access the value of the Atom,
user=> '@balances
(clojure.core/deref balances)

public Object deref(){
    return state.get();
}

And there are several overloads of the swap method, differing in how many extra arguments are passed along to the update function.
public Object swap(IFn f) throws Exception{
    for(; ;)
        {
        Object v = deref();
        Object newv = f.invoke(v);
        validate(newv);
        if(state.compareAndSet(v, newv))
            {
            notifyWatches(v, newv);
            return newv;
            }
        }
}
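From the Clojure side, these overloads mean that any extra arguments given to swap! are passed to the update function after the current value. A small sketch (transfer here is just an illustrative helper):
(defn transfer [[x y] n]
  [(- x n) (+ y n)])

(swap! (atom [1000 0]) transfer 5)
;; => [995 5]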

Given that we have easy access to Java methods and classes, we could instead use ordinary Java monitors to avoid the possibility of swap! re-running the update function. We could create an instance of Object and use it as a lock to guard our data structure.

(def lock (Object.))

(defn lockedAction1 []
  (locking lock
    (action1)))

(defn lockedAction2 []
  (locking lock
    (action2)))

The locking macro expands into code that uses monitors to manipulate the lock on the object.
user=> (macroexpand '(locking lock something))
(let* [lockee__2725__auto__ lock]
  (try (monitor-enter lockee__2725__auto__)
        something
  (finally (monitor-exit lockee__2725__auto__))))

When we run these versions of our functions, access to the Atom is serialized.

user=> (pcalls lockedAction1 lockedAction2)
update1
update2
([999 1] [998 2])

However, Clojure provides another means of guarding the data, one that guarantees only a single update function is running at any moment: the Agent. When you send a message, which is an update function plus some arguments, to an Agent, the Agent makes sure the updates are serialized, so you don't need to do any locking.

We first create an Agent responsible for our two balances.
user=> (def balances (agent [1000, 0]))
#'user/balances

We can use the deref function, most easily denoted using the @ reader macro, to get the data from the Agent.
user=> @balances
[1000 0]

And now we can send-off the update1 and update2 actions to the Agent. The Agent will queue them and run them one at a time on some other thread.
user=> (pcalls #(send-off balances update1) #(send-off balances update2))
(#<Agent@15291cd: update1
[1000 0]> #<Agent@15291cd: [1000 0]>)
user=> update2

Note that in the above there are messages coming from the background threads that get mixed with the messages from the REPL (read-eval-print-loop). The balances have now been updated appropriately.

user=> balances
#<Agent@15291cd: [998 2]>

The Agent class is declared in Agent.java and its constructor is called when you use the agent function defined in core.clj.
(defn agent
  ([state] (new clojure.lang.Agent state))
  ([state & options]
     (setup-reference (agent state) options)))

An Agent has the following fields.
volatile Object state;
AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
volatile ISeq errors = null;
final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();

state is the value that the Agent is guarding. It is set by the constructor through setState, which also runs the validator function (if one has been set) to check the invariants on the new state.
public Agent(Object state, IPersistentMap meta) throws Exception {
    super(meta);
    setState(state);
}

boolean setState(Object newState) throws Exception{
    validate(newState);
    boolean ret = state != newState;
    state = newState;
    return ret;
}
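From Clojure, the validator can be supplied as an option when the agent is created and is wired up by setup-reference. A sketch using our invariant (checked-balances is just an illustrative name):
(def checked-balances
  (agent [1000 0] :validator (fn [[x y]] (= 1000 (+ x y)))))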

errors is a collection of the errors that happened during the execution of the update functions. This collection is used to prevent access to the data if there are uncleared errors.
public Object deref() throws Exception{
    if(errors != null)
        {
        throw new Exception("Agent has errors", (Exception) RT.first(errors));
        }
    return state;
}

public ISeq getErrors(){
    return errors;
}

public void clearErrors(){
    errors = null;
}

The latter two functions have a Clojure interface defined in core.clj.
(defn agent-errors
  "Returns a sequence of the exceptions thrown during asynchronous
  actions of the agent."
  [#^clojure.lang.Agent a] (. a (getErrors)))

(defn clear-agent-errors
  "Clears any exceptions thrown during asynchronous actions of the
  agent, allowing subsequent actions to occur."
  [#^clojure.lang.Agent a] (. a (clearErrors)))
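A sketch of how these behave (using a throwaway agent; the exact exception text will vary): an action that throws leaves the agent with an uncleared error, so deref throws and further dispatches are refused until the errors are cleared.
(def a (agent 1))
(send a #(/ % 0))       ; ArithmeticException recorded in errors once the action runs
(agent-errors a)        ; => sequence containing the exception
(clear-agent-errors a)  ; the agent is usable again
@a                      ; => 1, the state was never replaced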

As we saw above, update functions are sent to Agents. This can be done in two ways, using either send or send-off. The interface functions are defined in core.clj.
(defn send
  [#^clojure.lang.Agent a f & args]
    (. a (dispatch f args false)))

(defn send-off
  [#^clojure.lang.Agent a f & args]
    (. a (dispatch f args true)))

These functions add instances of the nested Action class to the q. The Action records the Agent to which a given update function and args were sent, together with a value named solo that reflects which thread pool to run the action on.
public Object dispatch(IFn fn, ISeq args, boolean solo) {
    if(errors != null)
        {
        throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
        }
    Action action = new Action(this, fn, args, solo);
    dispatchAction(action);
    return this;
}

dispatchAction needs to behave differently if we are running inside a transaction (the first condition) or if a send happens inside the execution of an update function (the second case). In the normal case, the Action enqueues itself on the Agent to which it was sent.
static void dispatchAction(Action action){
    LockingTransaction trans = LockingTransaction.getRunning();
    if(trans != null)
        trans.enqueue(action);
    else if(nested.get() != null)
        {
        nested.set(nested.get().cons(action));
        }
    else
        action.agent.enqueue(action);
}
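The transaction case can be observed from Clojure: a send made inside a dosync is enqueued on the transaction and only dispatched if the transaction commits (r and log are illustrative names):
(def r (ref 0))
(def log (agent []))

(dosync
  (alter r inc)
  (send log conj :incremented))   ; held until the transaction commits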

Enqueue is fairly straightforward.
void enqueue(Action action){
    boolean queued = false;
    IPersistentStack prior = null;
    while(!queued)
        {
        prior = q.get();
        queued = q.compareAndSet(prior, (IPersistentStack) prior.cons(action));
        }
    if(prior.count() == 0)
        action.execute();
}

Execute on an Action determines which thread pool to use for running the update function.
void execute(){
    if(solo)
        soloExecutor.execute(this);
    else
        pooledExecutor.execute(this);
}

In the solo case, i.e. when send-off is used from Clojure, we use an expandable thread pool, whereas in the send case we use a fixed-size pool.
  final public static ExecutorService pooledExecutor =
          Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
  final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
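The practical rule that follows is to use send for short, CPU-bound updates and send-off for actions that may block, for example on I/O. Illustrative only (counter-agent and log-agent are hypothetical):
(send counter-agent inc)                       ; runs on the fixed-size pool
(send-off log-agent
          (fn [entries] (conj entries (slurp "data.txt"))))   ; runs on the cached pool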

A worker thread ends up being passed an Action and executes the doRun method. This code binds the *agent* var to the currently executing agent and then sets the nested field ready to receive any nested calls to send. The update function is then called, and if it succeeds the new state is assigned. If the update function throws, the exception is recorded in the agent's errors field and we don't releasePendingSends that have accumulated in nested. In the success case, the nested collection is traversed and the nested sends are queued to the relevant agents. We then pop the action off the agent's queue, and if the agent has more work to do we execute the next Action.
     static void doRun(Action action){
        try
            {
            Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
            nested.set(PersistentVector.EMPTY);

            boolean hadError = false;
            try
                {
                Object oldval = action.agent.state;
                Object newval =  action.fn.applyTo(RT.cons(action.agent.state, action.args));
                action.agent.setState(newval);
                action.agent.notifyWatches(oldval,newval);
                }
            catch(Throwable e)
                {
                action.agent.errors = RT.cons(e, action.agent.errors);
                hadError = true;
                }

            if(!hadError)
                {
                releasePendingSends();
                }

            boolean popped = false;
            IPersistentStack next = null;
            while(!popped)
                {
                IPersistentStack prior = action.agent.q.get();
                next = prior.pop();
                popped = action.agent.q.compareAndSet(prior, next);
                }
            if(next.count() > 0)
                ((Action) next.peek()).execute();
            }
        finally
            {
            nested.set(null);
            Var.popThreadBindings();
            }
    }

Note how using count() to decide when to kick off execution of Actions (in both enqueue and doRun) means that only a single Action will be running on a given Agent at any one time.
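The nested-send behaviour described above can also be seen from Clojure: a send issued inside an agent action is parked in the nested vector and only dispatched once the action completes successfully (a and b are illustrative agents):
(def a (agent 0))
(def b (agent 0))

(send a (fn [v]
          (send b inc)   ; held in nested, released after this action succeeds
          (inc v)))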

Another important thing we can do with an Agent is wait for it to finish processing its action queue. This is done with the await function, together with await-for, which times out, and await1, which is optimised for a single Agent. await queues a send to each agent and then waits on a latch that those sends count down. The function uses *agent*, which is bound by the execution code when we are inside agent processing on the current thread, to stop await being called inside an update action. This code also uses the io! macro, which throws an exception if we are inside a transaction.
(defn await
  "Blocks the current thread (indefinitely!) until all actions
  dispatched thus far, from this thread or agent, to the agent(s) have
  occurred."
  [& agents]
  (io! "await in transaction"
    (when *agent*
      (throw (new Exception "Can't await in agent action")))
    (let [latch (new java.util.concurrent.CountDownLatch (count agents))
          count-down (fn [agent] (. latch (countDown)) agent)]
      (doseq [agent agents]
        (send agent count-down))
      (. latch (await)))))
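So a typical pattern is to dispatch some work and then await before reading the result (a sketch, assuming balances is the agent and starts from [1000 0] again):
(send-off balances update1)
(send-off balances update2)
(await balances)   ; blocks until both updates have run
@balances          ; => [998 2]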
