六个同步器
- ReentrantLock
- Semaphore [ˈseməfɔːr]: 控制并发量
- CyclicBarrier: 分治策略的一种表达
- CountDownLatch [lætʃ]
- Phaser
- Exchanger
基于 AQS 实现同步器.
ReentrantLock
和 synchronized
相似点
- 临界区保护(提供锁/解锁能力)
- 可重入
- 都提供线程间协作
- Object.wait, Object.notify
- Condition.await, Condition.signal
- 提供锁的升级逻辑
- monitor: 偏向锁 -> 轻量级锁 -> 重量级锁
- AQS: CAS 竞争 -> 休眠 + 排队竞争
- 都提供等待队列
- monitor:
EntrySet
,WaitSet
- AQS:
CLH队列
- monitor:
和 synchronized
区别点
- 基于
AQS
vs 基于Monitor
- Java 生态 vs 非 Java 生态(C/C++)
- 响应线程中断(InterruptException) vs 不响应
- 提供
tryLock
vs 不提供 - 跨
Block
vs 单Block
- 可配置公平 vs 不可配置
重入: 抢占的线程可以多次执行 lock
(就是再次请求持有锁的临界资源). 优势: 降低心智负担.
会判断 owner
是否是一样的, 如果是, 可以直接拿到锁, 避免锁的竞争。
公平意味着牺牲一定的性能.
实现生产者/消费者模型
public class ProducerCustomerModel {
static final int MAX = 10;
/**
* 生产队列
*/
LinkedList<Integer> queue = new LinkedList<>();
/**
* 重入锁
*/
ReentrantLock lock = new ReentrantLock();
/**
* 队列满状态
*/
Condition full = lock.newCondition();
/**
* 队列空状态
*/
Condition empty = lock.newCondition();
/**
* Producer, 生产者
*
* @throws InterruptedException
*/
public void produce() throws InterruptedException {
lock.lock();
if (queue.size() == MAX) {
// wait 操作都是 monitor 提供的. 这是一个坑来的.
// 注意使用 await 方法
full.await();
return;
}
// 操作队列 生产逻辑
var data = readData();
// 根据size判断唤醒生产
if (queue.size() == 1) {
empty.signalAll();
}
queue.add(data);
lock.unlock();
}
/**
* Consumer, 消费者
*
* @throws InterruptedException
*/
public void consume() throws InterruptedException {
lock.lock();
// 队列空了
if (queue.size() == 0) {
empty.await();
return;
}
// 消费数据
queue.remove();
// 操作队列
if (queue.size() == MAX - 1) {
full.signalAll();
}
lock.unlock();
}
/**
* 测试.
*
* @param args
*/
public static void main(String[] args) {
var p = new ProducerCustomerModel();
// 新建100个线程 生产数据
for (int i = 0; i < 100; i++) {
new Thread(() -> {
while (true) {
try {
p.readDb();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// 消费
new Thread(() -> {
while (true) {
try {
p.calculate();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
Semaphore
允许 N个 线程同时进入临界区。
使用场景: 信号量控制并发量.
// 允许10个线程同时访问
var semaphore = new Semaphore(10);
var lock = new ReentrantLock();
Runnable runnable = () -> {
try {
// 获取许可
semaphore.acquire(0);
// 开销较大的计算. semaphore 可以控制并发的数量
lock.lock();
// 临界区
lock.unlock();
} finally {
// 释放信号量
semaphore.release(0);
}
};
for (int i = 0; i < 1000; i++) {
new Thread(runnable).start();
}
实现生产者/消费者模型
final int MAX = 100;
LinkedList<Integer> queue = new LinkedList<>();
// 可生产的信号
Semaphore empty = new Semaphore(MAX);
// 已经生产的信号
Semaphore full = new Semaphore(0);
// 生产
void produce() throws InterruptedException {
// -1. 如果获取不到会进行休眠等待
empty.acquire();
synchronized (queue) {
// 操作队列
}
// +1
full.release();
}
// 消费
void consume() throws InterruptedException {
// -1. 获取已经生产的.
full.acquire();
synchronized (queue) {
// 操作队列
}
// +1. 可以生产的增加.
empty.release();
}
信号量的作用, 具体作用是什么? 控制并发量. 可以实现生产者/消费者.
ArrayBlockingQueue
为何不用 Semaphore
实现? 条件变量(ReentrantLock)的性能更好. 这个类的本身是控制并发量的, 不是线程间的协作。
那Semaphore
的价值呢?抽象的价值–降低心智负担;其它场景:比如流量控制。
CyclicBarrier
循环屏障。阻塞当前线程,等待其他线程。
其实是分治策略一个并发的表达。 CountDownLatch
就是只有一代的 CyclicBarrier
。
示例: 1000w订单要处理, 分成1000批数据。
- 那就要 1w 个订单 * 1k 代。每代处理1w个。
- 每代2个线程 – 2个party合作方,一个准备商品,一个准备发货单。
这样解决了什么问题?
- 并发读取订单关联的商品和发货单
- 降低计算成本,内存使用等等(分治策略)
- 控制处理速度
总得来说解决了什么问题?
- 多个线程协作(通信)处理任务的问题。循环+屏障。
为什么不用 ReentrantLock
+ Condition
?
其它方法也可以实现,但是CyclicBarrier
的场景具有通用性,抽象成数据结构有价值。比如:批量处理大量的数据和任务。
package org.lgq.interview.collection;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author DevLGQ
* @version 1.0
*/
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
new CyclicBarrierDemo().go();
}
private void go() throws InterruptedException {
// 初始化 栅栏的参与者为3
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
// 依次启动三个线程
new Thread(new CyclicBarrierDemo.Task(cyclicBarrier)).start();
Thread.sleep(1000);
new Thread(new CyclicBarrierDemo.Task(cyclicBarrier)).start();
Thread.sleep(1000);
new Thread(new CyclicBarrierDemo.Task(cyclicBarrier)).start();
}
class Task implements Runnable {
private CyclicBarrier cyclicBarrier;
public Task(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + " 已经到达 : " + System.currentTimeMillis());
try {
// 等到所有参与者都到达才开始执行任务
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() + " 开始处理 : " + System.currentTimeMillis());
}
}
}
- 等待其它线程,会阻塞当前线程,所有线程必须同时到达栅栏为止后,才能继续执行。
- 所有线程达到栅栏处,可以触发另外一个预先设置的线程。
CountDownLatch
让主线程等待一组事件发生后继续执行,事件指的是CountDownLatch
里的countDown()
方法。
实际相当于只有一代CyclicBarrier
。
本质上还是解决了多线程协作(通信)处理任务的问题。
package org.lgq.interview.collection;
import java.util.concurrent.CountDownLatch;
/**
* @author DevLGQ
* @version 1.0
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
new CountDownLatchDemo().go();
}
private void go() throws InterruptedException {
// count初始化为3
CountDownLatch countDownLatch = new CountDownLatch(3);
// 依次启动三个线程
new Thread(new Task(countDownLatch)).start();
Thread.sleep(1000);
new Thread(new Task(countDownLatch)).start();
Thread.sleep(1000);
new Thread(new Task(countDownLatch)).start();
Thread.sleep(1000);
// 让主线程开始等待, 直到count为0为止
countDownLatch.await();
System.out.println("所有线程已经到达, 主线程开始执行" + System.currentTimeMillis());
}
class Task implements Runnable {
private CountDownLatch countDownLatch;
public Task(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + " 已经到达" + System.currentTimeMillis());
countDownLatch.countDown();
}
}
}
Phaser
屏障部分领域语言(DSL)。
- 屏障(Barrier): 合作线程在屏障等待, 然后进入同步点.
- 同步点(Synchronization Point): 通过屏障后执行的同步代码.
- 合作数量(parties): 就是互相等待的线程数量, 只有当等待数量达到parties, 才会进入同步点.
- 到来(arrive): 代表一个线程到达屏障, 并等待, 每次线程到来, arrives + 1.
- 到达数量(arrives): 到达的线程数量.
- 等待(wait): 代表线程在 barrier 上等待.
- 进步(advance): 一个线程通过屏障, 称为进步, 代表工作有进度.
- 开动/下一阶段(tripping/next phaser): 到来的线程数量=parties, 开始进入同步点.
- 阶段(phaser number): 类似 CyclicBarrier 中的代, 每次完成一次开动, phaser number + 1.
线程能力
- arrive(到达): 在屏障上等待其他合作方, 到达线程(arrives)增1
- waitAdvance(等待进步): 在屏障上等待其他线程, 数量够了就进入同步点
- register(注册): 相当于声明自己是一个合作方, 将 parties+1
- deregister(注销): 相当于注销自己, parties-1
Exchanger
两个线程到达同步点后,相互交换数据。
var exchanger = new Exchanger();
// Thread 1: 生产者
初始化: writerBuffer = 空
while(true){
// 写入数据
writeDataBuffer(writeBuffer);
writeBuffer = exchanger.exchange(writeBuffer);
}
// Thread 2: 消费者
初始化: readBuffer = 空
while(true){
// 会阻塞, 等生产者生产完成
var readBuffer = exchanger.exchange(readBuffer);
var str = readFromBuffer(readBuffer);
}
解决了线程间高效交换数据的问题(2个线程)。
同步器总结
解决了同步问题;线程间的协作模式。