抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

六个同步器

  • ReentrantLock
  • Semaphore [ˈseməfɔːr]: 控制并发量
  • CyclicBarrier: 分治策略的一种表达
  • CountDownLatch [lætʃ]
  • Phaser
  • Exchanger

基于 AQS 实现同步器.

ReentrantLock

synchronized 相似点

  • 临界区保护(提供锁/解锁能力)
  • 可重入
  • 都提供线程间协作
    • Object.wait, Object.notify
    • Condition.await, Condition.signal
  • 提供锁的升级逻辑
    • monitor: 偏向锁 -> 轻量级锁 -> 重量级锁
    • AQS: CAS 竞争 -> 休眠 + 排队竞争
  • 都提供等待队列
    • monitor: EntrySet, WaitSet
    • AQS: CLH队列

synchronized 区别点

  • 基于 AQS vs 基于 Monitor
  • Java 生态 vs 非 Java 生态(C/C++)
  • 响应线程中断(InterruptException) vs 不响应
  • 提供 tryLock vs 不提供
  • Block vs 单 Block
  • 可配置公平 vs 不可配置

重入: 抢占的线程可以多次执行 lock(就是再次请求持有锁的临界资源). 优势: 降低心智负担.

会判断 owner 是否是一样的, 如果是, 可以直接拿到锁, 避免锁的竞争

公平意味着牺牲一定的性能.

实现生产者/消费者模型

public class ProducerCustomerModel {
    static final int MAX = 10;
    /**
     * 生产队列
     */
    LinkedList<Integer> queue = new LinkedList<>();
    /**
     * 重入锁
    */
    ReentrantLock lock = new ReentrantLock();
    /**
     * 队列满状态
     */
    Condition full = lock.newCondition();
    /**
     * 队列空状态
     */
    Condition empty = lock.newCondition();

    /**
     * Producer, 生产者
     *
     * @throws InterruptedException
     */
    public void produce() throws InterruptedException {
        lock.lock();
        if (queue.size() == MAX) {
            // wait 操作都是 monitor 提供的. 这是一个坑来的.
            // 注意使用 await 方法
            full.await();
            return;
        }
        // 操作队列 生产逻辑
        var data = readData();
        // 根据size判断唤醒生产
        if (queue.size() == 1) {
            empty.signalAll();
        }
        queue.add(data);
        lock.unlock();
    }

    /**
     * Consumer, 消费者
     *
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {
        lock.lock();
        // 队列空了
        if (queue.size() == 0) {
            empty.await();
            return;
        }
        // 消费数据
        queue.remove();
        // 操作队列
        if (queue.size() == MAX - 1) {
            full.signalAll();
        }
        lock.unlock();
    }
    /**
     * 测试.
     *
     * @param args
     */
    public static void main(String[] args) {
        var p = new ProducerCustomerModel();
        // 新建100个线程 生产数据
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                while (true) {
                    try {
                        p.readDb();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        // 消费
        new Thread(() -> {
            while (true) {
                try {
                    p.calculate();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

Semaphore

允许 N个 线程同时进入临界区。

使用场景: 信号量控制并发量.

// 允许10个线程同时访问
var semaphore = new Semaphore(10);
var lock = new ReentrantLock();
Runnable runnable = () -> {
    try {
        // 获取许可
        semaphore.acquire(0);
        // 开销较大的计算. semaphore 可以控制并发的数量
        lock.lock();
        // 临界区
        lock.unlock();
    } finally {
        // 释放信号量
        semaphore.release(0);
    }
};
for (int i = 0; i < 1000; i++) {
    new Thread(runnable).start();
}

实现生产者/消费者模型

final int MAX = 100;
LinkedList<Integer> queue = new LinkedList<>();
// 可生产的信号
Semaphore empty = new Semaphore(MAX);
// 已经生产的信号
Semaphore full = new Semaphore(0);
// 生产
void produce() throws InterruptedException {
    // -1. 如果获取不到会进行休眠等待
    empty.acquire();
    synchronized (queue) {
        // 操作队列
    }
    // +1
    full.release();
}
// 消费
void consume() throws InterruptedException {
    // -1. 获取已经生产的.
    full.acquire();
    synchronized (queue) {
        // 操作队列
    }
    // +1. 可以生产的增加.
    empty.release();
}

信号量的作用, 具体作用是什么? 控制并发量. 可以实现生产者/消费者.

ArrayBlockingQueue为何不用 Semaphore 实现? 条件变量(ReentrantLock)的性能更好. 这个类的本身是控制并发量的, 不是线程间的协作。

Semaphore的价值呢?抽象的价值–降低心智负担;其它场景:比如流量控制

CyclicBarrier

循环屏障。阻塞当前线程,等待其他线程。

其实是分治策略一个并发的表达。 CountDownLatch 就是只有一代的 CyclicBarrier

示例: 1000w订单要处理, 分成1000批数据。

  • 那就要 1w 个订单 * 1k 代。每代处理1w个。
  • 每代2个线程 – 2个party合作方,一个准备商品,一个准备发货单。

这样解决了什么问题?

  • 并发读取订单关联的商品和发货单
  • 降低计算成本,内存使用等等(分治策略)
  • 控制处理速度

总得来说解决了什么问题?

  • 多个线程协作(通信)处理任务的问题。循环+屏障。

为什么不用 ReentrantLock + Condition?

其它方法也可以实现,但是CyclicBarrier的场景具有通用性抽象数据结构有价值。比如:批量处理大量的数据和任务。

package org.lgq.interview.collection;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @author DevLGQ
 * @version 1.0
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) throws InterruptedException {
        new CyclicBarrierDemo().go();
    }

    private void go() throws InterruptedException {
        // 初始化 栅栏的参与者为3
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        // 依次启动三个线程
        new Thread(new CyclicBarrierDemo.Task(cyclicBarrier)).start();
        Thread.sleep(1000);
        new Thread(new CyclicBarrierDemo.Task(cyclicBarrier)).start();
        Thread.sleep(1000);
        new Thread(new CyclicBarrierDemo.Task(cyclicBarrier)).start();
    }

    class Task implements Runnable {
        private CyclicBarrier cyclicBarrier;

        public Task(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + " 已经到达 : " + System.currentTimeMillis());
            try {
                // 等到所有参与者都到达才开始执行任务
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("线程" + Thread.currentThread().getName() + " 开始处理 : " + System.currentTimeMillis());
        }

    }
}
  • 等待其它线程,会阻塞当前线程,所有线程必须同时到达栅栏为止后,才能继续执行。
  • 所有线程达到栅栏处,可以触发另外一个预先设置的线程。

CountDownLatch

让主线程等待一组事件发生后继续执行,事件指的是CountDownLatch里的countDown()方法。

实际相当于只有一代CyclicBarrier

本质上还是解决了多线程协作(通信)处理任务的问题。

package org.lgq.interview.collection;

import java.util.concurrent.CountDownLatch;

/**
 * @author DevLGQ
 * @version 1.0
 */
public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        new CountDownLatchDemo().go();
    }

    private void go() throws InterruptedException {
        // count初始化为3
        CountDownLatch countDownLatch = new CountDownLatch(3);
        // 依次启动三个线程
        new Thread(new Task(countDownLatch)).start();
        Thread.sleep(1000);

        new Thread(new Task(countDownLatch)).start();
        Thread.sleep(1000);

        new Thread(new Task(countDownLatch)).start();
        Thread.sleep(1000);
        // 让主线程开始等待, 直到count为0为止
        countDownLatch.await();
        System.out.println("所有线程已经到达, 主线程开始执行" + System.currentTimeMillis());
    }

    class Task implements Runnable {
        private CountDownLatch countDownLatch;

        public Task(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + " 已经到达" + System.currentTimeMillis());
            countDownLatch.countDown();
        }

    }
}

Phaser

屏障部分领域语言(DSL)。

  • 屏障(Barrier): 合作线程在屏障等待, 然后进入同步点.
  • 同步点(Synchronization Point): 通过屏障后执行的同步代码.
  • 合作数量(parties): 就是互相等待的线程数量, 只有当等待数量达到parties, 才会进入同步点.
  • 到来(arrive): 代表一个线程到达屏障, 并等待, 每次线程到来, arrives + 1.
  • 到达数量(arrives): 到达的线程数量.
  • 等待(wait): 代表线程在 barrier 上等待.
  • 进步(advance): 一个线程通过屏障, 称为进步, 代表工作有进度.
  • 开动/下一阶段(tripping/next phaser): 到来的线程数量=parties, 开始进入同步点.
  • 阶段(phaser number): 类似 CyclicBarrier 中的代, 每次完成一次开动, phaser number + 1.

线程能力

  • arrive(到达): 在屏障上等待其他合作方, 到达线程(arrives)增1
  • waitAdvance(等待进步): 在屏障上等待其他线程, 数量够了就进入同步点
  • register(注册): 相当于声明自己是一个合作方, 将 parties+1
  • deregister(注销): 相当于注销自己, parties-1

Exchanger

两个线程到达同步点后,相互交换数据

var exchanger = new Exchanger();
// Thread 1: 生产者
初始化: writerBuffer = 空
while(true){
    // 写入数据
    writeDataBuffer(writeBuffer);
    writeBuffer = exchanger.exchange(writeBuffer);
}
// Thread 2: 消费者
初始化: readBuffer = 空
while(true){
    // 会阻塞, 等生产者生产完成
    var readBuffer = exchanger.exchange(readBuffer);
    var str = readFromBuffer(readBuffer);
}

解决了线程间高效交换数据的问题(2个线程)。

同步器总结

解决了同步问题;线程间的协作模式

评论