并发包


并发包

聊聊你对 AQS 的理解

Java 并发包的核心是 AQS 抽象类,它抽象了并发控制的核心逻辑。在 AQS 基础上,实现了公平同步器和非公平同步器。接着,就是各种 Lock 的实现,例如写锁、可重入锁、读写锁。接着在锁的基础上,还衍生了 Condition 条件锁。最后就是基于 Lock 的各种应用,例如:并发工具类、阻塞队列、线程池。上面这个实现逻辑,可以简化为下面的实现示意图:

AQS
 ↓
同步器:Sync(FairSync、NonfairSync)
 ↓
锁实现:Lock(ReadLock、WriteLock、ReentrantLock、ReentrantReadWriteLock、Condition) 
 ↓
工具类:CyclicBarrier、CountdownLatch、Semaphore

在 AQS 的实现中,整体的实现逻辑是这样的:

  1. 内部通过使用 CAS 和 Unsafe 的 Unpark 操作,对 state 变量的原子操作,从而实现同步功能。
  2. 此外,AQS 里还实现了一个队列,让所有需要获取锁的队列去排队获取。另外,为了实现 Condition 的多条件获取锁,还有另外一个队列去存放获取 Condition 锁的线程对象。

上述大致就是 AQS 的实现逻辑了。

聊聊 AQS 的内部实现细节

在这条 Java 并发包的实现中,最关键的是 Lock 的并发控制实现、Condition 的并发控制实现。下面我将用 ReentrantLock 为例子,讲一下 Lock 的并发控制实现。

Lock 并发控制的 lock() 实现。 当我们调用 ReentrantLock 的 lock 方法,调用 NonfairSync 的 lock() 方法。方法中,先用 cas 尝试获得锁,如果成功则直接返回。如果失败,将会调用 AQS 的 acquire() 方法。

进入 AQS 的 acquire() 方法,首先调用 tryAcquire() 方法尝试再次获取锁,此时调用的是 ReentrantLock 的 tryAcquire() 实现。如果成功,那么则直接返回。如果失败,那么接着调用 addWaiter() 方法将该元素加入 AQS 内部的等待队列。

接着调用 acquireQueued() 方法,进入一个自旋。在自旋中,会判断前置元素是否是头结点,那么再次尝试获取锁,获取成功则返回。如果获取失败了,那么判断前置节点的状态是否是 SIGNAL 状态,那么表示前置节点释放锁的时候会唤醒后置节点,那么直接休眠。否则修改前置节点的状态值等,继续进入下一轮循环。

简单概括一下 lock() 方法的逻辑:

  1. 调用 ReentrantLock 的 lock 方法,调用 NonfairSync 的 lock() 方法。方法中,先用 cas 尝试获得锁,失败后调用 AQS 的 acquire() 方法。
  2. 进入 AQS 的 acquire() 方法,首先调用 tryAcquire() 方法尝试再次获取锁,此时调用的是 ReentrantLock 的 tryAcquire() 实现。接着调用 addWaiter() 方法将该元素加入等待队列。接着调用 acquireQueued() 方法,进入一个自旋。如果前置元素是头结点,那么再次尝试获取锁,获取成功则返回。如果获取失败了,那么判断前置节点的状态是否设置正确,如果是 SIGNAL,那么直接休眠,否则修改前置节点的状态值等,继续进入下一轮循环。

其调用链路为:lock -> CAS -> tryAcquire -> addWaiter -> acquireQueued。

Lock 并发控制的 unlock() 实现。 而当我们调用 ReentrantLock 的 unlock() 方法,调用 NonfairSync 的 release() 方法。在这个方法中,会尝试调用 AQS 的 tryRelease 方法去释放锁资源,其实调用的是 ReentrantLock 的 tryRelease 方法去设置 state 状态。如果成功,那么就唤醒后继节点。

简单概括一下 unlock() 方法的逻辑:

  1. 调用 ReentrantLock 的 unlock() 方法,调用 NonfairSync 的 release() 方法,最终调用 AQS 的 release() 方法。
  2. AQS 的 relese() 方法。调用 tryRelease() 方法,调用回 ReentrantLock 的 tryRelease() 实现去修改 state 状态。这里会判断当前线程是否是拥有该锁的线程,如果是的话才能设置成功。如果设置 state 成功,那么调用 unparkSuccessor() 方法唤醒后继节点,返回 true,否则返回 false。

其调用链路为:release -> tryRelease -> unparkSuccessor。

Condition 并发控制的 lock() 实现。 Condition 通过 await() 方法和 signal() 方法实现并发控制。当我们调用 lock 对象的 newCondition 方法时,会返回一个新建的 ConditionObject 对象,这个对象是 Condition 的核心业务逻辑实现,并且还包含了一个等待队列。

等待。 当我们调用 condition 对象 await() 方法时,会调用 addConditionWaiter 方法将当前线程加入 condition 的等待队列中,并释放锁资源。接着会进入自旋,判断对象是否被唤醒加入到了 AQS 的队列中。如果未被加入到 AQS 队列中,那么就进入休眠状态,等待另一个线程的唤醒。

唤醒。 当我们调用 condition 对象的 signal() 方法时,会调用 doSignal 方法将该线程所代表的的节点,从 Condition 的队列中转移到 AQS 的队列中,并唤醒等待的线程。等待线程被唤醒后,会尝试去获取锁资源,继续执行。

聊聊你对并发工具类的理解

我们常用的并发工具类有:CyclicBarrier、CountdownLatch、Semaphore 这三个。简单地说:

CyclicBarrier 是一组线程,互相等待,等所有线程达到某个状态后,才可以继续执行下次,它们之间是平等的。 我们使用的时候,需要传入一个计数,之后再传入一个执行线程。每次调用 CyclicBarrier 的 await() 方法时,计数减一。当计数减为 0 时,执行传入的执行线程。 CyclicBarrier 的计数是自动恢复的,其底层基于 Lock/Condition 实现。

CountdownLatch 是某个线程,等待其他线程到了,这个线程才去做某个事情,有主次之分。 当我们使用 CountdownLatch 的时候,需要传入一个计数。需要等待的主线程调用 countDownLatch.await() 方法等待,其他线程调用 countDownLatch.countDown() 方法。当计数减为 0 时,主线程便会被唤醒,继续执行主线程后续的内容。 CountdownLatch 则是通过内部类 Sync 继承 AQS 实现的。

Semaphore 是那种类似资源获取控制,其管理着一批资源,借出资源减一,回收资源加一。 使用 Semaphore 时先传入一个计数,表示拥有的资源。其他线程需要获取资源时调用 semaphore.acquire() 方法,释放资源时调用 semaphore.release() 方法。当没有资源时会阻塞等待。其底层是通过内部类 Sync 继承 AQS 实现的。

简单地说:CyclicBarrier 是线程之间互相等待,而 CountdownLatch 则是主次线程等待,而 Semaphore 则是类似停车场的资源控制。 其次它们之间还有其他小的差异,例如 CyclicBarrier 的计数是会自动恢复的,但是 CountdownLatch 的计数是不可恢复的。此外,它们在底层上也是有些不同的。CyclicBarrier 是用 Lock/Condition 实现的,而 CountdownLatch、Semaphore 则是通过内部类 Sync 继承 AQS 的共享锁实现的。

参考资料:

上次编辑于: 2022/7/30 09:07:48