Java并发探索--上篇
1.基本概念
- 线程与进程:线程是程序执行的最小单位,而进程是系统举行资源分配和调度的基本单位。例如,一个 Java 程序可以包罗多个线程,它们共享进程的资源。
- 并发与并行:并发是指多个任务在同一时间段内执行,而并行是指多个任务在同一时刻执行。在多核 CPU 系统中,可以实现真正的并行。
- 同步与异步:同步是指程序按照次序依次执行,而异步是指程序在执行某个任务时,不需要等待该任务完成,可以继续执行其他任务。
“Java并发探索--下篇” --- 在下面找
【博客园】
https://www.cnblogs.com/jackjavacpp
【CSDN】
https://blog.csdn.net/okok__TXF
2.探索线程的创建
①线程的状态
从Thread源码里面看出- public enum State {
- // 尚未启动的线程的线程状态。
- NEW,
- // 就绪
- RUNNABLE,
- // 等待监视器锁的线程的线程状态
- BLOCKED,
- /*
- 等待线程的线程状态,线程由于调用以下方法之一而处于等待状态:
- Object.wait() 没有超时
- Thread.join() 没有超时
- LockSupport.park()
- */
- WAITING,
- /*
- 指定等待时间的等待线程的线程状态
- 线程处于定时等待状态,因为调用了以下方法之一,并指定等待时间:
- Thread.sleep
- Object.wait with timeout
- Thread.join with timeout
- LockSupport.parkNanos
- LockSupport.parkUntil
- */
- TIMED_WAITING,
- //终止线程的线程状态。线程已完成执行。
- TERMINATED;
- }
复制代码 下面看一张图,很清晰的解释了各状态之间的关系:【节选自https://blog.csdn.net/agonie201218/article/details/128712507】
在Java中,一个Thread有大抵六个状态。
线程创建之后(new Thread)它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 RUNNABLE(停当) 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING(运行) 状态。
明白了线程的运行状态,接下来让我们来看一下在爪哇里面如何创建并且启动线程。
②线程创建
1)两种基本方式
- public class MyThread1 extends Thread {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName() + ": hello world");
- }
- }
- public class JUCMain {
- public static void main(String[] args) {
- new MyThread1().start();
- }
- }
复制代码- public class Runnable1 implements Runnable{
- @Override
- public void run() {
- System.out.println("hello world, Runnable");
- }
- }
- public class JUCMain {
- public static void main(String[] args) {
- new Thread(new Runnable1()).start();
- }
- }
复制代码 网上还传有其他创建线程的方式,好比: Callable接口,重写call,结合FutureTask;线程池;lambda表达式等等。。。诚然,这也确实是创建线程启动的方式不错。但是本文毕竟是探索性质的文章,我们要探索其本质。
首先从start()方法看起(这个方式属于Thread类的)。调用start()后,JVM会创建一个新线程并执行该线程的run()方法。注意:直接调用run()不会启动新线程,而是在当前线程中执行。- // 启动线程并触发 JVM 创建原生线程
- // synchronized后面解释【见 探索“锁”】
- public synchronized void start() {
- // 零状态值对应于状态 “NEW”
- // 线程想要start,必须是为0的状态
- if (threadStatus != 0)
- throw new IllegalThreadStateException();
- /*
- group 是线程所属的线程组。这行代码将当前线程实例添加到线程组中,
- 同时线程组的未启动线程计数会减1。
- */
- group.add(this);
- boolean started = false;
- try {
- start0(); //关键!调用本地方法(native)
- started = true;
- } finally {
- try {
- if (!started) { //启动失败时回滚
- //如果 started 为 false,说明线程启动失败,
- //调用 group.threadStartFailed(this) 方法通知线程组该线程启动失败。
- group.threadStartFailed(this);
- }
- } catch (Throwable ignore) {
- /* do nothing. If start0 threw a Throwable then
- it will be passed up the call stack */
- }
- }
- }
- //========== native
- private native void start0();
复制代码 那么执行的是run()方法,run方法里面是啥呢- private Runnable target; // target是Runnable类型
- @Override
- public void run() {
- if (target != null) {
- target.run();
- }
- }
复制代码 如果继续Thread类后,重写run()方法,那么run方法就会覆盖上面的方法。
如果是实现的Runnable接口,new Thread(new Runnable1())的时候,就会把target赋值,然后调用run()方法的时候,就执行的是target的run方法了。
2) 其他创建方式
.lambda
- lambda表达式创建:这个仅仅是写法差别而已。由于Runnable是个函数式接口
- @FunctionalInterface
- public interface Runnable {
- public abstract void run();
- }
复制代码 .callable
- public class MyCall implements Callable<String> {
- @Override
- public String call() throws Exception {
- Thread.sleep(2000);
- return "Hello Callable";
- }
- }
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- FutureTask<String> task = new FutureTask<>(new MyCall());
- new Thread(task).start();
- System.out.println(task.get());
- }
复制代码 new Thread(Runnable runnable)要求传的类型是Runnable,但是现在传的是FutureTask。以是先来看一看FutureTask和Runnable之间有什么联系.
从上面可以看到,FutureTask实现了RunnableFuture接口,然后RunnableFuture接口继续了Future和Runnable两个接口。
Future
Future 接口是 Java 并发编程中的一个重要接口,位于 java.util.concurrent 包下,它代表了一个异步计算的结果。异步计算意味着在调用方法后,程序不会立即等待结果返回,而是可以继续执行其他任务,当结果准备好时,再通过 Future 对象获取。- // 这里使用了泛型 <V>,表示该 Future 对象所代表的异步计算结果的类型。
- public interface Future<V> {
- //尝试取消异步任务的执行。
- /*
- 如果任务已经完成、已经被取消或者由于其他原因无法取消,则返回 false;
- 如果任务成功取消,则返回 true。
- */
- boolean cancel(boolean mayInterruptIfRunning);
- //如果任务在完成之前被取消,则返回 true;否则返回 false。
- boolean isCancelled();
- //如果任务已经完成,则返回 true;否则返回 false。
- boolean isDone();
- //获取异步任务的计算结果。如果任务还未完成,调用该方法的线程会被阻塞,直到任务完成。
- V get() throws InterruptedException, ExecutionException;
- //获取异步任务的计算结果,并且可以指定一个超时时间。
- //如果在指定的时间内任务还未完成,调用该方法的线程会被阻塞,直到任务完成或者超时。
- V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
复制代码 RunnableFuture- public interface RunnableFuture<V> extends Runnable, Future<V> {
- // 很简单嘛,这个是来自Runnable的
- void run();
- }
复制代码 这个接口就相当于组合了Runnable和Future,能够获取到返回值了。
FutureTask 既然要把它当做参数传进Thread的构造函数,那么想必它肯定是实现了run方法的。- public class FutureTask<V> implements RunnableFuture<V> {
- // 基本属性
- private volatile int state;
- private static final int NEW = 0;
- private static final int COMPLETING = 1;
- private static final int NORMAL = 2;
- private static final int EXCEPTIONAL = 3;
- private static final int CANCELLED = 4;
- private static final int INTERRUPTING = 5;
- private static final int INTERRUPTED = 6;
- /** The underlying callable; nulled out after running */
- private Callable<V> callable;
- /** 结果 */
- private Object outcome;
- /** The thread running the callable; CAS ed during run() */
- private volatile Thread runner;
- /** Treiber stack of waiting threads */
- private volatile WaitNode waiters;
-
- // 看它的构造函数1
- public FutureTask(Callable<V> callable) {
- if (callable == null)
- throw new NullPointerException();
- this.callable = callable; // 赋值callable========
- this.state = NEW; // ensure visibility of callable
- }
- // 构造函数2 ==== 本质还是把Runnable加了一层,给封装成Callable了
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result);
- this.state = NEW; // ensure visibility of callable
- }
- /*
- Executors::callable(xx, xx)方法==========
- public static <T> Callable<T> callable(Runnable task, T result) {
- if (task == null)
- throw new NullPointerException();
- return new RunnableAdapter<T>(task, result);
- }
- static final class RunnableAdapter<T> implements Callable<T> {
- final Runnable task;
- final T result;
- RunnableAdapter(Runnable task, T result) {
- this.task = task;
- this.result = result;
- }
- public T call() {
- task.run(); // 调用Runnable的run()
- return result;
- }
- }
- */
-
- // run()方法 ---------------
- // new Thread(new FutureTask<>(new MyCall()))
- public void run() {
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return;
- try {
- Callable<V> c = callable;
- if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- //====调用callable.call()
- result = c.call();
- ran = true;
- } catch (Throwable ex) {
- .........
- }
- // 如果运行OK了,设置结果!
- if (ran) set(result);
- }
- } finally {
- .............
- }
- }
-
- // 设置结果outcome
- protected void set(V v) {
- // https://www.cnblogs.com/jackjavacpp/p/18787832
- // 使用CAS --- 【见上一篇文章 java map & CAS & AQS】
- if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
- outcome = v; // 这里
- UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
- finishCompletion();
- }
- }
-
- // 比较核心的get方法================start
- public V get() throws InterruptedException, ExecutionException {
- int s = state;
- if (s <= COMPLETING) // 如果状态不是完成
- s = awaitDone(false, 0L); // 等待完成
- return report(s); // 返回结果
- }
- private int awaitDone(boolean timed, long nanos)
- throws InterruptedException {
- // 1.计算超时截止时间
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- WaitNode q = null;
- boolean queued = false;
- for (;;) { // 2.自旋循环等待任务完成
- // 2.1如果该线程中断了
- if (Thread.interrupted()) {
- removeWaiter(q);// 从等待队列中移除当前节点
- throw new InterruptedException();
- }
- // 2.2检查状态
- int s = state;
- // 任务已终态(NORMAL, EXCEPTIONAL, CANCELLED)
- if (s > COMPLETING) {
- if (q != null)
- q.thread = null;
- return s;// 返回最终状态
- }
- // 2.3若任务状态等于 COMPLETING,表明任务正在完成,
- // 此时调用 Thread.yield() 方法让当前线程让出 CPU 时间片,等待任务完成。
- else if (s == COMPLETING) // cannot time out yet
- Thread.yield();
- else if (q == null)
- q = new WaitNode();
- else if (!queued) //将节点加入等待队列
- queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
- q.next = waiters, q);
- else if (timed) { // 2.4如果是有时限的get()
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- removeWaiter(q);
- return state; // 返回状态
- }
- LockSupport.parkNanos(this, nanos);
- }
- else //若没有设置超时时间,就调用 LockSupport.park 方法让当前线程无限期阻塞,直到被唤醒。
- LockSupport.park(this);
- }
- }
- private V report(int s) throws ExecutionException {
- Object x = outcome;
- if (s == NORMAL)
- return (V)x; // 返回outcome
- ......
- }
- //==================================end
- }
复制代码 从上面的例子可以看出,大抵有ExecutorService,Executors, newFixedThreadPool()方法本质是 new ThreadPoolExecutor(),故还有一个ThreadPoolExecutor类。
接下来梳理一下这些类背后的关系。【通过idea得到下面的关系图】别的,Executors只是一个工具类。
 Executor是顶级接口- public class PoolMain {
- public static void main(String[] args) {
- // 创建一个线程池
- ExecutorService pool = Executors.newFixedThreadPool(1);
- long start = System.currentTimeMillis();
- // execute=============
- pool.execute(() -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- System.out.println("execute pool创建启动线程!");
- });
- // submit==============
- Future<Integer> future = pool.submit(() -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- System.out.println("submit pool创建启动线程!");
- return 100;
- });
- try {
- System.out.println(future.get());
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- System.out.println("main线程执行时间:" + (System.currentTimeMillis() - start));
- pool.shutdown();
- }
- }
复制代码 ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状态返回Future的方法
[code]public interface ExecutorService extends Executor { void shutdown(); List shutdownNow(); Future submit(Callable task); Future submit(Runnable task, T result); Future submit(Runnable task); //.... List invokeAll(Collection |