ReentrantLock加锁过程简单分析

2017-05-06 02:40
什么是可重入锁?

Jdk api中的介绍

一个可重入的互斥锁Lock,它具有与使用synchronized方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。

ReentrantLock将由最近成功获得锁,并且还没有释放该锁的线程所拥有。当锁没有被另一个线程所拥有时,调用lock的线程将成功获取该锁并返回。如果当前线程已经拥有该锁,此方法将立即返回。

怎么用?
class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ...

public void m() { 
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       lock.unlock()
     }
   }
 }
怎么实现的?

ReentrantLock的锁是通过它内部的一个同步器实现的。

/** Synchronizer providing all implementation mechanics */
private final Sync sync;

同步器的类型有两个子类,公平同步器与非公平同步器,也即公平锁与非公平锁。这两个子类是锁的具体实现,我们对锁的操作也都是作用在这两个类的实例上的。

 ReentrantLock加锁过程简单分析0

我们在使用ReentrantLock时,通过无参构造器创建的lock是就是非公平的。

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

 如果想要使用公平锁,我们可以使用有参构造器来创建lock,传入true表示公平锁,传入false表示非公平锁。 

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

 创建完成后,我们使用锁的方式就是调用lock方法,那lock方法又是怎么实现的呢?我们来看一下代码。

/**
 * Acquires the lock.
 *
 * <p>Acquires the lock if it is not held by another thread and returns
 * immediately, setting the lock hold count to one.
 *
 * <p>If the current thread already holds the lock then the hold
 * count is incremented by one and the method returns immediately.
 *
 * <p>If the lock is held by another thread then the
 * current thread becomes disabled for thread scheduling
 * purposes and lies dormant until the lock has been acquired,
 * at which time the lock hold count is set to one.
 */
public void lock() {
    sync.lock();
}

通过代码我们可以看到,ReentrantLock的lock方法是通过调用sync的lock方法来实现的。根据我们前面的介绍,sync又是FairSync和NonfairSync中的一个,那我们下面就来具体分析一下这两个类中lock方法的具体实现。

准备工作

在开始之前,我们先做点准备工作。从下面的类继承关系图中可以看到,ReentrantLock中的公平锁与非公平锁都是从AQS(AbstractQueuedSynchronizer)扩展来的,所以在详细介绍ReentrantLock之前,我们先简单的了解一下AQS。

 ReentrantLock加锁过程简单分析1

在API中,对AQS的解释是:

为实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件、等等)提供一个框架。此类的设计目标是成为依靠单个原子int值来表示状态的大多数同步器的一个有用基础。

通过这两句话,我们可以看出,AQS中有两个重要的概念,一个是表示状态的单个int值(state),一个是FIFO的队列。在Java并发包中的各种同步器就是通过对这两个概念进行不同的操作而实现的。其中需要额外说明的就是,对表示状态的int值的操作,是通过CAS的方式完成的,这样才能保证在多线程的环境下的安全性。这两个概念具体的内容与相关操作,我们等待用到的时候再说。

在ReentrantLock中表示状态的int值表示的是锁的数量。

非公平锁(NonfairSync)

概览

/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

lock方法

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

通过代码可以看出,获得锁的第一步是通过CAS的方式,尝试将state从0变为1。如果成功,就表示当前线程成功抢到了锁,接着就会将当前线程设为ExclusiveOwnerThread,即独占线程。如果失败,就会接着调用acquire方法。通过这抢锁的第一步,我们可以明显感受到,这就是一个赤裸裸的插队行为。当新线程抢夺锁的时候,它不会关注此时到底有多少个线程在排队等待,而是直接尝试抢锁。如果此时锁正好被释放了,那新线程就有了成功的可能。这就是不公平锁第一处体现不公平的地方。

acquire方法

/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

当第一步插队失败后,会调用require方法,并传入1。接着会调用tryAcquire方法再次抢锁。在非公平锁中,tryAcquire方法会直接调用nonfairTryAcquire方法。

nonfairTryAcquire方法

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在此方法中,首先会查看当前的锁数量,如果数量为0,表示当前无锁,那么就会再使用一次插队抢锁。如果抢锁成功,就会直接返回true,这就是不公平锁第二处体现不公平的地方。如果抢锁失败,就会看当前持锁的线程是不是抢锁的线程,如果是的话,锁数nextc+1,并返回true。这也就是为什么叫可重入锁的原因,持锁线程可重复的获取锁。其他情况则会返回false,回到acquire方法中。

当前面的操作都不能成功抢到锁的时候,就会将线程加入到等待队列中。

addWaiter方法,线程进入等待队列

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

 我们前面提到过ASQ中的两个重要概念,之前已经用到一个state了,现在是FIFO队列登场的时候了。

在ASQ内部有一条双向链表来存放等待线程,链表中的每个节点都是一个Node对象。Node对象中维护了线程,等待状态以及前后Node的指针。线程在加入队列之前需要包装成Node对象,而包装线程的方法就是addWaiter。

Node有个表示共享模式的字段,在包装时需要提供,这个值是由参数传来的,默认是独占模式。当把线程包装成Node对象以后,就需要入队了。如果队列不为空,那么此方法就会尝试着快速入队,如果快速入队失败或者队列为空,那么就会使用正常的入队方法。快速入队是使用CAS的方式将新建的node放到队尾,因为在多线程的环境下,有可能在放入队尾的时候被其他线程捷足先登,所以后面会有个正常入队的操作。正常入队操作就是循环入队,直到成功为止。入队操作之后,需要做的就是阻塞等待线程。

acquireQueued方法,阻塞等待线程

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

首先通过node的predecessor方法获取到前一个节点。如果前一个节点是头结点(头结点是获取了锁的线程),说明队列中只有一个节点,那么就会直接开始自旋抢锁。如果前节点不是头结点或抢锁失败,那么就会调用shouldParkAfterFailedAcquire方法。

shouldParkAfterFailedAcquire方法

/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这个方法会检查当前节点前节点的等待状态,如果是SIGNAL,则表示前节点已经park了,即已经暂停了,那么当前线程也就可以安全暂定等待唤醒了。如果等待状态大于0,则表示前节点已经被取消了,那么会从当前节点往前遍历,去掉已经被取消的节点。其他情况则将前节点的等待状态改为SIGNAL。因为这个方法外部是一个死循环,所以最终会返回true 的。如果返回true,那就会执行parkAndCheckInterrupt方法。

parkAndCheckInterrupt方法

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
} 

这个方法比较简单,就是阻塞当前线程,以防外层死循环消耗系统资源并等待被前节点唤醒。

总结

至此,非公平锁获取锁的过程就结束了。简单总结一下抢锁的过程:

1. 通过CAS的方式将state(锁数量)从0设置为1(第一次插队)。

  1.1 如果设置成功,设置当前线程为独占锁的线程。

  1.2 如果没有成功,那么会再获取一次锁数量。

    1.2.1 如果锁数量为0,那么会再次用通过CAS的方式将state(锁数量)从0设置为1(第二次插队)。

    1.2.2 如果锁数量不为0,或者第二次抢锁失败,那么会查看独占锁线程是否是当前线程。

      1.2.2.1 如果是,则将当前锁数量+1。

      1.2.2.2 如果不是,则将当前线程包装为一个Node对象,并打入到阻等待队列中去,等待被前节点唤醒。