上节回顾
在上一节当中,已经实现了一个线程池,在本节当中,我们需要添加拒绝策略。这里使用到了策略模式的设计模式,由于拒绝策略是多种的,我们需要将这个权利下放给调用者(由调用者来指定我要采取哪种策略),而线程池只需要调用拒绝的接口即可。
步骤
(1)定义拒绝策略接口
(2)在线程池中加入拒绝策略参数
(3)自行调用测试
1.定义接口类
- @FunctionalInterface
- interface RejectPolicy<T>{
- //注意传递参数
- void reject(BlockQueue<T> queue,Runnable task);
- }
复制代码 2.线程池中添加接口以及调用方法
- @Slf4j
- class ThreadPool {
- //任务队列
- private BlockQueue<Runnable> taskQueue;
- //线程集合 我们需要对线程做一个包装
- private HashSet<Worker> workers = new HashSet<>();
- //核心线程数量
- private long coreSize;
- //超时时间
- private long timeout;
- //时间单位
- private TimeUnit timeUnit;
- //自定义拒绝策略
- private RejectPolicy<Runnable> rejectPolicy;
- public ThreadPool(int queueCapacity,long coreSize,long timeout,TimeUnit timeUnit){
- this.taskQueue = new BlockQueue<>(queueCapacity);
- this.coreSize = coreSize;
- this.timeout = timeout;
- this.timeUnit = timeUnit;
- this.rejectPolicy = (queue, task) -> {
- throw new RuntimeException();
- };
- }
- public ThreadPool(int queueCapacity,long coreSize,long timeout,TimeUnit timeUnit,RejectPolicy<Runnable> rejectPolicy){
- taskQueue = new BlockQueue<>(queueCapacity);
- this.coreSize = coreSize;
- this.timeout = timeout;
- this.timeUnit = timeUnit;
- this.rejectPolicy = rejectPolicy;
- }
- //执行任务
- public void execute(Runnable task){
- //当任务数量尚未超过coreSize
- synchronized (workers){
- if (workers.size() < coreSize){
- log.info("创建工作线程{}",task);
- Worker worker = new Worker(task);
- workers.add(worker);
- worker.start();
- }else{
- log.info("加入到任务队列{}",task);
- //有可能会阻塞在这里 进而将主线程阻塞掉
- //taskQueue.put(task);
- //这里会有很多种策略自定义策略
- //策略模式:操作抽象成接口实现代码是传过来不会写死
- taskQueue.tryPut(rejectPolicy,task);
- //rejectPolicy.reject(taskQueue,task);
- }
- }
- }
- class Worker extends Thread{
- private Runnable task;
- public Worker(Runnable task){
- this.task = task;
- }
- @Override
- public void run() {
- while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
- try {
- log.info("正在执行...{}",task);
- //执行任务
- task.run();
- }catch (Exception e){
- System.out.println(e.getMessage());
- }finally {
- //不要忘记这一步
- task = null;
- }
- }
- synchronized (workers){
- log.info("worker被移除{}",this);
- workers.remove(this);
- }
- }
- }
- }
复制代码 3.main测试
- @Slf4j
- public class TestPool {
- //阻塞队列是平衡生产者和消费者之间的中介
- //任务数量超过任务队列的情况
- public static void main(String[] args) {
- ThreadPool threadPool = new ThreadPool(10, 2, 1000, TimeUnit.MICROSECONDS, (queue, task) -> {
- //1.死等
- queue.put(task);
- //2.超时等待
- queue.offer(task, 1500, TimeUnit.MICROSECONDS);
- //3.调用者自己放弃
- // log.debug("放弃{}",task);
- //4.调用者抛异常
- //throw new RuntimeException("task执行失败" + task);
- //5.调用者自己执行
- task.run();
- });
- for (int i = 0; i < 20; i++) {
- int j = i;
- //主线程可能会在这里阻塞
- threadPool.execute(() -> {
- try {
- Thread.sleep(30000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- TestPool.log.debug("{}", j);
- });
- }
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |