何小豆儿在此 发表于 2024-5-13 07:06:53

多线程系列(十五) -常用并发工具类详解

一、择要

在前几篇文章中,我们讲到了线程、线程池、BlockingQueue 等焦点组件,其实 JDK 给开发者还提供了比synchronized更加高级的线程同步组件,好比 CountDownLatch、CyclicBarrier、Semaphore、Exchanger 等并发工具类。
下面我们一起来相识一下这些常用的并发工具类!
二、常用并发工具类

2.1、CountDownLatch

CountDownLatch是 JDK5 之后参加的一种并发流程控制工具类,它允许一个或多个线程一直等待,直到其他线程运行完成后再执行。
它的工作原理重要是通过一个计数器来实现,初始化的时候必要指定线程的数量;每当一个线程完成了自己的任务,计数器的值就相应得减 1;当计数器到达 0 时,表示所有的线程都已经执行完毕,处于等待的线程就可以恢复继续执行任务。
根据CountDownLatch的工作原理,它的应用场景一样平常可以分别为两种:

[*]场景一:某个线程必要在其他 n 个线程执行完毕后,再继续执行
[*]场景二:多个工作线程等待某个线程的下令,同时执行同一个任务
下面我们先来看下两个简朴的示例。
示例1:某个线程等待 n 个工作线程

好比某项任务,先接纳多线程去执行,最后必要在主线程中进行汇总处理,这个时候CountDownLatch就可以发挥作用了,详细应用如下!
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
      // 采用 10 个工作线程去执行任务
      final int threadCount = 10;
      CountDownLatch countDownLatch = new CountDownLatch(threadCount);
      for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                  // 执行具体任务
                  System.out.println("thread name:" +Thread.currentThread().getName() + ",执行完毕!");
                  // 计数器减 1
                  countDownLatch.countDown();
                }
            }).start();
      }

      // 阻塞等待 10 个工作线程执行完毕
      countDownLatch.await();
      System.out.println("所有任务线程已执行完毕,准备进行结果汇总");
    }
}运行结果如下:
thread name:Thread-0,执行完毕!
thread name:Thread-2,执行完毕!
thread name:Thread-1,执行完毕!
thread name:Thread-3,执行完毕!
thread name:Thread-4,执行完毕!
thread name:Thread-5,执行完毕!
thread name:Thread-6,执行完毕!
thread name:Thread-7,执行完毕!
thread name:Thread-8,执行完毕!
thread name:Thread-9,执行完毕!
所有任务线程执行完毕,准备进行结果汇总示例2:n 个工作线程等待某个线程

好比田径赛跑,10 个同砚预备开跑,但是必要等工作人员发出枪声才允许开跑,利用CountDownLatch可以实现这一功能,详细应用如下!
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
      // 使用一个计数器
      CountDownLatch countDownLatch = new CountDownLatch(1);
      final int threadCount = 10;
      // 采用 10 个工作线程去执行任务
      for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                  try {
                        // 阻塞等待计数器为 0
                        countDownLatch.await();
                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
                  // 发起某个服务请求,省略
                  System.out.println("thread name:" +Thread.currentThread().getName() + ",开始执行!");

                }
            }).start();
      }

      Thread.sleep(1000);
      System.out.println("thread name:" +Thread.currentThread().getName() + " 准备开始!");
      // 将计数器减 1,运行完成后为 0
      countDownLatch.countDown();
    }
}运行结果如下:
thread name:main 准备开始!
thread name:Thread-0,开始执行!
thread name:Thread-1,开始执行!
thread name:Thread-2,开始执行!
thread name:Thread-3,开始执行!
thread name:Thread-5,开始执行!
thread name:Thread-6,开始执行!
thread name:Thread-8,开始执行!
thread name:Thread-7,开始执行!
thread name:Thread-4,开始执行!
thread name:Thread-9,开始执行!从上面的示例可以很清晰的看到,CountDownLatch类似于一个倒计数器,当计数器为 0 的时候,调用await()方法的线程会被解除等待状态,然后继续执行。
CountDownLatch类的重要方法,有以下几个:

[*]public CountDownLatch(int count):焦点构造方法,初始化的时候必要指定线程数
[*]countDown():每调用一次,计数器值 -1,直到 count 被减为 0,表示所有线程全部执行完毕
[*]await():等待计数器变为 0,即等待所有异步线程执行完毕,否则一直阻塞
[*]await(long timeout, TimeUnit unit):支持指定时间内的等待,避免永久阻塞,await()的一个重载方法
从以上的分析可以得出,当计数器为 1 的时候,即由一个线程来关照其他线程,结果等同于对象的wait()和notifyAll();当计时器大于 1 的时候,可以实现多个工作线程完成任务后关照一个大概多个等待线程继续工作,CountDownLatch可以当作是一种进阶版的等待/关照机制,在实际中应用比力多见。
2.2、CyclicBarrier

CyclicBarrier从字面上很容易理解,表示可循环利用的屏障,它真正的作用是让一组线程到达一个屏障时被阻塞,直到满足要求的线程数都到达屏障时,屏障才会解除,此时所有被屏障阻塞的线程就可以继续执行。
下面我们还是先看一个简朴的示例,以便于更好的理解这个工具类。
public class CyclicBarrierTest {

    public static void main(String[] args) {
      // 设定参与线程的个数为 5
      int threadCount = 5;
      CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有的线程都已经准备就绪...");
            }
      });
      for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                  System.out.println("thread name:" +Thread.currentThread().getName() + ",已达到屏障!");
                  try {
                        cyclicBarrier.await();
                  } catch (Exception e) {
                        e.printStackTrace();
                  }
                  System.out.println("thread name:" +Thread.currentThread().getName() + ",阻塞解除,继续执行!");
                }
            }).start();
      }
    }
}输出结果:
thread name:Thread-0,已达到屏障!
thread name:Thread-1,已达到屏障!
thread name:Thread-2,已达到屏障!
thread name:Thread-3,已达到屏障!
thread name:Thread-4,已达到屏障!
所有的线程都已经准备就绪...
thread name:Thread-4,阻塞解除,继续执行!
thread name:Thread-0,阻塞解除,继续执行!
thread name:Thread-3,阻塞解除,继续执行!
thread name:Thread-1,阻塞解除,继续执行!
thread name:Thread-2,阻塞解除,继续执行!从上面的示例可以很清晰的看到,CyclicBarrier中设定的线程数相称于一个屏障,当所有的线程数达到时,此时屏障就会解除,线程继续执行剩下的逻辑。
CyclicBarrier类的重要方法,有以下几个:

[*]public CyclicBarrier(int parties):构造方法,parties参数表示参与线程的个数
[*]public CyclicBarrier(int parties, Runnable barrierAction):焦点构造方法,barrierAction参数表示线程到达屏障时的回调方法
[*]public void await():焦点方法,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当火线程被阻塞,直到屏障解除,继续执行剩下的逻辑
从以上的示例中,可以看到CyclicBarrier与CountDownLatch有很多的相似之处,都能够实现线程之间的等待,但是它们的侧重点差异:

[*]CountDownLatch一样平常用于一个或多个线程,等待其他的线程执行完任务后再执行
[*]CyclicBarrier一样平常用于一组线程等待至某个状态,当状态解除之后,这一组线程再继续执行
[*]CyclicBarrier中的计数器可以反复利用,而CountDownLatch用完之后只能重新初始化
2.3、Semaphore

Semaphore通常我们把它称之为信号计数器,它可以保证同一时候最多有 N 个线程能访问某个资源,好比同一时候最多允许 10 个用户访问某个服务,同一时候最多创建 100 个数据库毗连等等。
Semaphore可以用于控制并发的线程数,实际应用场景非常的广,好比流量控制、服务限流等等。
下面我们看一个简朴的示例。
public class SemaphoreTest {

    public static void main(String[] args) {
      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

      // 同一时刻仅允许最多3个线程获取许可
      final Semaphore semaphore = new Semaphore(3);
      // 初始化 5 个线程生成
      for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                  try {
                        // 如果超过了许可数量,其他线程将在此等待
                        semaphore.acquire();
                        System.out.println(format.format(new Date()) +" thread name:" +Thread.currentThread().getName() + " 获取许可,开始执行任务");
                        // 假设执行某项任务的耗时
                        Thread.sleep(2000);
                  } catch (Exception e) {
                        e.printStackTrace();
                  } finally {
                        // 使用完后释放许可
                        semaphore.release();
                  }
                }
            }).start();
      }
    }
}输出结果:
2023-11-22 17:32:01 thread name:Thread-0 获取许可,开始执行任务
2023-11-22 17:32:01 thread name:Thread-1 获取许可,开始执行任务
2023-11-22 17:32:01 thread name:Thread-2 获取许可,开始执行任务
2023-11-22 17:32:03 thread name:Thread-4 获取许可,开始执行任务
2023-11-22 17:32:03 thread name:Thread-3 获取许可,开始执行任务从上面的示例可以很清晰的看到,同一时候前 3 个线程获得了许可优先执行, 2 秒事后许可被开释,剩下的 2 个线程获取开释的许可继续执行。
Semaphore类的重要方法,有以下几个:

[*]public Semaphore(int permits):构造方法,permits参数表示同一时间能访问某个资源的线程数量
[*]acquire():获取一个许可,在获取到许可之前大概被其他线程调用停止之前,线程将一直处于阻塞状态
[*]tryAcquire(long timeout, TimeUnit unit):表示在指定时间内实验获取一个许可,如果获取成功,返回true;反之false
[*]release():开释一个许可,同时唤醒一个获取许可不成功的阻塞线程。
通过permits参数的设定,可以实现限制多个线程同时访问服务的结果,当permits参数为 1 的时候,表示同一时候只有一个线程能访问服务,相称于一个互斥锁,结果等同于synchronized。
利用Semaphore的时候,通常必要先调用acquire()大概tryAcquire()获取许可,然后通过try ... finally模块在finally中开释许可。
例如如下方式,实验在 3 秒内获取许可,如果没有获取就退出,防止程序一直阻塞。
// 尝试 3 秒内获取许可
if(semaphore.tryAcquire(3, TimeUnit.SECONDS)){
    try {
       // ...业务逻辑
    }finally {
      // 释放许可
      semaphore.release();
    }
}2.4、Exchanger

Exchanger从字面上很容易理解表示交换,它重要用途在两个线程之间进行数据交换,注意也只能在两个线程之间进行数据交换。
Exchanger提供了一个exchange()同步交换方法,当两个线程调用exchange()方法时,无论调用时间先后,会相互等待线程到达exchange()方法同步点,此时两个线程进行交换数据,将本线程产出数据传递给对方。
简朴的示例如下。
public class ExchangerTest {

    public static void main(String[] args) {
      // 交换同步器
      Exchanger<String> exchanger = new Exchanger<>();

      // 线程1
      new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                  String value = "A";
                  System.out.println("thread name:" +Thread.currentThread().getName() + " 原数据:" + value);
                  String newValue = exchanger.exchange(value);
                  System.out.println("thread name:" +Thread.currentThread().getName() + " 交换后的数据:" + newValue);
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
            }
      }).start();

      // 线程2
      new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                  String value = "B";
                  System.out.println("thread name:" +Thread.currentThread().getName() + " 原数据:" + value);
                  String newValue = exchanger.exchange(value);
                  System.out.println("thread name:" +Thread.currentThread().getName() + " 交换后的数据:" + newValue);
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
            }
      }).start();
    }
}输出结果:
thread name:Thread-0 原数据:A
thread name:Thread-1 原数据:B
thread name:Thread-0 交换后的数据:B
thread name:Thread-1 交换后的数据:A从上面的示例可以很清晰的看到,当线程Thread-0和Thread-1都到达了exchange()方法的同步点时,进行了数据交换。
Exchanger类的重要方法,有以下几个:

[*]exchange(V x):等待另一个线程到达此交换点,然后将给定的对象传送给该线程,并接收该线程的对象,除非当火线程被停止,否则一直阻塞等待
[*]exchange(V x, long timeout, TimeUnit unit):表示在指定的时间内等待另一个线程到达此交换点,如果超时会自动退出并抛超时异常
如果多个线程调用exchange()方法,数据交换可能会出现混乱,因此实际上Exchanger应用并不多见。
三、小结

本文重要围绕 Java 多线程中常见的并发工具类进行了简朴的用例介绍,这些工具类都可以实现线程同步的结果,底层原理实现重要是基于 AQS 队列式同步器来实现,关于 AQS 我们会在后期的文章中再次介绍。
本文篇幅稍有所长,内容难免有所遗漏,接待大家留言指出!
四、参考

1.https://www.cnblogs.com/xrq730/p/4869671.html
2.https://zhuanlan.zhihu.com/p/97055716

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 多线程系列(十五) -常用并发工具类详解