I thought I’d just done that

Clojure is great in that it’s really easy to access the underlying Java platform. Clojure functions are compiled into classes that implement Runnable:
  user=> (instance? Runnable (fn [] 10))
  true

and hence one can easily start a new Thread that runs such a function:
user=> (.start (new Thread (fn [] (pr "Running"))))
nil
user=> 1
"Running"1

This post will attempt to dig into the implementation of Software Transactional Memory (hereafter referred to as STM) in Clojure. Clojure provides a storage location called a Ref that interacts with any running transaction: the same Ref can be seen as containing different values from different threads, and the system merges a set of consistent changes when a transaction commits. Indeed, the system will even retry a transaction if it conflicts with changes made by some other thread before it successfully commits.

We’ll work through the usual bank account example: two bank accounts, and a transaction that transfers money between them. Two separate threads will try to do this at the same time, and we’ll use sleeps to ensure that one of the transactions needs to restart.

user=> (def account1 (ref 1000))
#'user/account1
user=> account1
#<Ref@157b39f: 1000>
user=> @account1
1000
user=> (def account2 (ref 0))
#'user/account2
user=> account2
#<Ref@169df00: 0>
user=> @account2
0

We’ll use the following function to transfer an amount of money. I’ve added some delays so that we can later try to get some contention between the transactions.
(defn transfer [amount delay1 delay2]
  (sync
   nil                                 ; sync's flags argument, currently ignored
   (Thread/sleep delay1)               ; delay before reading the balances
   (let [acc1 @account1 acc2 @account2]
     (ref-set account1 (- acc1 amount))
     (Thread/sleep delay2)             ; delay between the two writes
     (ref-set account2 (+ acc2 amount)))))

Now we can try running a couple of transactions in parallel, with some delays inserted to make things interesting.
(do (.start (new Thread (fn [] (transfer 1 0 2000))))
    (.start (new Thread (fn [] (transfer 1 1000 0)))))

We get the following breakpoints hit in the debugger. The breakpoints are set on the line where the closure representing the body of the transaction is called, and on the line executed just after that closure returns.
Breakpoint hit: "thread=Thread-41", clojure.lang.LockingTransaction.run(), line=236 bci=77
236                     ret = fn.call();
Thread-41[1] cont
>
Breakpoint hit: "thread=Thread-42", clojure.lang.LockingTransaction.run(), line=236 bci=77
236                     ret = fn.call();
Thread-42[1] cont
>
Breakpoint hit: "thread=Thread-42", clojure.lang.LockingTransaction.run(), line=236 bci=77
236                     ret = fn.call();
Thread-42[1] cont
>
Breakpoint hit: "thread=Thread-41", clojure.lang.LockingTransaction.run(), line=238 bci=84
238                     if(info.status.compareAndSet(RUNNING, COMMITTING))
Thread-41[1] cont
>
Breakpoint hit: "thread=Thread-42", clojure.lang.LockingTransaction.run(), line=236 bci=77
236                     ret = fn.call();
Thread-42[1] cont
>
Breakpoint hit: "thread=Thread-42", clojure.lang.LockingTransaction.run(), line=238 bci=84
238                     if(info.status.compareAndSet(RUNNING, COMMITTING))

The lines on which the breakpoints are set are inside the run method of LockingTransaction:
main[1] where
  [1] clojure.lang.LockingTransaction.run (LockingTransaction.java:236)

232                             startPoint = readPoint;
233                             startTime = System.nanoTime();
234                             }
235                     info = new Info(RUNNING, startPoint);
236 =>                  ret = fn.call();
237                     //make sure no one has killed us before this point, and can’t from now on
238                     if(info.status.compareAndSet(RUNNING, COMMITTING))
239                             {
240                             for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet())
241                                     {

From the breakpoints, we see that Thread-41 executed fn.call() once, while Thread-42 executed it three times. We’ll investigate why this happened. We expect it to be because Thread-42’s transaction found that Thread-41 had already claimed or committed the Refs it needed, so the system aborted it and retried.

So what happens when one of the transactions, that is, the code inside the sync block, is executed? The sync macro looks like this:
(defmacro sync
  [flags-ignored-for-now & body]
  `(. clojure.lang.LockingTransaction
      (runInTransaction (fn [] ~@body))))
The flags-ignored-for-now argument is expected to be nil, and the macro really ought to check for that. I often forget to supply it and then wonder why the first form of the body isn’t executed.

Let’s first step through the case where only a single transaction is running at a time.
user=>  (.start (new Thread (fn [] (transfer 1 0 2000))))
nil

This goes into the runInTransaction code in LockingTransaction.
static public Object runInTransaction(Callable fn) throws Exception{
    LockingTransaction t = transaction.get();
    if(t == null)
        transaction.set(t = new LockingTransaction());
    if(t.info != null)
        return fn.call();
    return t.run(fn);
}

This looks for the LockingTransaction belonging to the current thread, which is held in a ThreadLocal, and creates one if there isn’t one yet:
  final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>();
If a transaction is already running on this thread (its info field is non-null), we simply get on with calling the closure representing the body of the sync block; we’ll look at the info field in more detail later. Otherwise we go into the run method.
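To make the nesting rule concrete, here is a minimal Java sketch. It is not the real LockingTransaction: the class TxSketch is hypothetical, a boolean running flag stands in for the info field, and its run method calls the body once instead of looping, locking and committing. What it does show is how the ThreadLocal lets an inner sync join the transaction already running on the same thread.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for LockingTransaction; only the nesting check is modelled.
class TxSketch {
    // One transaction object per thread, created on first use.
    static final ThreadLocal<TxSketch> transaction = new ThreadLocal<>();

    boolean running;  // plays the role of "info != null"
    int runs;         // how many times run() was entered on this thread

    static Object runInTransaction(Callable<?> fn) throws Exception {
        TxSketch t = transaction.get();
        if (t == null) {
            t = new TxSketch();
            transaction.set(t);
        }
        if (t.running) {
            return fn.call();  // nested sync: join the outer transaction
        }
        return t.run(fn);
    }

    Object run(Callable<?> fn) throws Exception {
        runs++;
        running = true;
        try {
            return fn.call();  // the real run() retries and commits around this call
        } finally {
            running = false;
        }
    }
}
```

A nested call such as runInTransaction(() -> runInTransaction(() -> 42)) returns 42 while entering run only once.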

The run method loops, trying to execute the transaction up to a certain retry limit. After setting up some local variables, the first thing it does is take an integer value representing the point in time at which this attempt at the transaction starts.
229                     getReadPoint();

final private static AtomicLong lastPoint = new AtomicLong();

void getReadPoint(){
    readPoint = lastPoint.incrementAndGet();
}
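The clock is just a shared atomic counter, so every attempt gets a unique, increasing read point without any locking. The Java sketch below reproduces that idea in isolation. ClockSketch and drawOnThreads are hypothetical names, and getCommitPoint is included on the assumption that the commit points we meet later come from the same counter.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the global transaction clock.
class ClockSketch {
    // Shared by every thread; incrementAndGet never hands out the same value twice.
    static final AtomicLong lastPoint = new AtomicLong();

    long readPoint;

    void getReadPoint() {
        readPoint = lastPoint.incrementAndGet();
    }

    // Assumption: commit points are drawn from the same counter.
    static long getCommitPoint() {
        return lastPoint.incrementAndGet();
    }

    // Start n threads that each take one read point, and collect the results.
    static long[] drawOnThreads(int n) throws InterruptedException {
        long[] points = new long[n];
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int slot = i;
            threads[i] = new Thread(() -> {
                ClockSketch tx = new ClockSketch();
                tx.getReadPoint();
                points[slot] = tx.readPoint;
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();  // join also makes the writes to points visible here
        }
        return points;
    }
}
```

However the threads interleave, the points they receive are all distinct, and any later commit point is larger than all of them.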

Next we make an Info object. It has two fields: one describing the current status of the transaction and the other recording its start point.
235                     info = new Info(RUNNING, startPoint);

final AtomicInteger status;
final long startPoint;

We then call the closure representing the body of the sync block.
236                     ret = fn.call();

The body soon reads the value of account1: the binding of this Var is fetched, and then we jump into the deref method of the Ref object.
96    public Object deref(){
97 =>   LockingTransaction t = LockingTransaction.getRunning();
98      if(t == null)
99              return currentVal();
100     return t.doGet(this);
101    }

The final return statement calls the doGet method of the LockingTransaction. This first checks, using the info object associated with the transaction, that the transaction hasn’t been aborted.
351     if(!info.running())
If the transaction isn’t running, we throw an instance of RetryEx, which signals to the run method that the transaction needs to be restarted. A single instance of this class is made and stored as a static field in the LockingTransaction class.

Next we check whether this transaction has already set the Ref. If it has, the value will have been stored in a HashMap called vals:
  final HashMap<Ref, Object> vals = new HashMap<Ref, Object>();
If so, this is the value that the transaction must see.
353     if(vals.containsKey(ref))
354             return vals.get(ref);

Each Ref has a read-write lock:
  final ReentrantReadWriteLock lock;
We take its read lock, so that no committing transaction can change the Ref’s history while we look through it:
357 =>          ref.lock.readLock().lock();

A Ref contains a field
  TVal tvals;
These TVal instances form a circular, doubly linked chain of the values the Ref has held, each stamped with the point at which it was committed.
public static class TVal{
    Object val;
    long point;
    long msecs;
    TVal prior;
    TVal next;

    TVal(Object val, long point, long msecs, TVal prior){
        this.val = val;
        this.point = point;
        this.msecs = msecs;
        this.prior = prior;
        this.next = prior.next;
        this.prior.next = this;
        this.next.prior = this;
    }

    TVal(Object val, long point, long msecs){
        this.val = val;
        this.point = point;
        this.msecs = msecs;
        this.next = this;
        this.prior = this;
    }
}
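The two constructors build a circular, doubly linked list in which the Ref’s tvals field points at the newest entry, prior leads back towards older entries, and the newest entry’s next wraps round to the oldest. A small Java sketch makes the splicing easier to follow; TValRingSketch and newestFirst are hypothetical names, and the msecs field is dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-creation of Ref.TVal's circular history (msecs omitted).
class TValRingSketch {
    static final class TVal {
        Object val;
        long point;
        TVal prior;
        TVal next;

        // First version of a Ref: a ring containing just this node.
        TVal(Object val, long point) {
            this.val = val;
            this.point = point;
            this.next = this;
            this.prior = this;
        }

        // A newer version, spliced in just after 'prior' (the current newest).
        TVal(Object val, long point, TVal prior) {
            this.val = val;
            this.point = point;
            this.prior = prior;
            this.next = prior.next;  // the oldest entry, since the list is circular
            this.prior.next = this;
            this.next.prior = this;
        }
    }

    // Values from newest to oldest, following prior links once round the ring.
    static List<Object> newestFirst(TVal head) {
        List<Object> out = new ArrayList<>();
        TVal v = head;
        do {
            out.add(v.val);
            v = v.prior;
        } while (v != head);
        return out;
    }
}
```

Building versions 1000, 999 and 998 at points 1, 5 and 9 (each new node becoming the head) gives newestFirst = [998, 999, 1000], and the head’s next is the point-1 entry.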

We fetch the head of this chain,
360 =>          Ref.TVal ver = ref.tvals;
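and then walk back through the chain, following the prior links, to find the newest value committed no later than our read point, that is, the value of the Ref as it was when the transaction started.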
361             do
362                     {
363                     if(ver.point <= readPoint)
364                             return ver.val;
365                     } while((ver = ver.prior) != ref.tvals);
On the way out, we release the read lock:
369             ref.lock.readLock().unlock();
If no version in the chain is old enough for our read point, we record a fault on the Ref and fail the transaction.
372     ref.faults.incrementAndGet();
373     throw retryex;
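The lookup is a classic multiversion read: walk from the newest version towards older ones and take the first whose commit point is no later than the read point. The Java sketch below isolates that loop. ReadSketch, ring and valueAt are hypothetical names; the sketch links versions through prior only and leaves out the read lock and the faults counter.

```java
// Hypothetical sketch of the version lookup in doGet (no locking, no fault counting).
class ReadSketch {
    static final class Retry extends RuntimeException {}

    static final class TVal {
        final Object val;
        final long point;
        TVal prior;

        TVal(Object val, long point) {
            this.val = val;
            this.point = point;
        }
    }

    // Builds a ring from oldest to newest and returns the newest entry (the head).
    static TVal ring(Object[] vals, long[] points) {
        TVal oldest = null;
        TVal head = null;
        for (int i = 0; i < vals.length; i++) {
            TVal t = new TVal(vals[i], points[i]);
            t.prior = head;  // link back to the previous newest
            if (oldest == null) {
                oldest = t;
            }
            head = t;
        }
        oldest.prior = head;  // close the ring
        return head;
    }

    // Newest value whose commit point is no later than readPoint.
    static Object valueAt(TVal head, long readPoint) {
        TVal ver = head;
        do {
            if (ver.point <= readPoint) {
                return ver.val;
            }
        } while ((ver = ver.prior) != head);
        throw new Retry();  // history too short for this reader: retry
    }
}
```

With versions committed at points 1, 5 and 9, a reader at point 7 sees the point-5 value, while a reader at point 0 finds nothing old enough and must retry.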

We have now successfully read the value of the Ref, as seen from the current transaction.

Soon we get to the code for setting the Ref. The Ref’s set method delegates to the running transaction:
144     return LockingTransaction.getEx().doSet(this, val);

We check that a transaction is running, and that we haven’t used a commute operation on the Ref before. A commute is a way of updating a value that interacts better with other running transactions.
378 =>  if(!info.running())
379             throw retryex;
380     if(commutes.containsKey(ref))
381             throw new IllegalStateException("Can’t set after commute");

The transaction maintains a HashSet of all the Refs that have been set.
final HashSet<Ref> sets = new HashSet<Ref>();

We add the Ref to the sets collection to record that this transaction has set the Ref, and then we call the lock method. After the lock method returns, we add the latest value we have set into the vals collection. This is the map consulted by doGet, and it ensures that the transaction always sees the latest value it set when it reads from a Ref.
if(!sets.contains(ref))
    {
    sets.add(ref);
    lock(ref);
    }
vals.put(ref, val);
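The effect of vals and sets is that writes are buffered privately: the transaction sees its own latest value, everyone else keeps seeing the committed one, and lock is only called the first time a Ref is written. Here is a hypothetical Java sketch of just that bookkeeping; its Ref class holds a single committed value, and a counter stands in for the call to lock.

```java
import java.util.HashMap;
import java.util.HashSet;

// Hypothetical sketch of per-transaction write buffering (vals and sets).
class WriteBufferSketch {
    // Stand-in for a Ref: one committed value, no history or locks.
    static final class Ref {
        Object committed;

        Ref(Object v) {
            committed = v;
        }
    }

    final HashMap<Ref, Object> vals = new HashMap<>();
    final HashSet<Ref> sets = new HashSet<>();
    int lockCalls;  // how often the real code would have called lock(ref)

    Object doGet(Ref ref) {
        if (vals.containsKey(ref)) {
            return vals.get(ref);  // read our own buffered write
        }
        return ref.committed;      // the real code walks the tvals chain here
    }

    Object doSet(Ref ref, Object val) {
        if (!sets.contains(ref)) {
            sets.add(ref);
            lockCalls++;           // first write to this Ref: take ownership
        }
        vals.put(ref, val);
        return val;
    }
}
```

Setting the same Ref twice leaves the committed value untouched, makes the transaction read the second value, and only "locks" once.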

The lock method inside LockingTransaction is quite long, so let’s step through it. First we take the write lock on the Ref.
113             ref.lock.writeLock().lock();

If another transaction has committed to the Ref since our read point, the newest TVal will carry a point later than our readPoint. We detect this and abort the current transaction.
114             if(ref.tvals != null && ref.tvals.point > readPoint)
115                     throw retryex;

The tinfo field of a Ref records the writing transaction that “owns” it; the code below works with it as refinfo. This field may be set later in this lock method. For now, we check that the Ref isn’t owned by a different transaction that is still running. If it is, we try to barge the other transaction; if that fails, we release the write lock, clear our state using the stop method, wait up to LOCK_WAIT_MSECS for the owner to finish, and then throw the retry exception.
if(refinfo != null && refinfo != info && refinfo.running())
    {
    if(!barge(refinfo))
        {
        ref.lock.writeLock().unlock();
        unlocked = true;
        //stop prior to blocking
        stop(RETRY);
        synchronized(refinfo)
            {
            if(refinfo.running())
                {
                try
                    {
                    refinfo.wait(LOCK_WAIT_MSECS);
                    }
                catch(InterruptedException e)
                    {
                    }
                }
            }
        throw retryex;
        }
    }

The stop method records the new status, wakes any threads waiting on this transaction, and clears the transaction’s state:
void stop(int status){
    if(info != null)
        {
        synchronized(info)
            {
            info.status.set(status);
            info.notifyAll();
            }
        info = null;
        vals.clear();
        sets.clear();
        commutes.clear();
        //actions.clear();
        }
}

The barge method checks whether we have been running for a certain minimum time and whether we started before the other transaction. If both hold, we have probably done a lot of useful work, so we kill the other transaction, forcing it to retry, and are allowed to continue.
private boolean barge(Info refinfo){
    boolean barged = false;
    //if this transaction is older
    //  try to abort the other
    if(bargeTimeElapsed() && startPoint < refinfo.startPoint)
        {
        synchronized(refinfo)
            {
            barged = refinfo.status.compareAndSet(RUNNING, KILLED);
            if(barged)
                refinfo.notifyAll();
            }
        }
    return barged;
}
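The barging rule can be exercised on its own. In the Java sketch below, BargeSketch and its Info class are hypothetical; the status names mirror the ones in the code above, but the numeric values are this sketch’s own, and bargeTimeElapsed is passed in as a flag rather than measured so that the outcome is deterministic.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the barging rule; status values are this sketch's own.
class BargeSketch {
    static final int RUNNING = 0, COMMITTING = 1, KILLED = 3;

    static final class Info {
        final AtomicInteger status;
        final long startPoint;

        Info(int status, long startPoint) {
            this.status = new AtomicInteger(status);
            this.startPoint = startPoint;
        }
    }

    // An older transaction that has run long enough may kill a younger, still-running owner.
    static boolean barge(Info mine, Info owner, boolean bargeTimeElapsed) {
        boolean barged = false;
        if (bargeTimeElapsed && mine.startPoint < owner.startPoint) {
            synchronized (owner) {
                // Fails if the owner has already moved past RUNNING (e.g. to COMMITTING).
                barged = owner.status.compareAndSet(RUNNING, KILLED);
                if (barged) {
                    owner.notifyAll();  // wake anyone waiting on the owner
                }
            }
        }
        return barged;
    }
}
```

Because the owner’s status is changed with compareAndSet, barging can only succeed while the owner is still RUNNING. A transaction that has already reached COMMITTING is safe, which is exactly what the compareAndSet(RUNNING, COMMITTING) check after fn.call() guards.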

If the lock method didn’t need to barge, or if it successfully barged another transaction away, we set ourselves as the owner
143             ref.tinfo = info;

That concludes the code for setting the Ref. We now need to go back to the code in run that tries to commit the work when the transaction finishes.
Thread-44[1] stop in clojure.lang.LockingTransaction:236
Set breakpoint clojure.lang.LockingTransaction:236
Thread-44[1] stop in clojure.lang.LockingTransaction:238
Set breakpoint clojure.lang.LockingTransaction:238

We get back to the code just after the call into the closure that executes the body of the sync.
236                     ret = fn.call();
237                     //make sure no one has killed us before this point, and can’t from now on
238 =>                  if(info.status.compareAndSet(RUNNING, COMMITTING))
239                             {

The code first processes the commutes. We’ll cover the notion of “commutes” in a later entry.
  for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet())

Next we take write locks on all of the Refs this transaction has set, skipping any that were commuted:
for(Ref ref : sets)
    {
    if(!commutes.containsKey(ref))
        {
        ref.lock.writeLock().lock();
        locked.add(ref);
        }
    }

and run each Ref’s validator against its new value:
for(Map.Entry<Ref, Object> e : vals.entrySet())
    {
    Ref ref = e.getKey();
    ref.validate(ref.getValidator(), e.getValue());
    }

After this we are ready to set the Refs to the newly committed values.
long msecs = System.currentTimeMillis();
long commitPoint = getCommitPoint();
for(Map.Entry<Ref, Object> e : vals.entrySet())
    {
    Ref ref = e.getKey();
    Object oldval = ref.tvals == null ? null : ref.tvals.val;
    Object newval = e.getValue();
    if(ref.tvals == null)
        {
        ref.tvals = new Ref.TVal(newval, commitPoint, msecs);
        }
    else if(ref.faults.get() > 0)
        {
        ref.tvals = new Ref.TVal(newval, commitPoint, msecs, ref.tvals);
        ref.faults.set(0);
        }
    else
        {
        ref.tvals = ref.tvals.next;
        ref.tvals.val = newval;
        ref.tvals.point = commitPoint;
        ref.tvals.msecs = msecs;
        }
    if(ref.getWatches().count() > 0)
        notify.add(new Notify(ref, oldval, newval));
    }
done = true;
info.status.set(COMMITTED);
}
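The three branches decide how much history a Ref keeps. A hypothetical Java sketch of just this bookkeeping (CommitSketch and historySize are invented names; msecs, locking and watches are left out) shows the consequence: with no faults, each commit recycles the oldest slot, so a Ref keeps a single version until some reader fails to find an old-enough value, after which the next commit grows the ring by one.

```java
// Hypothetical sketch of how a commit records a value in a Ref's history ring.
class CommitSketch {
    static final class TVal {
        Object val;
        long point;
        TVal prior;
        TVal next;

        TVal(Object val, long point) {              // first value: a ring of one
            this.val = val;
            this.point = point;
            this.prior = this;
            this.next = this;
        }

        TVal(Object val, long point, TVal prior) {  // spliced in after 'prior'
            this.val = val;
            this.point = point;
            this.prior = prior;
            this.next = prior.next;
            this.prior.next = this;
            this.next.prior = this;
        }
    }

    static final class Ref {
        TVal tvals;  // newest version, or null before the first commit
        int faults;  // readers that found no version old enough
    }

    static void commit(Ref ref, Object newval, long commitPoint) {
        if (ref.tvals == null) {
            ref.tvals = new TVal(newval, commitPoint);
        } else if (ref.faults > 0) {
            ref.tvals = new TVal(newval, commitPoint, ref.tvals);  // grow the history
            ref.faults = 0;
        } else {
            ref.tvals = ref.tvals.next;  // recycle the oldest slot as the newest
            ref.tvals.val = newval;
            ref.tvals.point = commitPoint;
        }
    }

    // Number of versions currently kept (call only after the first commit).
    static int historySize(Ref ref) {
        int n = 0;
        TVal v = ref.tvals;
        do {
            n++;
            v = v.prior;
        } while (v != ref.tvals);
        return n;
    }
}
```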

There is one other interesting point: a transaction will only be retried a limited number of times.
public static final int RETRY_LIMIT = 10000;

If we fail to commit after running it that many times, an exception is thrown.
throw new Exception("Transaction failed after reaching retry limit");
This kind of bound is really hard to test for, as there are many scenarios when it won’t be hit during testing, but it might occur very occasionally out in the field.
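The shape of that bounded loop can be sketched in Java. RetryLoopSketch is hypothetical: it catches a shared retry exception and goes round again, up to the same RETRY_LIMIT, but it omits the locking, validation and commit that the real run method performs on each attempt.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of a bounded retry loop in the spirit of run().
class RetryLoopSketch {
    static final class RetryEx extends RuntimeException {}

    static final RetryEx retryex = new RetryEx();  // one shared instance
    static final int RETRY_LIMIT = 10000;

    static <T> T run(Callable<T> body) throws Exception {
        for (int i = 0; i < RETRY_LIMIT; i++) {
            try {
                return body.call();  // the real loop also locks, validates and commits
            } catch (RetryEx e) {
                // abandon this attempt and go round again
            }
        }
        throw new Exception("Transaction failed after reaching retry limit");
    }
}
```

A body that asks for a retry twice succeeds on its third attempt; a body that always asks for a retry ends with the retry-limit exception.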

The finally clause of the run method also contains code to unlock the Ref locks we took out earlier, and code to notify watches and dispatch any agent sends made during the transaction. The sends are held back and batched until the transaction is successful.
if(done) //re-dispatch out of transaction
    {
    for(Notify n : notify)
        {
        n.ref.notifyWatches(n.oldval, n.newval);
        }
    for(Agent.Action action : actions)
        {
        Agent.dispatchAction(action);
        }
    }
Watches are functions that are called when a Ref’s value changes. The commit code above queued a Notify for each changed Ref that has watches, and those notifications are delivered here, once the transaction has committed.
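The pattern is worth isolating: anything with an effect outside the transaction is queued during the body and only performed once the commit has succeeded, so retried attempts don’t repeat it. Below is a minimal Java sketch under that reading. DeferredSketch, send and finish are hypothetical names, and in this sketch finish always empties the queue, so work queued by a failed attempt is simply dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: effects queued inside a transaction run only after a successful commit.
class DeferredSketch {
    private final List<Runnable> pending = new ArrayList<>();

    // Called from the transaction body: queue the effect instead of performing it.
    void send(Runnable action) {
        pending.add(action);
    }

    // Called once per attempt, after the commit either succeeded (done) or failed.
    void finish(boolean done) {
        try {
            if (done) {
                for (Runnable action : pending) {
                    action.run();  // dispatched outside the transaction
                }
            }
        } finally {
            pending.clear();  // a failed attempt's effects are never performed
        }
    }
}
```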

We can now add some breakpoints to the parts of the code that throw the RetryEx, and run the original example.
> stop in clojure.lang.LockingTransaction:115
Set breakpoint clojure.lang.LockingTransaction:115
> stop in clojure.lang.LockingTransaction:140
Set breakpoint clojure.lang.LockingTransaction:140

user=> (do (.start (new Thread (fn [] (transfer 1 0 2000))))
    (.start (new Thread (fn [] (transfer 1 1000 0)))))

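Then we see the transaction being restarted, as expected. The retry is thrown from line 140 of the lock method, the path taken when barging the owning transaction fails.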
Breakpoint hit: "thread=Thread-56", clojure.lang.LockingTransaction.lock(), line=140 bci=126
140                             throw retryex;

There are several points we need to make. First, the body of the sync really must have no side effects, as it may be executed many times before the transaction successfully commits. Clojure provides the handy io! macro, which can be used to guard side-effecting code; it throws an exception if that code is executed while a transaction is running.
(defmacro io!
  [& body]
  (let [message (when (string? (first body)) (first body))
        body (if message (next body) body)]
    `(if (clojure.lang.LockingTransaction/isRunning)
       (throw (new IllegalStateException ~(or message "I/O in transaction")))
       (do ~@body))))
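The same guard is easy to express in Java. In this hypothetical sketch, a ThreadLocal flag stands in for LockingTransaction/isRunning and the inTransaction helper is invented for the example; io refuses to run its body while the flag is set, using the same default message as io!.

```java
// Hypothetical Java analogue of io!: refuse to run I/O while a transaction is active on this thread.
class IoGuardSketch {
    // Stand-in for LockingTransaction.isRunning(); maintained by inTransaction below.
    static final ThreadLocal<Boolean> running = ThreadLocal.withInitial(() -> false);

    // Hypothetical helper: run body as if inside a transaction.
    static void inTransaction(Runnable body) {
        boolean outer = running.get();
        running.set(true);
        try {
            body.run();
        } finally {
            running.set(outer);  // restore, even if the body throws
        }
    }

    static void io(String message, Runnable body) {
        if (running.get()) {
            throw new IllegalStateException(message != null ? message : "I/O in transaction");
        }
        body.run();
    }
}
```

Outside a transaction io simply runs its body; inside inTransaction it throws before the body has any effect.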

There are some other transaction-related functions: ensure, alter and commute can be used to interact with Refs during a transaction. We’ll look at them in more detail later.
