并发编程--上篇

打印 上一主题 下一主题

主题 1636|帖子 1636|积分 4908

Java并发探索--上篇

1.基本概念


  • 线程与进程:线程是程序执行的最小单位,而进程是系统举行资源分配和调度的基本单位。例如,一个 Java 程序可以包罗多个线程,它们共享进程的资源。
  • 并发与并行:并发是指多个任务在同一时间段内执行,而并行是指多个任务在同一时刻执行。在多核 CPU 系统中,可以实现真正的并行。
  • 同步与异步:同步是指程序按照次序依次执行,而异步是指程序在执行某个任务时,不需要等待该任务完成,可以继续执行其他任务。
“Java并发探索--下篇” --- 在下面找
博客园
https://www.cnblogs.com/jackjavacpp
CSDN
https://blog.csdn.net/okok__TXF
2.探索线程的创建

①线程的状态

从Thread源码里面看出
  1. public enum State {
  2.         // 尚未启动的线程的线程状态。
  3.     NEW,
  4.         // 就绪
  5.     RUNNABLE,
  6.         // 等待监视器锁的线程的线程状态
  7.     BLOCKED,
  8.         /*
  9.         等待线程的线程状态,线程由于调用以下方法之一而处于等待状态:
  10.         Object.wait() 没有超时
  11.         Thread.join() 没有超时
  12.         LockSupport.park()
  13.         */
  14.     WAITING,
  15.         /*
  16.         指定等待时间的等待线程的线程状态
  17.         线程处于定时等待状态,因为调用了以下方法之一,并指定等待时间:
  18.         Thread.sleep
  19.     Object.wait with timeout
  20.     Thread.join with timeout
  21.     LockSupport.parkNanos
  22.     LockSupport.parkUntil
  23.         */
  24.     TIMED_WAITING,
  25.         //终止线程的线程状态。线程已完成执行。
  26.     TERMINATED;
  27. }
复制代码
下面看一张图,很清晰的解释了各状态之间的关系:【节选自https://blog.csdn.net/agonie201218/article/details/128712507】
在Java中,一个Thread有大抵六个状态。
线程创建之后(new Thread)它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 RUNNABLE(停当) 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING(运行) 状态。
明白了线程的运行状态,接下来让我们来看一下在爪哇里面如何创建并且启动线程。
②线程创建

1)两种基本方式


  • 继续Thread类,重写run方法
  1. public class MyThread1 extends Thread {
  2.     @Override
  3.     public void run() {
  4.         System.out.println(Thread.currentThread().getName() + ": hello world");
  5.     }
  6. }
  7. public class JUCMain {
  8.     public static void main(String[] args) {
  9.         new MyThread1().start();
  10.     }
  11. }
复制代码

  • 实现Runnable接口,传入Thread
  1. public class Runnable1 implements Runnable{
  2.     @Override
  3.     public void run() {
  4.         System.out.println("hello world, Runnable");
  5.     }
  6. }
  7. public class JUCMain {
  8.     public static void main(String[] args) {
  9.         new Thread(new Runnable1()).start();
  10.     }
  11. }
复制代码
网上还传有其他创建线程的方式,好比: Callable接口,重写call,结合FutureTask;线程池;lambda表达式等等。。。诚然,这也确实是创建线程启动的方式不错。但是本文毕竟是探索性质的文章,我们要探索其本质。
首先从start()方法看起(这个方式属于Thread类的)。调用start()后,JVM会创建一个新线程并执行该线程的run()方法。注意:直接调用run()不会启动新线程,而是在当前线程中执行。
  1. // 启动线程并触发 JVM 创建原生线程
  2. // synchronized后面解释【见 探索“锁”】
  3. public synchronized void start() {
  4.     // 零状态值对应于状态 “NEW”
  5.     // 线程想要start,必须是为0的状态
  6.     if (threadStatus != 0)
  7.         throw new IllegalThreadStateException();
  8.     /*
  9.     group 是线程所属的线程组。这行代码将当前线程实例添加到线程组中,
  10.     同时线程组的未启动线程计数会减1。
  11.     */
  12.     group.add(this);
  13.     boolean started = false;
  14.     try {
  15.         start0(); //关键!调用本地方法(native)
  16.         started = true;
  17.     } finally {
  18.         try {
  19.             if (!started) { //启动失败时回滚
  20.                 //如果 started 为 false,说明线程启动失败,
  21.                 //调用 group.threadStartFailed(this) 方法通知线程组该线程启动失败。
  22.                 group.threadStartFailed(this);
  23.             }
  24.         } catch (Throwable ignore) {
  25.             /* do nothing. If start0 threw a Throwable then
  26.                   it will be passed up the call stack */
  27.         }
  28.     }
  29. }
  30. //========== native
  31. private native void start0();
复制代码
那么执行的是run()方法,run方法里面是啥呢
  1. private Runnable target; // target是Runnable类型
  2. @Override
  3. public void run() {
  4.     if (target != null) {
  5.         target.run();
  6.     }
  7. }
复制代码
如果继续Thread类后,重写run()方法,那么run方法就会覆盖上面的方法。
如果是实现的Runnable接口,new Thread(new Runnable1())的时候,就会把target赋值,然后调用run()方法的时候,就执行的是target的run方法了。
2) 其他创建方式

.lambda


  • lambda表达式创建:这个仅仅是写法差别而已。由于Runnable是个函数式接口
  1. @FunctionalInterface
  2. public interface Runnable {
  3.     public abstract void run();
  4. }
复制代码
.callable


  • Callable创建的方式
  1. public class MyCall implements Callable<String> {
  2.     @Override
  3.     public String call() throws Exception {
  4.         Thread.sleep(2000);
  5.         return "Hello Callable";
  6.     }
  7. }
  8. public static void main(String[] args) throws ExecutionException, InterruptedException {
  9.     FutureTask<String> task = new FutureTask<>(new MyCall());
  10.     new Thread(task).start();
  11.     System.out.println(task.get());
  12. }
复制代码
new Thread(Runnable runnable)要求传的类型是Runnable,但是现在传的是FutureTask。以是先来看一看FutureTask和Runnable之间有什么联系.
从上面可以看到,FutureTask实现了RunnableFuture接口,然后RunnableFuture接口继续了Future和Runnable两个接口。
Future
Future 接口是 Java 并发编程中的一个重要接口,位于 java.util.concurrent 包下,它代表了一个异步计算的结果。异步计算意味着在调用方法后,程序不会立即等待结果返回,而是可以继续执行其他任务,当结果准备好时,再通过 Future 对象获取。
  1. // 这里使用了泛型 <V>,表示该 Future 对象所代表的异步计算结果的类型。
  2. public interface Future<V> {
  3.     //尝试取消异步任务的执行。
  4.     /*
  5.     如果任务已经完成、已经被取消或者由于其他原因无法取消,则返回 false;
  6.     如果任务成功取消,则返回 true。
  7.     */
  8.     boolean cancel(boolean mayInterruptIfRunning);
  9.     //如果任务在完成之前被取消,则返回 true;否则返回 false。
  10.     boolean isCancelled();
  11.     //如果任务已经完成,则返回 true;否则返回 false。
  12.     boolean isDone();
  13.     //获取异步任务的计算结果。如果任务还未完成,调用该方法的线程会被阻塞,直到任务完成。
  14.     V get() throws InterruptedException, ExecutionException;
  15.     //获取异步任务的计算结果,并且可以指定一个超时时间。
  16.     //如果在指定的时间内任务还未完成,调用该方法的线程会被阻塞,直到任务完成或者超时。
  17.     V get(long timeout, TimeUnit unit)
  18.         throws InterruptedException, ExecutionException, TimeoutException;
  19. }
复制代码
RunnableFuture
  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2.     // 很简单嘛,这个是来自Runnable的
  3.     void run();
  4. }
复制代码
这个接口就相当于组合了Runnable和Future,能够获取到返回值了。
FutureTask 既然要把它当做参数传进Thread的构造函数,那么想必它肯定是实现了run方法的。
  1. public class FutureTask<V> implements RunnableFuture<V> {
  2.     // 基本属性
  3.     private volatile int state;
  4.     private static final int NEW          = 0;
  5.     private static final int COMPLETING   = 1;
  6.     private static final int NORMAL       = 2;
  7.     private static final int EXCEPTIONAL  = 3;
  8.     private static final int CANCELLED    = 4;
  9.     private static final int INTERRUPTING = 5;
  10.     private static final int INTERRUPTED  = 6;
  11.     /** The underlying callable; nulled out after running */
  12.     private Callable<V> callable;
  13.     /** 结果 */
  14.     private Object outcome;
  15.     /** The thread running the callable; CAS ed during run() */
  16.     private volatile Thread runner;
  17.     /** Treiber stack of waiting threads */
  18.     private volatile WaitNode waiters;
  19.    
  20.     // 看它的构造函数1
  21.     public FutureTask(Callable<V> callable) {
  22.         if (callable == null)
  23.             throw new NullPointerException();
  24.         this.callable = callable; // 赋值callable========
  25.         this.state = NEW; // ensure visibility of callable
  26.     }
  27.     // 构造函数2 ==== 本质还是把Runnable加了一层,给封装成Callable了
  28.     public FutureTask(Runnable runnable, V result) {
  29.         this.callable = Executors.callable(runnable, result);
  30.         this.state = NEW;       // ensure visibility of callable
  31.     }
  32.     /*
  33.     Executors::callable(xx, xx)方法==========
  34.     public static <T> Callable<T> callable(Runnable task, T result) {
  35.         if (task == null)
  36.             throw new NullPointerException();
  37.         return new RunnableAdapter<T>(task, result);
  38.     }
  39.     static final class RunnableAdapter<T> implements Callable<T> {
  40.         final Runnable task;
  41.         final T result;
  42.         RunnableAdapter(Runnable task, T result) {
  43.             this.task = task;
  44.             this.result = result;
  45.         }
  46.         public T call() {
  47.             task.run(); // 调用Runnable的run()
  48.             return result;
  49.         }
  50.     }
  51.     */
  52.    
  53.     // run()方法 ---------------
  54.     // new Thread(new FutureTask<>(new MyCall()))
  55.     public void run() {
  56.         if (state != NEW ||
  57.             !UNSAFE.compareAndSwapObject(this, runnerOffset,
  58.                                          null, Thread.currentThread()))
  59.             return;
  60.         try {
  61.             Callable<V> c = callable;
  62.             if (c != null && state == NEW) {
  63.                 V result;
  64.                 boolean ran;
  65.                 try {
  66.                     //====调用callable.call()
  67.                     result = c.call();
  68.                     ran = true;
  69.                 } catch (Throwable ex) {
  70.                    .........
  71.                 }
  72.                 // 如果运行OK了,设置结果!
  73.                 if (ran) set(result);
  74.             }
  75.         } finally {
  76.             .............
  77.         }
  78.     }
  79.    
  80.     // 设置结果outcome
  81.     protected void set(V v) {
  82.         // https://www.cnblogs.com/jackjavacpp/p/18787832
  83.         // 使用CAS --- 【见上一篇文章 java map & CAS & AQS】
  84.         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  85.             outcome = v; // 这里
  86.             UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  87.             finishCompletion();
  88.         }
  89.     }
  90.    
  91.     // 比较核心的get方法================start
  92.     public V get() throws InterruptedException, ExecutionException {
  93.         int s = state;
  94.         if (s <= COMPLETING) // 如果状态不是完成
  95.             s = awaitDone(false, 0L); // 等待完成
  96.         return report(s); // 返回结果
  97.     }
  98.     private int awaitDone(boolean timed, long nanos)
  99.         throws InterruptedException {
  100.                   // 1.计算超时截止时间
  101.         final long deadline = timed ? System.nanoTime() + nanos : 0L;
  102.         WaitNode q = null;
  103.         boolean queued = false;
  104.         for (;;) { // 2.自旋循环等待任务完成
  105.             // 2.1如果该线程中断了
  106.             if (Thread.interrupted()) {
  107.                 removeWaiter(q);// 从等待队列中移除当前节点
  108.                 throw new InterruptedException();
  109.             }
  110.             // 2.2检查状态
  111.             int s = state;
  112.             // 任务已终态(NORMAL, EXCEPTIONAL, CANCELLED)
  113.             if (s > COMPLETING) {
  114.                 if (q != null)
  115.                     q.thread = null;
  116.                 return s;// 返回最终状态
  117.             }
  118.             // 2.3若任务状态等于 COMPLETING,表明任务正在完成,
  119.             // 此时调用 Thread.yield() 方法让当前线程让出 CPU 时间片,等待任务完成。
  120.             else if (s == COMPLETING) // cannot time out yet
  121.                 Thread.yield();
  122.             else if (q == null)
  123.                 q = new WaitNode();
  124.             else if (!queued) //将节点加入等待队列
  125.                 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
  126.                                                      q.next = waiters, q);
  127.             else if (timed) { // 2.4如果是有时限的get()
  128.                 nanos = deadline - System.nanoTime();
  129.                 if (nanos <= 0L) {
  130.                     removeWaiter(q);
  131.                     return state; // 返回状态
  132.                 }
  133.                 LockSupport.parkNanos(this, nanos);
  134.             }
  135.             else //若没有设置超时时间,就调用 LockSupport.park 方法让当前线程无限期阻塞,直到被唤醒。
  136.                 LockSupport.park(this);
  137.         }
  138.     }
  139.     private V report(int s) throws ExecutionException {
  140.         Object x = outcome;
  141.         if (s == NORMAL)
  142.             return (V)x; // 返回outcome
  143.        ......
  144.     }
  145.     //==================================end
  146. }
复制代码
从上面的例子可以看出,大抵有ExecutorService,Executors,  newFixedThreadPool()方法本质是 new ThreadPoolExecutor(),故还有一个ThreadPoolExecutor类。
接下来梳理一下这些类背后的关系。【通过idea得到下面的关系图】别的,Executors只是一个工具类。
Executor是顶级接口
  1. public class PoolMain {
  2.     public static void main(String[] args) {
  3.         // 创建一个线程池
  4.         ExecutorService pool = Executors.newFixedThreadPool(1);
  5.         long start = System.currentTimeMillis();
  6.         // execute=============
  7.         pool.execute(() -> {
  8.             try {
  9.                 Thread.sleep(1000);
  10.             } catch (InterruptedException e) {
  11.                 throw new RuntimeException(e);
  12.             }
  13.             System.out.println("execute pool创建启动线程!");
  14.         });
  15.         // submit==============
  16.         Future<Integer> future = pool.submit(() -> {
  17.             try {
  18.                 Thread.sleep(1000);
  19.             } catch (InterruptedException e) {
  20.                 throw new RuntimeException(e);
  21.             }
  22.             System.out.println("submit pool创建启动线程!");
  23.             return 100;
  24.         });
  25.         try {
  26.             System.out.println(future.get());
  27.         } catch (InterruptedException | ExecutionException e) {
  28.             throw new RuntimeException(e);
  29.         }
  30.         System.out.println("main线程执行时间:" + (System.currentTimeMillis() - start));
  31.         pool.shutdown();
  32.     }
  33. }
复制代码
ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状态返回Future的方法
[code]public interface ExecutorService extends Executor {    void shutdown();    List shutdownNow();     Future submit(Callable task);     Future submit(Runnable task, T result);    Future submit(Runnable task);    //....     List invokeAll(Collection

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦见你的名字

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表