java 多线程之Worker Thread模式(Thread Pool模式)

打印 上一主题 下一主题

主题 1641|帖子 1641|积分 4923

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Worker Thread模式

Worker的意思是工作的人,在Worker Thread模式中,工人线程Worker thread会逐个取回工作并进行处置惩罚,当所有工作全部完成后,工人线程会期待新的工作到来。
Worker Thread模式也被成为Background Thread(配景线程)模式,另外,如果从生存多个工人线程的场合这一点看,我们也可以称这种模式为Thread Pool模式。
Worker Thread模式中的脚色

TransPortThread [将部件塞到传送带上](委托者)
创建表现工作请求的Cargo并将其通报给SimpleChannel 。在示例步调中,TransPortThread 相称于该脚色。
2.SimpleChannel [传送带的脚色](通讯线路)
SimpleChannel 脚色接受来自于TransPortThread 的Cargo,并将其通报给WorkerThread。在示例步调中,SimpleChannel 相称于该脚色。
3.WorkerThread(工人)
WorkerThread脚色从Channel中获取Cargo,并进行工作,当一项工作完成后,它会继承去获取另外的Cargo,在示例步调中,WorkerThread相称于该脚色。
4.Cargo(部件)
Cargo脚色是表现工作中的部件,Cargo脚色中生存了进行工作所必须的信息,在示例步调中,Cargo相称于该部件脚色。
Worker Thread利用场景

想象这样一个场景,一个工厂在生产货物,在一个车间里,有几个工人,每次生产部件准备后,车间外的人就将部件放到车间的传送带上,工人每次做完一个货品就从桌子上取部件。在这里,注意到,部件并不是直接交给工人的,另外一点,工人并不是做完一个部件就回家换个新人,后者在现实有点滑稽,但是在步调中却对应一个典型的线程利用方法:线程池。
所谓线程池,就是对线程的复用,当线程执行完任务之后就继承取其他任务执行,而不是销毁启动新线程执行其他任务。因为线程的启动对于系统性能开销比较大,所以这样对于系统性能的进步很有好处。
Main.java

  1. public class WorkClient {
  2.     public static void main(String[] args) {
  3.         // 创建容量为5的SimpleChannel实例(数据缓冲通道)
  4.         SimpleChannel simpleChannel = new SimpleChannel(5);
  5.         // 启动工作通道开始处理数据(消费者启动)
  6.         simpleChannel.startWork();
  7.         // 启动三个数据传输线程(生产者角色)
  8.         new TransPortThread("june", simpleChannel).start();
  9.         new TransPortThread("july", simpleChannel).start();
  10.         new TransPortThread("alen", simpleChannel).start();
  11.     }
  12. }
复制代码
Cargo.java

脚色:部件或货物
  1. import java.util.Optional;
  2. /**
  3. * 货物类,包含货物编号和名称,并提供执行操作时打印线程执行信息的功能
  4. *
  5. * @author [请填写作者名称]
  6. * @version 1.0
  7. */
  8. public class Cargo {
  9.     /**
  10.      * 货物名称
  11.      */
  12.     private final String name;
  13.     /**
  14.      * 货物编号
  15.      */
  16.     private final int num;
  17.     /**
  18.      * 构造方法,初始化货物信息
  19.      *
  20.      * @param name 货物名称
  21.      * @param num  货物编号
  22.      */
  23.     public Cargo(String name, int num) {
  24.         this.name = name;
  25.         this.num = num;
  26.     }
  27.     /**
  28.      * 执行货物处理操作,打印当前线程名称和货物信息
  29.      */
  30.     public void execute(){
  31.         Optional.of(Thread.currentThread().getName()+" executed "+this.toString()).ifPresent(System.out::println);
  32.     }
  33.     /**
  34.      * 获取货物信息的字符串表示
  35.      *
  36.      * @return 格式化后的货物信息(例如:Cargo ==> NO.123 Name.ItemA)
  37.      */
  38.     @Override
  39.     public String toString() {
  40.         return "Cargo ==> NO."+num+" Name."+name;
  41.     }
  42. }
复制代码
SimpleChannel.java

职责:将部件或货物从源头传给处置惩罚端
  1. import java.util.Arrays;
  2. /**
  3. * 简单线程通信通道,管理货物队列和工作线程池
  4. * 使用环形缓冲区实现生产者-消费者模式,通过synchronized和wait/notify机制保证线程安全
  5. */
  6. public class SimpleChannel {
  7.     /** 缓冲区最大容量 */
  8.     private static final int BUFFER_SIZE = 100;
  9.     /** 环形缓冲区数组,存储货物对象 */
  10.     private final Cargo[] cargoQueue;
  11.     /** 队列头指针,指向下一个要取出的元素位置 */
  12.     private int headIndex;
  13.     /** 当前队列中的元素数量 */
  14.     private int count;
  15.     /** 队列尾指针,指向下一个要插入的元素位置 */
  16.     private int tailIndex;
  17.     /** 工作线程池 */
  18.     private final WorkerThread[] workerPool;
  19.     /**
  20.      * 构造方法初始化通道
  21.      * @param workerCount 工作线程数量
  22.      */
  23.     public SimpleChannel(int workerCount) {
  24.         this.cargoQueue = new Cargo[BUFFER_SIZE];
  25.         this.headIndex = 0;
  26.         this.count = 0;
  27.         this.tailIndex = 0;
  28.         this.workerPool = new WorkerThread[workerCount];
  29.         this.init();
  30.     }
  31.     /**
  32.      * 初始化工作线程池
  33.      * 为每个线程设置名称并绑定当前通道实例
  34.      */
  35.     private void init() {
  36.         for (int i = 0; i < workerPool.length; i++) {
  37.             workerPool[i] = new WorkerThread("worker-"+i, this);
  38.         }
  39.     }
  40.     /**
  41.      * 启动所有工作线程开始工作
  42.      * 通过遍历线程池调用start()方法启动每个线程
  43.      */
  44.     public void startWork() {
  45.         Arrays.asList(workerPool).forEach(WorkerThread::start);
  46.     }
  47.     /**
  48.      * 生产者添加货物到缓冲区
  49.      * @param cargo 要添加的货物对象
  50.      * @throws InterruptedException 线程中断异常
  51.      */
  52.     public synchronized void pushCargo(Cargo cargo) {
  53.         while (count >= BUFFER_SIZE) {
  54.             try {
  55.                 this.wait(); // 缓冲区满时等待
  56.             } catch (InterruptedException e) {
  57.                 // 异常处理(建议补充日志)
  58.             }
  59.         }
  60.         // 添加货物到尾部并更新指针
  61.         this.cargoQueue[tailIndex] = cargo;
  62.         this.tailIndex = (this.tailIndex + 1) % this.cargoQueue.length;
  63.         this.count++;
  64.         this.notifyAll(); // 通知等待线程
  65.     }
  66.     /**
  67.      * 消费者从缓冲区取出货物
  68.      * @return 取出的货物对象
  69.      * @throws InterruptedException 线程中断异常
  70.      */
  71.     public synchronized Cargo popCargo() {
  72.         while (count <= 0) {
  73.             try {
  74.                 this.wait(); // 缓冲区空时等待
  75.             } catch (InterruptedException e) {
  76.                 // 异常处理(建议补充日志)
  77.             }
  78.         }
  79.         // 取出头部元素并更新指针
  80.         Cargo cargo = this.cargoQueue[headIndex];
  81.         this.headIndex = (this.headIndex + 1) % this.cargoQueue.length;
  82.         this.count--;
  83.         this.notifyAll(); // 通知等待线程
  84.         return cargo;
  85.     }
  86. }
复制代码
TransPortThread .java

职责:将请求的信息塞到传输通道上去
  1. /**
  2. * 运输线程,负责持续生成货物并推送到传输通道 {@link SimpleChannel}
  3. */
  4. public class TransPortThread extends Thread {
  5.     private SimpleChannel simpleChannel;
  6.     /**
  7.      * 静态随机数生成器,用于生成随机休眠时间
  8.      */
  9.     private static final Random RANDOM = new Random(System.currentTimeMillis());
  10.     /**
  11.      * 构造方法
  12.      *
  13.      * @param name 线程名称
  14.      * @param simpleChannel 货物传输通道
  15.      */
  16.     public TransPortThread(String name, SimpleChannel simpleChannel) {
  17.         super(name);
  18.         this.simpleChannel = simpleChannel;
  19.     }
  20.     @Override
  21.     public void run() {
  22.         try {
  23.             int i = 0;
  24.             // 无限循环持续生成货物
  25.             while (true){
  26.                 Cargo cargo = new Cargo(getName(), i++); // 创建货物对象(包含线程名和序号)
  27.                 this.simpleChannel.pushCargo(cargo);   // 将货物推送到通道
  28.                 Thread.sleep(RANDOM.nextInt(1000));    // 随机休眠 0-1000ms
  29.             }
  30.         } catch (InterruptedException e) {
  31.             // 线程中断时的异常处理(当前空实现)
  32.         }
  33.     }
  34. }
复制代码
WorkerThread.java

职责:从传输带上获取部件或货物
  1. import java.util.Random;
  2. /**
  3. * 工作线程类,负责从共享通道中持续获取任务并执行
  4. */
  5. public class WorkerThread extends Thread {
  6.     /**
  7.      * 存储任务通道实例,用于获取待处理的任务对象
  8.      */
  9.     private final SimpleChannel simpleChannel;
  10.     /**
  11.      * 静态随机数生成器,用于生成随机休眠时间(模拟任务执行间隔)
  12.      */
  13.     private static final Random random = new Random(System.currentTimeMillis());
  14.     /**
  15.      * 构造方法初始化线程名称和任务通道
  16.      * @param name 线程名称标识
  17.      * @param simpleChannel 任务通道对象,用于获取待处理任务
  18.      */
  19.     public WorkerThread(String name, SimpleChannel simpleChannel) {
  20.         super(name);
  21.         this.simpleChannel = simpleChannel;
  22.     }
  23.     /**
  24.      * 线程执行主体方法,持续循环执行以下操作:
  25.      * 1. 从通道中获取任务对象
  26.      * 2. 执行任务
  27.      * 3. 随机休眠(模拟任务处理间隔)
  28.      */
  29.     @Override
  30.     public void run() {
  31.         while (true) {
  32.             try {
  33.                 // 从通道中获取任务对象
  34.                 Cargo cargo = simpleChannel.popCargo();
  35.                 // 执行任务逻辑
  36.                 cargo.execute();
  37.                 // 随机休眠0-1000毫秒(模拟任务间隔)
  38.                 Thread.sleep(random.nextInt(1000));
  39.             } catch (Exception e) {
  40.                 // 异常处理:打印堆栈信息(实际应用中建议添加更完善的异常处理)
  41.                 e.printStackTrace();
  42.             }
  43.         }
  44.     }
  45. }
复制代码
执行的效果
  1. worker-1 executed Cargo ==> NO.0 Name.alen
  2. worker-0 executed Cargo ==> NO.0 Name.june
  3. worker-4 executed Cargo ==> NO.0 Name.july
  4. worker-3 executed Cargo ==> NO.1 Name.june
  5. worker-0 executed Cargo ==> NO.1 Name.alen
  6. worker-2 executed Cargo ==> NO.1 Name.july
  7. worker-1 executed Cargo ==> NO.2 Name.june
  8. worker-0 executed Cargo ==> NO.2 Name.july
  9. worker-2 executed Cargo ==> NO.2 Name.alen
  10. worker-3 executed Cargo ==> NO.3 Name.june
  11. worker-4 executed Cargo ==> NO.4 Name.june
  12. worker-2 executed Cargo ==> NO.3 Name.alen
  13. worker-3 executed Cargo ==> NO.3 Name.july
  14. worker-4 executed Cargo ==> NO.5 Name.june
  15. worker-1 executed Cargo ==> NO.6 Name.june
  16. worker-0 executed Cargo ==> NO.4 Name.alen
  17. worker-2 executed Cargo ==> NO.7 Name.june
  18. worker-3 executed Cargo ==> NO.5 Name.alen
  19. worker-2 executed Cargo ==> NO.4 Name.july
  20. worker-4 executed Cargo ==> NO.6 Name.alen
  21. worker-0 executed Cargo ==> NO.5 Name.july
  22. worker-2 executed Cargo ==> NO.8 Name.june
  23. worker-3 executed Cargo ==> NO.7 Name.alen
  24. worker-1 executed Cargo ==> NO.6 Name.july
  25. worker-4 executed Cargo ==> NO.8 Name.alen
  26. worker-4 executed Cargo ==> NO.9 Name.june
  27. worker-2 executed Cargo ==> NO.10 Name.june
  28. worker-3 executed Cargo ==> NO.9 Name.alen
  29. worker-1 executed Cargo ==> NO.7 Name.july
  30. worker-2 executed Cargo ==> NO.10 Name.alen
  31. worker-4 executed Cargo ==> NO.11 Name.alen
  32. worker-3 executed Cargo ==> NO.11 Name.june
  33. worker-0 executed Cargo ==> NO.8 Name.july
  34. worker-4 executed Cargo ==> NO.12 Name.alen
  35. worker-3 executed Cargo ==> NO.12 Name.june
  36. worker-4 executed Cargo ==> NO.13 Name.alen
  37. worker-1 executed Cargo ==> NO.9 Name.july
  38. worker-2 executed Cargo ==> NO.14 Name.alen
  39. worker-0 executed Cargo ==> NO.13 Name.june
  40. worker-4 executed Cargo ==> NO.15 Name.alen
  41. worker-2 executed Cargo ==> NO.10 Name.july
  42. worker-1 executed Cargo ==> NO.14 Name.june
  43. worker-3 executed Cargo ==> NO.15 Name.june
  44. worker-4 executed Cargo ==> NO.16 Name.alen
  45. worker-3 executed Cargo ==> NO.11 Name.july
  46. worker-2 executed Cargo ==> NO.12 Name.july
  47. worker-0 executed Cargo ==> NO.16 Name.june
  48. worker-1 executed Cargo ==> NO.17 Name.june
  49. worker-2 executed Cargo ==> NO.18 Name.june
  50. worker-2 executed Cargo ==> NO.17 Name.alen
  51. worker-4 executed Cargo ==> NO.13 Name.july
  52. worker-0 executed Cargo ==> NO.18 Name.alen
  53. worker-0 executed Cargo ==> NO.19 Name.june
  54. worker-3 executed Cargo ==> NO.14 Name.july
  55. worker-1 executed Cargo ==> NO.19 Name.alen
  56. worker-2 executed Cargo ==> NO.20 Name.june
  57. worker-3 executed Cargo ==> NO.21 Name.june
  58. worker-0 executed Cargo ==> NO.15 Name.july
  59. worker-3 executed Cargo ==> NO.20 Name.alen
  60. worker-4 executed Cargo ==> NO.21 Name.alen
  61. worker-3 executed Cargo ==> NO.22 Name.june
  62. worker-1 executed Cargo ==> NO.16 Name.july
  63. worker-2 executed Cargo ==> NO.17 Name.july
  64. worker-0 executed Cargo ==> NO.23 Name.june
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

饭宝

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表