饭宝 发表于 2025-4-18 13:20:14

java 多线程之Worker Thread模式(Thread Pool模式)

Worker Thread模式

Worker的意思是工作的人,在Worker Thread模式中,工人线程Worker thread会逐个取回工作并进行处置惩罚,当所有工作全部完成后,工人线程会期待新的工作到来。
Worker Thread模式也被成为Background Thread(配景线程)模式,另外,如果从生存多个工人线程的场合这一点看,我们也可以称这种模式为Thread Pool模式。
Worker Thread模式中的脚色

TransPortThread [将部件塞到传送带上](委托者)
创建表现工作请求的Cargo并将其通报给SimpleChannel 。在示例步调中,TransPortThread 相称于该脚色。
2.SimpleChannel [传送带的脚色](通讯线路)
SimpleChannel 脚色接受来自于TransPortThread 的Cargo,并将其通报给WorkerThread。在示例步调中,SimpleChannel 相称于该脚色。
3.WorkerThread(工人)
WorkerThread脚色从Channel中获取Cargo,并进行工作,当一项工作完成后,它会继承去获取另外的Cargo,在示例步调中,WorkerThread相称于该脚色。
4.Cargo(部件)
Cargo脚色是表现工作中的部件,Cargo脚色中生存了进行工作所必须的信息,在示例步调中,Cargo相称于该部件脚色。
Worker Thread利用场景

想象这样一个场景,一个工厂在生产货物,在一个车间里,有几个工人,每次生产部件准备后,车间外的人就将部件放到车间的传送带上,工人每次做完一个货品就从桌子上取部件。在这里,注意到,部件并不是直接交给工人的,另外一点,工人并不是做完一个部件就回家换个新人,后者在现实有点滑稽,但是在步调中却对应一个典型的线程利用方法:线程池。
所谓线程池,就是对线程的复用,当线程执行完任务之后就继承取其他任务执行,而不是销毁启动新线程执行其他任务。因为线程的启动对于系统性能开销比较大,所以这样对于系统性能的进步很有好处。
Main.java

public class WorkClient {

    public static void main(String[] args) {
      // 创建容量为5的SimpleChannel实例(数据缓冲通道)
      SimpleChannel simpleChannel = new SimpleChannel(5);

      // 启动工作通道开始处理数据(消费者启动)
      simpleChannel.startWork();

      // 启动三个数据传输线程(生产者角色)
      new TransPortThread("june", simpleChannel).start();
      new TransPortThread("july", simpleChannel).start();
      new TransPortThread("alen", simpleChannel).start();
    }

}
Cargo.java

脚色:部件或货物
import java.util.Optional;

/**
* 货物类,包含货物编号和名称,并提供执行操作时打印线程执行信息的功能
*
* @author [请填写作者名称]
* @version 1.0
*/
public class Cargo {

    /**
   * 货物名称
   */
    private final String name;

    /**
   * 货物编号
   */
    private final int num;

    /**
   * 构造方法,初始化货物信息
   *
   * @param name 货物名称
   * @param num货物编号
   */
    public Cargo(String name, int num) {
      this.name = name;
      this.num = num;
    }

    /**
   * 执行货物处理操作,打印当前线程名称和货物信息
   */
    public void execute(){
      Optional.of(Thread.currentThread().getName()+" executed "+this.toString()).ifPresent(System.out::println);
    }

    /**
   * 获取货物信息的字符串表示
   *
   * @return 格式化后的货物信息(例如:Cargo ==> NO.123 Name.ItemA)
   */
    @Override
    public String toString() {
      return "Cargo ==> NO."+num+" Name."+name;
    }
}
SimpleChannel.java

职责:将部件或货物从源头传给处置惩罚端
import java.util.Arrays;

/**
* 简单线程通信通道,管理货物队列和工作线程池
* 使用环形缓冲区实现生产者-消费者模式,通过synchronized和wait/notify机制保证线程安全
*/
public class SimpleChannel {

    /** 缓冲区最大容量 */
    private static final int BUFFER_SIZE = 100;

    /** 环形缓冲区数组,存储货物对象 */
    private final Cargo[] cargoQueue;

    /** 队列头指针,指向下一个要取出的元素位置 */
    private int headIndex;

    /** 当前队列中的元素数量 */
    private int count;

    /** 队列尾指针,指向下一个要插入的元素位置 */
    private int tailIndex;

    /** 工作线程池 */
    private final WorkerThread[] workerPool;

    /**
   * 构造方法初始化通道
   * @param workerCount 工作线程数量
   */
    public SimpleChannel(int workerCount) {
      this.cargoQueue = new Cargo;
      this.headIndex = 0;
      this.count = 0;
      this.tailIndex = 0;
      this.workerPool = new WorkerThread;
      this.init();
    }

    /**
   * 初始化工作线程池
   * 为每个线程设置名称并绑定当前通道实例
   */
    private void init() {
      for (int i = 0; i < workerPool.length; i++) {
            workerPool = new WorkerThread("worker-"+i, this);
      }
    }

    /**
   * 启动所有工作线程开始工作
   * 通过遍历线程池调用start()方法启动每个线程
   */
    public void startWork() {
      Arrays.asList(workerPool).forEach(WorkerThread::start);
    }

    /**
   * 生产者添加货物到缓冲区
   * @param cargo 要添加的货物对象
   * @throws InterruptedException 线程中断异常
   */
    public synchronized void pushCargo(Cargo cargo) {
      while (count >= BUFFER_SIZE) {
            try {
                this.wait(); // 缓冲区满时等待
            } catch (InterruptedException e) {
                // 异常处理(建议补充日志)
            }
      }
      // 添加货物到尾部并更新指针
      this.cargoQueue = cargo;
      this.tailIndex = (this.tailIndex + 1) % this.cargoQueue.length;
      this.count++;
      this.notifyAll(); // 通知等待线程
    }

    /**
   * 消费者从缓冲区取出货物
   * @return 取出的货物对象
   * @throws InterruptedException 线程中断异常
   */
    public synchronized Cargo popCargo() {
      while (count <= 0) {
            try {
                this.wait(); // 缓冲区空时等待
            } catch (InterruptedException e) {
                // 异常处理(建议补充日志)
            }
      }
      // 取出头部元素并更新指针
      Cargo cargo = this.cargoQueue;
      this.headIndex = (this.headIndex + 1) % this.cargoQueue.length;
      this.count--;
      this.notifyAll(); // 通知等待线程
      return cargo;
    }
}
TransPortThread .java

职责:将请求的信息塞到传输通道上去
/**
* 运输线程,负责持续生成货物并推送到传输通道 {@link SimpleChannel}
*/
public class TransPortThread extends Thread {

    private SimpleChannel simpleChannel;

    /**
   * 静态随机数生成器,用于生成随机休眠时间
   */
    private static final Random RANDOM = new Random(System.currentTimeMillis());

    /**
   * 构造方法
   *
   * @param name 线程名称
   * @param simpleChannel 货物传输通道
   */
    public TransPortThread(String name, SimpleChannel simpleChannel) {
      super(name);
      this.simpleChannel = simpleChannel;
    }

    @Override
    public void run() {
      try {
            int i = 0;
            // 无限循环持续生成货物
            while (true){
                Cargo cargo = new Cargo(getName(), i++); // 创建货物对象(包含线程名和序号)
                this.simpleChannel.pushCargo(cargo);   // 将货物推送到通道
                Thread.sleep(RANDOM.nextInt(1000));    // 随机休眠 0-1000ms
            }
      } catch (InterruptedException e) {
            // 线程中断时的异常处理(当前空实现)
      }
    }
}
WorkerThread.java

职责:从传输带上获取部件或货物
import java.util.Random;
/**
* 工作线程类,负责从共享通道中持续获取任务并执行
*/
public class WorkerThread extends Thread {

    /**
   * 存储任务通道实例,用于获取待处理的任务对象
   */
    private final SimpleChannel simpleChannel;

    /**
   * 静态随机数生成器,用于生成随机休眠时间(模拟任务执行间隔)
   */
    private static final Random random = new Random(System.currentTimeMillis());

    /**
   * 构造方法初始化线程名称和任务通道
   * @param name 线程名称标识
   * @param simpleChannel 任务通道对象,用于获取待处理任务
   */
    public WorkerThread(String name, SimpleChannel simpleChannel) {
      super(name);
      this.simpleChannel = simpleChannel;
    }

    /**
   * 线程执行主体方法,持续循环执行以下操作:
   * 1. 从通道中获取任务对象
   * 2. 执行任务
   * 3. 随机休眠(模拟任务处理间隔)
   */
    @Override
    public void run() {
      while (true) {
            try {
                // 从通道中获取任务对象
                Cargo cargo = simpleChannel.popCargo();
                // 执行任务逻辑
                cargo.execute();
                // 随机休眠0-1000毫秒(模拟任务间隔)
                Thread.sleep(random.nextInt(1000));
            } catch (Exception e) {
                // 异常处理:打印堆栈信息(实际应用中建议添加更完善的异常处理)
                e.printStackTrace();
            }
      }
    }
}
执行的效果
worker-1 executed Cargo ==> NO.0 Name.alen
worker-0 executed Cargo ==> NO.0 Name.june
worker-4 executed Cargo ==> NO.0 Name.july
worker-3 executed Cargo ==> NO.1 Name.june
worker-0 executed Cargo ==> NO.1 Name.alen
worker-2 executed Cargo ==> NO.1 Name.july
worker-1 executed Cargo ==> NO.2 Name.june
worker-0 executed Cargo ==> NO.2 Name.july
worker-2 executed Cargo ==> NO.2 Name.alen
worker-3 executed Cargo ==> NO.3 Name.june
worker-4 executed Cargo ==> NO.4 Name.june
worker-2 executed Cargo ==> NO.3 Name.alen
worker-3 executed Cargo ==> NO.3 Name.july
worker-4 executed Cargo ==> NO.5 Name.june
worker-1 executed Cargo ==> NO.6 Name.june
worker-0 executed Cargo ==> NO.4 Name.alen
worker-2 executed Cargo ==> NO.7 Name.june
worker-3 executed Cargo ==> NO.5 Name.alen
worker-2 executed Cargo ==> NO.4 Name.july
worker-4 executed Cargo ==> NO.6 Name.alen
worker-0 executed Cargo ==> NO.5 Name.july
worker-2 executed Cargo ==> NO.8 Name.june
worker-3 executed Cargo ==> NO.7 Name.alen
worker-1 executed Cargo ==> NO.6 Name.july
worker-4 executed Cargo ==> NO.8 Name.alen
worker-4 executed Cargo ==> NO.9 Name.june
worker-2 executed Cargo ==> NO.10 Name.june
worker-3 executed Cargo ==> NO.9 Name.alen
worker-1 executed Cargo ==> NO.7 Name.july
worker-2 executed Cargo ==> NO.10 Name.alen
worker-4 executed Cargo ==> NO.11 Name.alen
worker-3 executed Cargo ==> NO.11 Name.june
worker-0 executed Cargo ==> NO.8 Name.july
worker-4 executed Cargo ==> NO.12 Name.alen
worker-3 executed Cargo ==> NO.12 Name.june
worker-4 executed Cargo ==> NO.13 Name.alen
worker-1 executed Cargo ==> NO.9 Name.july
worker-2 executed Cargo ==> NO.14 Name.alen
worker-0 executed Cargo ==> NO.13 Name.june
worker-4 executed Cargo ==> NO.15 Name.alen
worker-2 executed Cargo ==> NO.10 Name.july
worker-1 executed Cargo ==> NO.14 Name.june
worker-3 executed Cargo ==> NO.15 Name.june
worker-4 executed Cargo ==> NO.16 Name.alen
worker-3 executed Cargo ==> NO.11 Name.july
worker-2 executed Cargo ==> NO.12 Name.july
worker-0 executed Cargo ==> NO.16 Name.june
worker-1 executed Cargo ==> NO.17 Name.june
worker-2 executed Cargo ==> NO.18 Name.june
worker-2 executed Cargo ==> NO.17 Name.alen
worker-4 executed Cargo ==> NO.13 Name.july
worker-0 executed Cargo ==> NO.18 Name.alen
worker-0 executed Cargo ==> NO.19 Name.june
worker-3 executed Cargo ==> NO.14 Name.july
worker-1 executed Cargo ==> NO.19 Name.alen
worker-2 executed Cargo ==> NO.20 Name.june
worker-3 executed Cargo ==> NO.21 Name.june
worker-0 executed Cargo ==> NO.15 Name.july
worker-3 executed Cargo ==> NO.20 Name.alen
worker-4 executed Cargo ==> NO.21 Name.alen
worker-3 executed Cargo ==> NO.22 Name.june
worker-1 executed Cargo ==> NO.16 Name.july
worker-2 executed Cargo ==> NO.17 Name.july
worker-0 executed Cargo ==> NO.23 Name.june

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: java 多线程之Worker Thread模式(Thread Pool模式)