我们知道 ReentrantLock 分为公平锁和非公平锁,源码上是怎么实现公平和非公平?
我们在使用 ReentrantLock 一般的使用方式:
package threads.demo01; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LockDemo { private volatile static int amount = 0; private static Lock lock = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 2; i++) { Thread thread = new Thread(LockDemo::inc); thread.setName("线程" + i); thread.start(); } TimeUnit.SECONDS.sleep(3); System.out.println(amount); } public static void inc() { // 多线程环境调试时, 鼠标右键断点,在 suspend 选项选择 Thread lock.lock(); String threadName = Thread.currentThread().getName(); System.out.println(threadName + " 获取锁成功"); for (int i = 0; i < 100; i++) { amount++; } System.out.println(threadName + " 准备释放锁"); lock.unlock(); } }
为了方便阅读代码将ReentrantLock中核心代码拿出来:
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.lock(); } public void unlock() { sync.release(1); } // .... 省略其他代码 }
通过以上的源码我们可以分析出:
ReentrantLock 中锁的实现都在成员变量 Sync sync 中;
公平锁和非公平锁时 Sync 的不同实现,从代码类文件名称我们可以看出来 FairSync 是公平锁的实现而 NonfairSync 是非公平锁的实现。
下来看 Sync 源码:
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ // 使用 AQS 中的 state 来表示进行同步的控制 // 默认情况下 state 为 0 表示当前没有线程竞争锁资源 // state >=1 表示存在线程正持有资源, 对应的数量表示加锁次数 // ReentrantLock 是可重入锁,意思是获得锁资源的线程可以对该锁多次加锁,常见的场景比如递归方法中存在 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 没有线程竞争,尝试获取锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程是否已经获取到锁资源 int nextc = c + acquires; // 重复加锁 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { // 释放锁对应的 state 值进行 -releases 操作 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // free 用来表示当前线程是否完全释放了锁 boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { // 尝试获取锁资源(不管有没有排队等待的线程) if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); // 获取锁资源,获取不到资源会将当前线程包装成 Node 节点插入到队列尾部,同时阻塞当前线程 } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); // 获取锁资源,获取不到资源会将当前线程包装成 Node 节点插入到队列尾部,同时阻塞当前线程 } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 等待队列中没有线程竞争才尝试获取锁资源 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } /** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } /** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { // tryAcquire(arg) 尝试获取锁资源 // addWaiter(Node.EXCLUSIVE) 将当前线程包装成 Node 节点插入到等待队列中 // acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 自旋方式循环获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取 node 节点的前驱节点 final Node p = node.predecessor(); // 当前节点如果在队列头部则有机会重新尝试获取锁 // 为什么不是 node == head 而是 p == head ? // 因为队列的第一个节点是虚节点(不是排队的线程节点) if (p == head && tryAcquire(arg)) { // node 节点获取到锁,跳到下一个节点(不再等待拉) setHead(node); p.next = null; // help GC failed = false; return interrupted; } // shouldParkAfterFailedAcquire(p, node) 获取锁失败是否应该被阻塞 // parkAndCheckInterrupt() 阻塞线程并检查中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ....
从上面的代码中我们可以很清晰的看到公平锁和非公平锁加锁时的差异:
公平锁: 实行先来先到元素有序获取锁。
非公平锁: 当前没有线程持有锁(或锁刚被释放时),想获取锁的线程不管有没有排队等待的线程都去直接竞争尝试获取锁资源。
下图是 ReentrantLock 非公平锁加锁流程:
释放锁
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { // 释放锁, Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 等待队列不为空则唤醒等待队列中下一个线程 return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }