流水线并发框架架构

打印 上一主题 下一主题

主题 1033|帖子 1033|积分 3099

V1

版本1:此中定义了一个 Task 类和三个继续自 Thread 类的子类 TA、TB 和 TC。


  • Task 类:

    • num 是一个整数变量,用于存储任务的结果。
    • taskA()、taskB() 和 taskC() 是三个任务方法,分别模仿了一些计算和等候的操作。

  • TA、TB、TC 类:

    • 这三个类分别表示三个差别的线程,每个线程实行一组任务。
    • 每个线程吸收一个 ArrayList<Task> 类型的列表作为参数,在 run() 方法中,通过迭代列表,对每个 Task 对象调用相应的任务方法。

  • Main 类:

    • 在 main 方法中,首先创建了一个包罗50个 Task 对象的列表 list。
    • 创建了 TA、TB 和 TC 的实例,并将 list 作为参数通报给它们。
    • 启动了三个线程,分别实行 taskA、taskB 和 taskC。
    • 创建了一个额外的线程,该线程每秒输出部分任务的结果,以便在实行过程中观察任务的完成环境。
    • 使用 join() 方法等候三个线程实行完成。

  • 任务实行过程:

    • TA 线程每次迭代调用 taskA(),导致 num 值增加 10。
    • TB 线程每次迭代调用 taskB(),导致 num 值乘以 20。
    • TC 线程每次迭代调用 taskC(),导致 num 值乘以自身。

  • 输出:

    • 在额外的线程中,每秒输出 list 中每个第五个任务的结果。

须要注意的是,由于线程之间并发实行,输出结果可能会交错。此外,对 num 的操作可能导致竞态条件,可能须要使用同步机制来确保线程安全性。
  1. import java.util.ArrayList;
  2. // A-B-C : 40000 正确完成任务后的结果
  3. public class Task {
  4.     int num;
  5.     public void taskA() {
  6.         try {
  7.             Thread.sleep(50);
  8.         } catch (InterruptedException e) {
  9.             throw new RuntimeException(e);
  10.         }
  11.         num += 10;
  12.     }
  13.     public void taskB() {
  14.         try {
  15.             Thread.sleep(500);
  16.         } catch (InterruptedException e) {
  17.             throw new RuntimeException(e);
  18.         }
  19.         num *= 20;
  20.     }
  21.     public void taskC() {
  22.         try {
  23.             Thread.sleep(650);
  24.         } catch (InterruptedException e) {
  25.             throw new RuntimeException(e);
  26.         }
  27.         num *= num;
  28.     }
  29. }
  30. class TA extends Thread {
  31.     ArrayList<Task> list;
  32.     public TA(ArrayList<Task> list) {
  33.         this.list=list;
  34.     }
  35.     @Override
  36.     public void run() {
  37.         for(int i=0;i<list.size();i++){
  38.             Task task = list.get(i);
  39.             task.taskA();
  40.         }
  41.     }
  42. }
  43. class TB extends Thread {
  44.     ArrayList<Task> list;
  45.     public TB(ArrayList<Task> list) {
  46.         this.list=list;
  47.     }
  48.     @Override
  49.     public void run() {
  50.         for(int i=0;i<list.size();i++){
  51.             Task task = list.get(i);
  52.             task.taskB();
  53.         }
  54.     }
  55. }
  56. class TC extends Thread {
  57.     ArrayList<Task> list;
  58.     public TC(ArrayList<Task> list) {
  59.         this.list=list;
  60.     }
  61.     @Override
  62.     public void run() {
  63.         for(int i=0;i<list.size();i++){
  64.             Task task = list.get(i);
  65.             task.taskC();
  66.         }
  67.     }
  68. }
  69. class Main {
  70.     public static void main(String[] args) {
  71.         // 1: 定量任务
  72.         ArrayList<Task> list = new ArrayList<>();
  73.         for (int i = 0; i < 50; i++) {
  74.             list.add(new Task());
  75.         }
  76.         TA ta = new TA(list);
  77.         TB tb = new TB(list);
  78.         TC tc = new TC(list);
  79.         ta.start();
  80.         tb.start();
  81.         tc.start();
  82.         //监听状态线程
  83.         new Thread(){
  84.             public void run(){
  85.                 while(true){
  86.                     try {
  87.                         Thread.sleep(1000);
  88.                     } catch (InterruptedException e) {
  89.                         throw new RuntimeException(e);
  90.                     }
  91.                     int size=0;
  92.                     for(int i=0;i< list.size();i++){
  93.                         if(i%5==0){
  94.                             System.out.println(list.get(i).num);
  95.                         }
  96.                     }
  97.                     //System.out.println(list.size());
  98.                     //System.out.println("已完成"+size+"任务");
  99.                 }
  100.             }
  101.         }.start();
  102.         try {
  103.             ta.join();
  104.         } catch (InterruptedException e) {
  105.             throw new RuntimeException(e);
  106.         }
  107.         try {
  108.             tb.join();
  109.         } catch (InterruptedException e) {
  110.             throw new RuntimeException(e);
  111.         }
  112.         try {
  113.             tc.join();
  114.         } catch (InterruptedException e) {
  115.             throw new RuntimeException(e);
  116.         }
  117.     }
  118. }
复制代码
可见性题目

可见性题目是多线程并发编程中常见的一个挑战,它涉及到一个线程对共享变量的修改是否可以或许立即被其他线程看到。在没有适当同步机制的环境下,由于每个线程有自己的工作内存,可能会导致一个线程对变量的修改对其他线程不可见。
在 Java 中,主要有两个因素导致可见性题目:


  • 线程工作内存: 每个线程都有自己的工作内存,用于存储主内存中的变量的副本。线程在实行时,操作的是工作内存中的变量,而不是直接操作主内存中的变量。
  • 指令重排序: 编译器和处理器为了进步实行效率,可能会对指令进行重排序。这就可能导致一些操作的实行顺序与代码中的顺序差别等。
为了办理可见性题目,Java 提供了 volatile 关键字。使用 volatile 关键字修饰的变量具有以下特性:


  • 可见性: 当一个线程修改了 volatile 变量的值,该变量的新值会立即被写回主内存,而其他线程在读取该变量时会直接从主内存中获取最新的值。
  • 克制指令重排序: volatile 关键字克制指令重排序,确保了变量的修改按照代码的顺序实行。

例子中演示了使用 volatile 关键字办理多线程可见性题目的环境。


  • volatile 关键字:

    • volatile 修饰的变量 flag 表示该变量是易变的,并且任何线程对它的修改都会立即反映到其他线程中。这办理了多线程之间的可见性题目,确保一个线程对该变量的修改对其他线程是可见的。

  • main 方法:

    • 创建了一个名为 t1 的线程,该线程在运行时将 flag 设置为 true。
    • 创建了一个匿名线程,该线程在运行时通过循环检查 flag 的值,一直比及 flag 变为 true 才输出 "T2-end"。
    • 在 main 方法中,通过 Thread.sleep(2000) 使得主线程休眠 2 秒,以确保 t1 线程有足够的时间来设置 flag 的值。
    • 启动了 t1 线程。

  • 输出:

    • T1 - start: t1 线程开始实行,将 flag 设置为 true。
    • T2-start: 另一个线程开始实行,进入循环等候,由于 flag 初始值为 false,因此一直等候。
    • T1 - end: t1 线程设置 flag 为 true。
    • T2-end: 循环竣事,输出 "T2-end"。

  • 关于注释的分析:

    • 在代码中有一行注释 // System.out.print("");,这是一种办理可见性题目的“空循环”技巧。通过参加一个空的循环,可以迫使线程重新读取共享变量,从而办理可见性题目。在这个例子中,由于使用了 volatile 关键字,这行注释实际上不再须要,由于 volatile 自己保证了可见性。

总的来说,这个例子展示了使用 volatile 关键字办理多线程可见性题目的环境,确保一个线程对共享变量的修改可以或许及时被其他线程感知。
  1. public class Test {
  2.     volatile static boolean flag;// 每次使用变量时 都会重新拷贝一份副本过来
  3.     public static void main(String[] args) {
  4.         Thread t1 = new Thread() {
  5.             @Override
  6.             public void run() {
  7.                 System.out.println("T1 - start");
  8.                 flag = true;
  9.                 System.out.println("T1 - end");
  10.             }
  11.         };
  12.         new Thread() {
  13.             @Override
  14.             public void run() {
  15.                 System.out.println("T2-start");
  16.                 while (!flag) {
  17.                  // System.out.print("");//可见性问题
  18.                 }
  19.                 System.out.println("T2-end");
  20.             }
  21.         }.start();
  22.         try {
  23.             Thread.sleep(2000);
  24.         } catch (InterruptedException e) {
  25.             throw new RuntimeException(e);
  26.         }
  27.         t1.start();
  28.     }
  29. }
复制代码
V2

版本2:展示了一个多线程任务的场景,此中包罗了三个任务(taskA、taskB、taskC),每个任务依次实行,且必须按照一定的顺序。在 Main 类的 main 方法中,创建了一个包罗 100 个 Task 对象的列表,并启动了三个线程 TA、TB、TC 分别实行这些任务。另外,另有一个额外的线程用于监听任务完成状态。
让我们详细讲解这段代码:


  • Task 类:

    • 每个 Task 对象包罗一个整数变量 num 和三个布尔标志 flagA、flagB、flagC。
    • taskA、taskB、taskC 方法模仿了三个任务的实行过程,每个任务在实行前会先检查前置任务是否完成(通过相应的标志)。
    • isEnd 方法用于判定任务是否全部完成。

  • TA、TB、TC 类:

    • 这三个类分别表示实行任务 A、B、C 的线程,它们负责迭代任务列表,实行相应的任务方法。
    • 每个线程在实行任务的过程中,通过检查任务的标志来确保按照顺序实行。

  • Main 类:

    • 创建了一个包罗 100 个 Task 对象的列表 list。
    • 创建了 TA、TB 和 TC 的实例,并将 list 作为参数通报给它们。
    • 启动了三个线程,分别实行 taskA、taskB 和 taskC。
    • 创建了一个额外的线程,该线程每秒输出已完成的任务数量和任务列表的总巨细。
    • 使用 join 方法等候三个任务线程实行完成。

  • 输出:

    • 每个任务线程在实行时,会输出实行的次数和已完成的任务数量。
    • 额外的监听线程每秒输出已完成的任务数量和任务列表的总巨细。

  • 任务实行过程:

    • 每个任务在实行前都会检查前置任务是否完成,确保按照任务顺序实行。
    • 每个任务实行完成后,会设置相应的标志,表示该任务已完成。
    • 额外的监听线程会每秒输出已完成的任务数量和任务列表的总巨细,直到所有任务完成。

该代码通过使用标志和多线程的方式模仿了任务的顺序实行,确保了每个任务在满足条件时才实行。须要注意的是,在实际生产环境中,更复杂的同步机制可能会更实用。
  1. import java.util.ArrayList;
  2. public class Task {
  3.     int num;
  4.     boolean flagA = false;
  5.     boolean flagB = false;
  6.     boolean flagC = false;
  7.     public void taskA() {
  8.         if (!flagA) {
  9.             sleep(40);
  10.             num += 10;
  11.             flagA = true;
  12.         }
  13.     }
  14.     public void taskB() {
  15.         if (!flagB && flagA) {
  16.             sleep(30);
  17.             num *= 20;
  18.             flagB = true;
  19.         }
  20.     }
  21.     public void taskC() {
  22.         if (flagB && !flagC) {
  23.             sleep(30);
  24.             num *= num;
  25.             flagC = true;
  26.         }
  27.     }
  28.     public boolean isEnd() {
  29.         return flagC;
  30.     }
  31.     public void sleep(long time) {
  32.         try {
  33.             Thread.sleep(time);
  34.         } catch (InterruptedException e) {
  35.             throw new RuntimeException(e);
  36.         }
  37.     }
  38. }
  39. class TA extends Thread {
  40.     ArrayList<Task> list;
  41.     public TA(ArrayList<Task> list) {
  42.         this.list = list;
  43.     }
  44.     @Override
  45.     public void run() {
  46.         int count = 0;
  47.         while (true) {
  48.             try {
  49.                 Thread.sleep(2000);
  50.             } catch (InterruptedException e) {
  51.                 throw new RuntimeException(e);
  52.             }
  53.             System.out.println("TA - run" + count++);
  54.             int size = 0;
  55.             for (int i = 0; i < list.size(); i++) {
  56.                 Task task = list.get(i);
  57.                 task.taskA();
  58.                 if (task.flagA) {
  59.                     size++;
  60.                 }
  61.             }
  62.             System.out.println("TA - 完成了" + size);
  63.             if (size == list.size()) {
  64.                 break;
  65.             }
  66.         }
  67.     }
  68. }
  69. class TB extends Thread {
  70.     ArrayList<Task> list;
  71.     public TB(ArrayList<Task> list) {
  72.         this.list = list;
  73.     }
  74.     @Override
  75.     public void run() {
  76.         int count = 0;
  77.         while (true) {
  78.             try {
  79.                 Thread.sleep(2000);
  80.             } catch (InterruptedException e) {
  81.                 throw new RuntimeException(e);
  82.             }
  83.             System.out.println("TB - run" + count++);
  84.             int size = 0;
  85.             for (int i = 0; i < list.size(); i++) {
  86.                 Task task = list.get(i);
  87.                 task.taskB();
  88.                 if (task.flagB) {
  89.                     size++;
  90.                 }
  91.             }
  92.             System.out.println("TB - 完成了" + size);
  93.             if (size == list.size()) {
  94.                 break;
  95.             }
  96.         }
  97.     }
  98. }
  99. class TC extends Thread {
  100.     ArrayList<Task> list;
  101.     public TC(ArrayList<Task> list) {
  102.         this.list = list;
  103.     }
  104.     @Override
  105.     public void run() {
  106.         int count = 0;
  107.         while (true) {
  108.             try {
  109.                 Thread.sleep(2000);
  110.             } catch (InterruptedException e) {
  111.                 throw new RuntimeException(e);
  112.             }
  113.             System.out.println("TC - run" + count++);
  114.             int size = 0;
  115.             for (int i = 0; i < list.size(); i++) {
  116.                 Task task = list.get(i);
  117.                 task.taskC();
  118.                 if (task.flagC) {
  119.                     size++;
  120.                 }
  121.             }
  122.             System.out.println("TC - 完成了" + size);
  123.             if (size == list.size()) {
  124.                 break;
  125.             }
  126.         }
  127.     }
  128. }
  129. class Main {
  130.     public static void main(String[] args) {
  131. // 1: 定量任务
  132.         ArrayList<Task> list = new ArrayList<>();
  133.         for (int i = 0; i < 100; i++) {
  134.             list.add(new Task());
  135.         }
  136.         TA ta = new TA(list);
  137.         TB tb = new TB(list);
  138.         TC tc = new TC(list);
  139.         ta.start();
  140.         tb.start();
  141.         tc.start();
  142. // 监听状态线程
  143.         new Thread() {
  144.             @Override
  145.             public void run() {
  146.                 while (true) {
  147.                     try {
  148.                         Thread.sleep(1000);
  149.                     } catch (InterruptedException e) {
  150.                         throw new RuntimeException(e);
  151.                     }
  152.                     int size = 0;
  153.                     for (int i = 0; i < list.size(); i++) {
  154.                         if (list.get(i).num == 40000) {
  155.                             size++;
  156.                         }
  157.                     }
  158.                     System.out.println(list.size());
  159.                     System.out.println("已完成:" + size + "任务");
  160.                 }
  161.             }
  162.         }.start();
  163.         try {
  164.             ta.join();
  165.         } catch (InterruptedException e) {
  166.             throw new RuntimeException(e);
  167.         }
  168.         try {
  169.             tb.join();
  170.         } catch (InterruptedException e) {
  171.             throw new RuntimeException(e);
  172.         }
  173.         try {
  174.             tc.join();
  175.         } catch (InterruptedException e) {
  176.             throw new RuntimeException(e);
  177.         }
  178.         // for (int i = 0; i < list.size(); i++) {
  179.         // System.out.println(list.get(i).num);
  180.         // }
  181.     }
  182. }
复制代码
V3

版本3:代码进一步改进了多线程任务的实现,引入了 volatile 关键字来确保线程之间的可见性。同时,通过在任务的 while 循环中进行阻塞等候,实现了任务的顺序实行。


  • Task 类:

    • volatile 关键字被添加到 flagA、flagB、flagC 上,确保了对这些标志的修改立即可见于其他线程。
    • 任务方法 taskA、taskB、taskC 中的 while 循环用于在前置任务完成前阻塞当前任务的实行,确保按照任务顺序实行。

  • TA、TB、TC 类:

    • 这三个类分别表示实行任务 A、B、C 的线程,它们负责迭代任务列表,实行相应的任务方法。
    • 在任务 B 和任务 C 的实行中,通过 while 循环进行阻塞等候前置任务完成。

  • Main 类:

    • 创建了一个包罗 500 个 Task 对象的列表 list。
    • 创建了 TA、TB 和 TC 的实例,并将 list 作为参数通报给它们。
    • 启动了三个线程,分别实行 taskA、taskB 和 taskC。
    • 创建了一个额外的线程,该线程每秒输出已完成的任务数量和任务列表的总巨细。
    • 使用 join 方法等候三个任务线程实行完成。
    • 计算任务实行的耗时,并输出每个任务的结果。

  • 输出:

    • 每个任务线程在实行时,会输出实行的次数和已完成的任务数量。
    • 额外的监听线程每秒输出已完成的任务数量和任务列表的总巨细。
    • 最后输出每个任务的结果和整个任务实行的耗时。

通过引入 volatile 关键字和阻塞等候的方式,确保了任务的按顺序实行,同时在输出中显示了任务实行的次数、已完成的任务数量以及整个任务实行的耗时。这种方式更加妥当地处理了多线程环境下的任务实行顺序题目。
  1. import java.util.ArrayList;
  2. public class Task {
  3.     int num;
  4.     volatile boolean flagA = false;
  5.     volatile boolean flagB = false;
  6.     volatile boolean flagC = false;
  7.     public void taskA() {
  8.         if (!flagA) {
  9.             sleep(40);
  10.             num += 10;
  11.             flagA = true;
  12.         }
  13.     }
  14.     public void taskB() {
  15.         while (!flagA) {// 阻塞
  16.         }
  17.         if (!flagB) {
  18.             sleep(30);
  19.             num *= 20;
  20.             flagB = true;
  21.         }
  22.     }
  23.     public void taskC() {
  24.         while (!flagB) {// 阻塞
  25.         }
  26.         if (!flagC) {
  27.             sleep(30);
  28.             num *= num;
  29.             flagC = true;
  30.         }
  31.     }
  32.     public boolean isEnd() {
  33.         return flagC;
  34.     }
  35.     public void sleep(long time) {
  36.         try {
  37.             Thread.sleep(time);
  38.         } catch (InterruptedException e) {
  39.             throw new RuntimeException(e);
  40.         }
  41.     }
  42. }
  43. class TA extends Thread {
  44.     ArrayList<Task> list;
  45.     public TA(ArrayList<Task> list) {
  46.         this.list = list;
  47.     }
  48.     @Override
  49.     public void run() {
  50.         for (int i = 0; i < list.size(); i++) {
  51.             Task task = list.get(i);
  52.             task.taskA();
  53.         }
  54.     }
  55. }
  56. class TB extends Thread {
  57.     ArrayList<Task> list;
  58.     public TB(ArrayList<Task> list) {
  59.         this.list = list;
  60.     }
  61.     @Override
  62.     public void run() {
  63.         for (int i = 0; i < list.size(); i++) {
  64.             Task task = list.get(i);
  65.             task.taskB();
  66.         }
  67.     }
  68. }
  69. class TC extends Thread {
  70.     ArrayList<Task> list;
  71.     public TC(ArrayList<Task> list) {
  72.         this.list = list;
  73.     }
  74.     @Override
  75.     public void run() {
  76.         for (int i = 0; i < list.size(); i++) {
  77.             Task task = list.get(i);
  78.             task.taskC();
  79.         }
  80.     }
  81. }
  82. class Main {
  83.     public static void main(String[] args) {
  84. // 1: 定量任务
  85.         ArrayList<Task> list = new ArrayList<>();
  86.         for (int i = 0; i < 500; i++) {
  87.             list.add(new Task());
  88.         }
  89.         TA ta = new TA(list);
  90.         TB tb = new TB(list);
  91.         TC tc = new TC(list);
  92.         ta.start();
  93.         tb.start();
  94.         tc.start();
  95. // 监听状态线程
  96.         new Thread() {
  97.             @Override
  98.             public void run() {
  99.                 while (true) {
  100.                     try {
  101.                         Thread.sleep(1000);
  102.                     } catch (InterruptedException e) {
  103.                         throw new RuntimeException(e);
  104.                     }
  105.                     int size = 0;
  106.                     for (int i = 0; i < list.size(); i++) {
  107.                         if (list.get(i).num == 40000) {
  108.                             size++;
  109.                         }
  110.                     }
  111.                     System.out.println(list.size());
  112.                     System.out.println("已完成:" + size + "任务");
  113.                 }
  114.             }
  115.         }.start();
  116.         long start = System.currentTimeMillis();
  117.         try {
  118.             ta.join();
  119.         } catch (InterruptedException e) {
  120.             throw new RuntimeException(e);
  121.         }
  122.         try {
  123.             tb.join();
  124.         } catch (InterruptedException e) {
  125.             throw new RuntimeException(e);
  126.         }
  127.         try {
  128.             tc.join();
  129.         } catch (InterruptedException e) {
  130.             throw new RuntimeException(e);
  131.         }
  132.         long end = System.currentTimeMillis();
  133.         for (int i = 0; i < list.size(); i++) {
  134.             System.out.println(list.get(i).num);
  135.         }
  136.         System.out.println("耗时:" + (end - start) + " ms");
  137.     }
  138. }
复制代码
V4

版本4:展示了一种通过使用队列(LinkedList)和质料库(taskA、taskB、taskC)的方式来实现任务的生产和消费。每个任务分别在差别的线程中实行,通过队列和质料库的操作来确保任务按照特定的顺序实行。


  • Task 类:

    • taskA、taskB、taskC 分别对应三个差别的任务,对 num 进行差别的操作。

  • TA、TB、TC 类:

    • 这三个类分别表示实行任务 A、B、C 的线程,通过队列和质料库的操作来确保任务按照特定的顺序实行。
    • run 方法中使用 poll 方法从队列中取出任务,实行相应的任务方法,然后将任务放入下一个阶段的质料库中。

  • Main 类:

    • 创建了 taskA、taskB、taskC 作为任务的质料库。
    • 创建了 tasks 作为制品库,用于存储已完成的任务。
    • 创建了 TA、TB 和 TC 的实例,并将相应的质料库和制品库通报给它们。
    • 启动了三个线程,分别实行 taskA、taskB 和 taskC。
    • 创建了一个额外的线程,该线程每秒输出已完成的任务数量和任务列表的总巨细。

  • 输出:

    • 每个任务线程在实行时,会输出实行的次数和已完成的任务数量。
    • 额外的监听线程每秒输出已完成的任务数量和任务列表的总巨细。

通过这种队列和质料库的方式,确保了任务的顺序实行。任务被依次推送到差别的阶段,从而保证了任务的有序性。在这个模型中,每个任务线程负责从上一个阶段的质料库取任务,实行后将任务放入下一个阶段的质料库,以此类推,直至完成。
  1. import java.util.ArrayList;
  2. import java.util.LinkedList;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. public class Task {
  5.     int num;
  6.     public void taskA() {
  7.         num += 10;
  8.     }
  9.     public void taskB() {
  10.         num *= 20;
  11.     }
  12.     public void taskC() {
  13.         num *= num;
  14.     }
  15. }
  16. class TA extends Thread {
  17.     LinkedList<Task> taskB;
  18.     LinkedList<Task> taskA;
  19.     public TA(LinkedList<Task> taskA, LinkedList<Task> taskB) {
  20.         this.taskA = taskA;
  21.         this.taskB = taskB;
  22.     }
  23.     @Override
  24.     public void run() {
  25.         while (true) {
  26.             Task task = taskA.poll();// 出列
  27.             if (task != null) {
  28.                 task.taskA();// 执行A任务
  29.                 taskB.offer(task);// 入队B
  30.             }
  31.         }
  32.     }
  33. }
  34. class TB extends Thread {
  35.     LinkedList<Task> taskB;
  36.     LinkedList<Task> taskC;
  37.     public TB(LinkedList<Task> taskB, LinkedList<Task> taskC) {
  38.         this.taskB = taskB;
  39.         this.taskC = taskC;
  40.     }
  41.     @Override
  42.     public void run() {
  43.         while (true) {
  44.             Task task = taskB.poll();// 出列
  45.             if (task != null) {
  46.                 task.taskB();// 执行A任务
  47.                 taskC.offer(task);// 入队B
  48.             }
  49.         }
  50.     }
  51. }
  52. class TC extends Thread {
  53.     ArrayList<Task> tasks;
  54.     LinkedList<Task> taskC;
  55.     public TC(LinkedList<Task> taskC, ArrayList<Task> tasks) {
  56.         this.taskC = taskC;
  57.         this.tasks = tasks;
  58.     }
  59.     @Override
  60.     public void run() {
  61.         while (true) {
  62.             Task task = taskC.poll();// 出列
  63.             if (task != null) {
  64.                 task.taskC();// 执行A任务
  65.                 tasks.add(task);// 入队B
  66.             }
  67.         }
  68.     }
  69. }
  70. class Main {
  71.     public static void main(String[] args) {
  72. // 1: 定量任务
  73.         LinkedList<Task> taskA = new LinkedList<>();// A的原料库
  74.         LinkedList<Task> taskB = new LinkedList<>();// B的原料库
  75.         LinkedList<Task> taskC = new LinkedList<>();// C的原料库
  76.         ArrayList<Task> tasks = new ArrayList<>();// 成品库
  77.         ArrayBlockingQueue<Task> tasks1 = new ArrayBlockingQueue<>(50);
  78.         for (int i = 0; i < 500; i++) {
  79.             taskA.offer(new Task());// 入队
  80.         }
  81.         TA ta = new TA(taskA, taskB);
  82.         TB tb = new TB(taskB, taskC);
  83.         TC tc = new TC(taskC, tasks);
  84.         ta.start();
  85.         tb.start();
  86.         tc.start();
  87. // 监听状态线程
  88.         new Thread() {
  89.             @Override
  90.             public void run() {
  91.                 while (true) {
  92.                     try {
  93.                         Thread.sleep(1000);
  94.                     } catch (InterruptedException e) {
  95.                         throw new RuntimeException(e);
  96.                     }
  97.                     int size = 0;
  98.                     for (int i = 0; i < tasks.size(); i++) {
  99.                         System.out.println(tasks.get(i).num);
  100.                         if (tasks.get(i).num == 40000) {
  101.                             size++;
  102.                         }
  103.                     }
  104.                     System.out.println(tasks.size());
  105.                     System.out.println("已完成:" + size + "任务");
  106.                 }
  107.             }
  108.         }.start();
  109.     }
  110. }
复制代码
V5

版本5:代码是一个改进版本,使用了 ArrayBlockingQueue 作为任务的质料库和制品库,以及通过队列操作来保证任务的有序实行。


  • Task 类:

    • taskA、taskB、taskC 方法分别对应三个差别的任务,对 num 进行差别的操作。

  • TA、TB、TC 类:

    • 这三个类分别表示实行任务 A、B、C 的线程,通过 ArrayBlockingQueue 实现了线程之间的协作,确保任务按照特定的顺序实行。
    • run 方法中使用 take 方法从队列中取出任务,实行相应的任务方法,然后将任务放入下一个阶段的队列中。

  • Main 类:

    • 创建了 taskA、taskB、taskC 作为任务的质料库,使用 ArrayBlockingQueue 进行初始化。
    • 创建了 tasks 作为制品库,用于存储已完成的任务。
    • 创建了 TA、TB 和 TC 的实例,并将相应的质料库和制品库通报给它们。
    • 启动了三个线程,分别实行 taskA、taskB 和 taskC。
    • 创建了一个额外的线程,该线程每秒输出已完成的任务数量和任务列表的总巨细。

  • 输出:

    • 每个任务线程在实行时,会输出实行的次数和已完成的任务数量。
    • 额外的监听线程每秒输出已完成的任务数量和任务列表的总巨细。

通过使用 ArrayBlockingQueue 作为质料库和制品库,以及使用 take 和 put 方法来保证线程之间的协作,这段代码实现了有序实行的任务模型。每个任务线程负责从上一个阶段的队列取任务,实行后将任务放入下一个阶段的队列,从而保证了任务的有序性。
  1. import java.util.ArrayList;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. public class Task {
  4.     int num;
  5.     public void taskA() {
  6.         num += 10;
  7.     }
  8.     public void taskB() {
  9.         num *= 20;
  10.     }
  11.     public void taskC() {
  12.         num *= num;
  13.     }
  14. }
  15. class TA extends Thread {
  16.     ArrayBlockingQueue<Task> taskB;
  17.     ArrayBlockingQueue<Task> taskA;
  18.     public TA(ArrayBlockingQueue<Task> taskA, ArrayBlockingQueue<Task> taskB) {
  19.         this.taskA = taskA;
  20.         this.taskB = taskB;
  21.     }
  22.     @Override
  23.     public void run() {
  24.         while (true) {
  25.             try {
  26.                 Task task = taskA.take(); // 出队
  27.                 task.taskA(); // 执行A任务
  28.                 taskB.put(task); // 入队B
  29.             } catch (InterruptedException e) {
  30.                 throw new RuntimeException(e);
  31.             }
  32.         }
  33.     }
  34. }
  35. class TB extends Thread {
  36.     ArrayBlockingQueue<Task> taskB;
  37.     ArrayBlockingQueue<Task> taskC;
  38.     public TB(ArrayBlockingQueue<Task> taskB, ArrayBlockingQueue<Task> taskC) {
  39.         this.taskB = taskB;
  40.         this.taskC = taskC;
  41.     }
  42.     @Override
  43.     public void run() {
  44.         while (true) {
  45.             try {
  46.                 Task task = taskB.take(); // 出队
  47.                 task.taskB(); // 执行B任务
  48.                 taskC.put(task); // 入队C
  49.             } catch (InterruptedException e) {
  50.                 throw new RuntimeException(e);
  51.             }
  52.         }
  53.     }
  54. }
  55. class TC extends Thread {
  56.     ArrayList<Task> tasks;
  57.     ArrayBlockingQueue<Task> taskC;
  58.     public TC(ArrayBlockingQueue<Task> taskC, ArrayList<Task> tasks) {
  59.         this.taskC = taskC;
  60.         this.tasks = tasks;
  61.     }
  62.     @Override
  63.     public void run() {
  64.         while (true) {
  65.             try {
  66.                 Task task = taskC.take(); // 出队
  67.                 task.taskC(); // 执行C任务
  68.                 tasks.add(task); // 入队结果列表
  69.             } catch (InterruptedException e) {
  70.                 throw new RuntimeException(e);
  71.             }
  72.         }
  73.     }
  74. }
  75. class Main {
  76.     public static void main(String[] args) {
  77.         // 1:定量任务
  78.         ArrayBlockingQueue<Task> taskA = new ArrayBlockingQueue<>(500); // A的原料库
  79.         ArrayBlockingQueue<Task> taskB = new ArrayBlockingQueue<>(500); // B的原料库
  80.         ArrayBlockingQueue<Task> taskC = new ArrayBlockingQueue<>(500); // C的原料库
  81.         ArrayList<Task> tasks = new ArrayList<>(); // 成品库
  82.         for (int i = 0; i < 500; i++) {
  83.             taskA.offer(new Task()); // 入队
  84.         }
  85.         TA ta = new TA(taskA, taskB);
  86.         TB tb = new TB(taskB, taskC);
  87.         TC tc = new TC(taskC, tasks);
  88.         ta.start();
  89.         tb.start();
  90.         tc.start();
  91.         // 监听线程状态
  92.         new Thread() {
  93.             @Override
  94.             public void run() {
  95.                 while (true) {
  96.                     try {
  97.                         Thread.sleep(1000);
  98.                     } catch (InterruptedException e) {
  99.                         throw new RuntimeException(e);
  100.                     }
  101.                     int size = 0;
  102.                     for (int i = 0; i < tasks.size(); i++) {
  103.                         System.out.println(tasks.get(i).num);
  104.                         if (tasks.get(i).num == 40000) {
  105.                             size++;
  106.                         }
  107.                     }
  108.                     System.out.println(tasks.size());
  109.                     System.out.println("已完成:" + size + "任务");
  110.                 }
  111.             }
  112.         }.start();
  113.     }
  114. }
复制代码


阻塞队列

ArrayBlockingQueue 是 Java 中 BlockingQueue 接口的一个具体实现,它基于数组实现的有界阻塞队列。以下是对 ArrayBlockingQueue 的详细讲解:
特点和用途:



  • 有界队列: ArrayBlockingQueue 是一个有界队列,其容量在创建时被指定,不能动态扩展。这意味着队列中的元素数量不能超过指定的容量。
  • 阻塞操作: 当队列满时,试图将元素放入队列的操作将被阻塞,直到队列有空间。当队列为空时,试图从队列中取出元素的操作将被阻塞,直到队列中有元素。
  • 线程安全: ArrayBlockingQueue 提供了在多线程环境下安全使用的机制,内部实现使用了锁来掩护队列的操作。
构造方法:



  • ArrayBlockingQueue(int capacity): 创建一个指定容量的 ArrayBlockingQueue,默认环境下为公平战略,即等候时间最长的线程将被优先实行。
  • ArrayBlockingQueue(int capacity, boolean fair): 创建一个指定容量和公平性战略的 ArrayBlockingQueue。
  • ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c): 创建一个包罗指定聚集元素的 ArrayBlockingQueue,容量为指定容量,公平性由 fair 参数决定。
主要方法:



  • 放入元素:

    • put(E e): 将指定元素放入队列,假如队列满,则阻塞直到队列有空间。
    • offer(E e, long timeout, TimeUnit unit): 将指定元素放入队列,假如队列满,则等候指定时间,超时后返回 false。

  • 取出元素:

    • take(): 取出并删除队列的头部元素,假如队列为空,则阻塞直到队列有元素可取。
    • poll(long timeout, TimeUnit unit): 取出并删除队列的头部元素,假如队列为空,则等候指定时间,超时后返回 null。

  • 其他方法:

    • remainingCapacity(): 返回队列的剩余容量。
    • size(): 返回队列中的元素数量。
    • peek(): 返回队列的头部元素,但不删除。假如队列为空,则返回 null。
    • contains(Object o): 判定队列是否包罗指定元素。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

慢吞云雾缓吐愁

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表