目录

Java和Springboot中的多线程

java多线程实现

java多线程本质上是实现Runnable接口

/posts/2021/11/java%E5%92%8Cspringboot%E4%B8%AD%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8B/t0.png

我们都知道启动一个线程,必须调用一个Thread的start()方法。在面试时经常可能会被问到start()和run()方法的区别,为什么一定要用start()方法才是启动线程?对比start()方法和run()的源码一看便知:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
 /**
     * Causes this thread to begin execution; the Java Virtual Machine
     * calls the <code>run</code> method of this thread.
     * 
     * 1、start方法将导致this thread开始执行。由JVM调用this thread的run方法。
     * 
     * The result is that two threads are running concurrently: the
     * current thread (which returns from the call to the
     * <code>start</code> method) and the other thread (which executes its
     * <code>run</code> method).
     * 
     * 2、结果是 调用start方法的当前线程 和 执行run方法的另一个线程 并发运行。
     * 
     * It is never legal to start a thread more than once.
     * In particular, a thread may not be restarted once it has completed
     * execution.
     *
     * 3、多次启动线程永远不合法。 特别是,线程一旦完成执行就不会重新启动。
     * 
     * @exception  IllegalThreadStateException  if the thread was already started.
     * 如果线程已启动,则抛出异常。
     * @see        #run()
     * @see        #stop()
     */
    public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         * 
         * 4、对于由VM创建/设置的main方法线程或“system”组线程,不会调用此方法。 
         *    未来添加到此方法的任何新功能可能也必须添加到VM中。
         * 
         * A zero status value corresponds to state "NEW".
         * 5、status=0 代表是 status 是 "NEW"。
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. 
         * 
         * 6、通知组该线程即将启动,以便将其添加到线程组的列表中,
         *    并且减少线程组的未启动线程数递减。
         * 
         * */
        group.add(this);

        boolean started = false;
        try {
            //7、调用native方法,底层开启异步线程,并调用run方法。
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then it will be passed up the call stack 
                 * 8、忽略异常。 如果start0抛出一个Throwable,它将被传递给调用堆栈。
                 */
            }
        }
    }

 //native方法,JVM创建并启动线程,并调用run方法
 private native void start0();

对于源码中的注释并没有省略,都进行了翻译,可以更好的理解整个启动过程。其中有几个需要注意的点:

  1. start方法用synchronized修饰,为同步方法;
  2. 虽然为同步方法,但不能避免多次调用问题,用threadStatus来记录线程状态,如果线程被多次start会抛出异常;threadStatus的状态由JVM控制。
  3. 使用Runnable时,主线程无法捕获子线程中的异常状态。线程的异常,应在线程内部解决。

0、线程中断机制(interrupt)

优雅的中断线程,是一门艺术

众所周知,Thread.stop, Thread.suspend, Thread.resume 都已经被废弃了。因为它们太暴力了,是不安全的,这种暴力中断线程是一种不安全的操作,举个栗子来说明其可能造成的问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class ThreadTest {
    public static void main(String[] args) {

        StopThread stopThread = new StopThread();
        // 启动线程
        stopThread.start();
        try {
            // 休眠1秒,确保线程进入运行
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 暂停线程
        stopThread.stop();
//        stopThread.interrupt();
        // 确保线程已经销毁
        while (stopThread.isAlive()) { }
        // 输出结果
        stopThread.print();

    }

    private static class StopThread extends Thread {

        private int x = 0;
        private int y = 0;

        @Override
        public void run(){
            // 这是一个同步原子操作
            synchronized (this){
                ++x;
                try {
                    // 休眠3秒,模拟耗时操作
                    Thread.sleep(3000);

                }catch (Exception e){
                    e.printStackTrace();
                }
                ++y;
            }
        }
        public void print() {
            System.out.println("x=" + x + " y=" + y);
        }



    }
}

上述代码中,run方法里是一个同步的原子操作,x和y必须要共同增加,然而这里如果调用thread.stop()方法强制中断线程,输出如下:

1
x=1 y=0

没有异常,也破坏了我们的预期。如果这种问题出现在我们的程序中,会引发难以预期的异常。因此这种不安全的方式很早就被废弃了。取而代之的是interrupt(),上述代码如果采用thread.interrupt()方法,输出结果如下:

1
2
3
4
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at ThreadTest$StopThread.run(ThreadTest.java:35)
x=1 y=1

x=1,y=1 这个结果是符合我们的预期,同时还抛出了个异常,这个异常下文详说。

interrupt() 它基于「一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止。」思想,是一个比较温柔的做法,它更类似一个标志位。其实作用不是中断线程,而是「通知线程应该中断了」,具体到底中断还是继续运行,应该由被通知的线程自己处理。

interrupt() 并不能真正的中断线程,这点要谨记。需要被调用的线程自己进行配合才行。也就是说,一个线程如果有被中断的需求,那么就需要这样做:

  1. 在正常运行任务时,经常检查本线程的中断标志位,如果被设置了中断标志就自行停止线程。
  2. 在调用阻塞方法时正确处理InterruptedException异常。(例如:catch异常后就结束线程。)

先看下 Thread 类 interrupt 相关的几个方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
   // 核心 interrupt 方法
   public void interrupt() {
        if (this != Thread.currentThread()) // 非本线程,需要检查权限
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // 仅仅设置interrupt标志位
                b.interrupt(this);    // 调用如 I/O 操作定义的中断方法
                return;
            }
        }
        interrupt0();
    }
    // 静态方法,这个方法有点坑,调用该方法调用后会清除中断状态。
    public static boolean interrupted() {
        return currentThread().isInterrupted(true);
    }
    // 这个方法不会清除中断状态
    public boolean isInterrupted() {
        return isInterrupted(false);
    }
   // 上面两个方法会调用这个本地方法,参数代表是否清除中断状态
   private native boolean isInterrupted(boolean ClearInterrupted);

首先讲 interrupt() 方法:

  1. interrupt 中断操作时,非自身打断需要先检测是否有中断权限,这由jvm的安全机制配置;
  2. 如果线程处于sleep, wait, join 等状态,那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常;
  3. 如果线程处于I/O阻塞状态,将会抛出ClosedByInterruptException(IOException的子类)异常;
  4. 如果线程在Selector上被阻塞,select方法将立即返回;
  5. 如果非以上情况,将直接标记 interrupt 状态;

注意:interrupt 操作不会打断所有阻塞,只有上述阻塞情况才在jvm的打断范围内,如处于锁阻塞的线程,不会受 interrupt 中断;

阻塞情况下中断,抛出异常后线程恢复非中断状态,即 interrupted = false

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadTest {
    
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new Task("1"));
        t.start();
        t.interrupt();
    }

    static class Task implements Runnable{
        String name;
        
        public Task(String name) {
            this.name = name;
        }
        
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("thread has been interrupt!");
            }
            System.out.println("isInterrupted: " + Thread.currentThread().isInterrupted());
            System.out.println("task " + name + " is over");
        }
    }
}

输出:

1
2
3
thread has been interrupt!
isInterrupted: false
task 1 is over

调用Thread.interrupted() 方法后线程恢复非中断状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadTest {
    
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new Task("1"));
        t.start();
        t.interrupt();
    }

    static class Task implements Runnable{
        String name;
        
        public Task(String name) {
            this.name = name;
        }
        
        @Override
        public void run() {
            System.out.println("first :" + Thread.interrupted());
            System.out.println("second:" + Thread.interrupted());
            System.out.println("task " + name + " is over");
        }
    }
}

输出结果:

1
2
3
first :true
second:false
task 1 is over

上述两种隐含的状态恢复操作,是符合常理的,因为线程标记为中断后,用户没有真正中断线程,必然将其恢复为false。理论上Thread.interrupted()调用后,如果已中断,应该执行退出操作,不会重复调用。

1、实现Runnable接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class DemoThreadTask implements Runnable{
    @Override
    public void run() {
        // TODO Auto-generated method stub
    }
    
    public static void main(String[] args) {
        DemoThreadTask task = new DemoThreadTask();
        Thread t = new Thread(task);
        t.start();
        ...
    }
}

实现Runnable接口,利用Runnable实例构造Thread,是较常用且最本质实现。此构造方法相当于对Runnable实例进行一层包装,在线程t启动时,调用Thread的run方法从而间接调用target.run():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public class Thread implements Runnable {
    /* What will be run. */
    private Runnable target;

    public void run() {
        if (target != null) {
            target.run();
        }
   }
     ...
}

2、继承Thread类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class DemoThread extends Thread{
    @Override 
    //重写run方法
    public void run() {
        // TODO Auto-generated method stub
    }

    public static void main(String[] args) {
        DemoThread t = new DemoThread();
        t.start();
        ...
    }
}

这种实现方式是显示的继承了Thread,但从类图中我们可以看到,Thread类本身就继承自Runnable,所以继承Thread的本质依然是实现Runnable接口定义的run方法。

需要注意的是继承Thread方式,target对象为null,重写了run方法,导致方式1中的Thread原生的run方法失效,因此并不会调用到target.run()的逻辑,而是直接调用子类重写的run方法。

因为java是单根继承,此方式一般不常用。

3、实现Callable接口并通过FutureTask包装

先看demo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public class DemoCallable implements Callable<String>{
    @Override
    public String call() throws Exception {
        // TODO Auto-generated method stub
        return null;
    }
    
    public static void main(String[] args) throws Exception {
        DemoCallable c = new DemoCallable();
        FutureTask<String> future = new FutureTask<>(c); 
        Thread t = new Thread(future);
        t.start();
        ...
        String result = future.get(); //同步获取返回结果
        System.out.println(result);
    }
}

实现Callable接口通过FutureTask包装,可以获取到线程的处理结果,future.get()方法获取返回值,如果线程还没执行完,则会阻塞。

这个方法里,明明没有看到run方法,没有看到Runnable,为什么说本质也是实现Runnable接口呢?

回看开篇的类图,FutureTask实现了RunnableFuture,RunnableFuture则实现了Runnable和Future两个接口。因此构造Thread时,FutureTask还是被转型为Runnable使用。因此其本质还是实现Runnable接口。

至于FutureTask的工作原理,后续篇章继续分析。

4、匿名内部类

匿名内部类也有多种变体,上述三种方式都可以使用匿名内部类来隐式实例化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Demo{
    
    public static void main(String[] args) throws Exception {
        //方式一:Thread匿名内部类
        new Thread(){
            @Override
            public void run() {
                // TODO Auto-generated method stub
            }
        }.start();
        
        //方式二:Runnable匿名内部类
        new Thread(new Runnable() {
            @Override
            public void run() {
                // TODO Auto-generated method stub
            }
        }).start();
        
        ...
    }
}

匿名内部类的优点在于使用方便,不用额外定义类,缺点就是代码可读性差。

5、Lambda表达式

Lambda表达式是jdk8引入的,已不是什么新东西,现在都jdk10了。demo如下:

1
2
3
4
5
6
public class Demo{
    public static void main(String[] args) throws Exception {
        new Thread(() -> System.out.println("running") ).start() ;
        ...
    }
}

如此简洁的Lambda表达式,有没有吸引到你呢?当然本质不多说,还是基于Runnable接口。

6、线程池

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class DemoThreadTask implements Runnable{
    @Override
    public void run() {
        // TODO Auto-generated method stub
        System.out.println("running");
    }
    
    public static void main(String[] args) {
        DemoThreadTask task = new DemoThreadTask();
        ExecutorService ex = Executors.newCachedThreadPool();
        ex.execute(task);
        ...
    }
}

线程池与前面所述其他方式的区别在于执行线程的时候由ExecutorService去执行,最终还是利用Thread创建线程。线程池的优势在于线程的复用,从而提高效率。

关于线程池,后续篇章会继续详解。

7、定时器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class DemoTimmerTask {

    public static void main(String[] args) throws Exception {
        Timer timer = new Timer();
        timer.scheduleAtFixedRate((new TimerTask() {
            @Override
            public void run() {
                System.out.println("定时任务1执行了....");
            }
        }), 2000, 1000);
    }
}

TimerTask的实现了Runnable接口,Timer内部有个TimerThread继承自Thread,因此绕回来还是Thread + Runnable。

ThreadPoolExecutor概述

工作中经常涉及异步任务,通常是使用多线程技术,比如线程池ThreadPoolExecutor,它的执行规则如下:

/posts/2021/11/java%E5%92%8Cspringboot%E4%B8%AD%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8B/thread.png

Java源码里面都有大量的注释,认真读懂这些注释,就可以把握其七分工作机制了。关于ThreadPoolExecutor的解析,我们就从其类注释开始。

现将注释大致翻译如下:

ExecutorService(ThreadPoolExecutor的顶层接口)使用线程池中的线程执行每个提交的任务,通常我们使用Executors的工厂方法来创建ExecutorService。

线程池解决了两个不同的问题:

  1. 提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;
  2. 统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。

为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 但是,在常见场景中,我们预配置了几种线程池,我们敦促程序员使用更方便的Executors的工厂方法直接使用。

  • Executors.newCachedThreadPool(无界线程池,自动线程回收)
  • Executors.newFixedThreadPool(固定大小的线程池);
  • Executors.newSingleThreadExecutor(单一后台线程);
1
注:这里没有提到ScheduledExecutorService ,后续解析。

在自定义线程池时,请参考以下指南:

一、Core and maximum pool sizes 核心和最大线程池数量

参数翻译
corePoolSize核心线程池数量
maximumPoolSize最大线程池数量

线程池执行器将会根据corePoolSize和maximumPoolSize自动地调整线程池大小。

当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。 通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,您可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构建时设置,但也可以使用setCorePoolSizesetMaximumPoolSize进行动态更改。

这段话详细了描述了线程池对任务的处理流程,这里用个图总结一下

/posts/2021/11/java%E5%92%8Cspringboot%E4%B8%AD%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8B/t2.png

二、prestartCoreThread 核心线程预启动

在默认情况下,只有当新任务到达时,才开始创建和启动核心线程,但是我们可以使用 prestartCoreThread()prestartAllCoreThreads() 方法动态调整。 如果使用非空队列构建池,则可能需要预先启动线程。

方法作用
prestartCoreThread()创一个空闲任务线程等待任务的到达
prestartAllCoreThreads()创建核心线程池数量的空闲任务线程等待任务的到达

三、ThreadFactory 线程工厂

新线程使用ThreadFactory创建。 如果未另行指定,则使用Executors.defaultThreadFactory默认工厂,使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态。

通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从newThread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。

线程应该有modifyThread权限。 如果工作线程或使用该池的其他线程不具备此权限,则服务可能会降级:配置更改可能无法及时生效,并且关闭池可能会保持可终止但尚未完成的状态。

四、Keep-alive times 线程存活时间

如果线程池当前拥有超过corePoolSize的线程,那么多余的线程在空闲时间超过keepAliveTime时会被终止 ( 请参阅getKeepAliveTime(TimeUnit) )。这提供了一种在不积极使用线程池时减少资源消耗的方法。

如果池在以后变得更加活跃,则应构建新线程。 也可以使用方法setKeepAliveTime(long,TimeUnit)进行动态调整。

防止空闲线程在关闭之前终止,可以使用如下方法:

setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);

默认情况下,keep-alive策略仅适用于存在超过corePoolSize线程的情况。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)也可用于将此超时策略应用于核心线程

五、Queuing 队列

BlockingQueue用于存放提交的任务,队列的实际容量与线程池大小相关联。

  • 如果当前线程池任务线程数量小于核心线程池数量,执行器总是优先创建一个任务线程,而不是从线程队列中取一个空闲线程。
  • 如果当前线程池任务线程数量大于核心线程池数量,执行器总是优先从线程队列中取一个空闲线程,而不是创建一个任务线程。
  • 如果当前线程池任务线程数量大于核心线程池数量,且队列中无空闲任务线程,将会创建一个任务线程,直到超出maximumPoolSize,如果超时maximumPoolSize,则任务将会被拒绝。
1
这个过程参考[线程任务处理流程图]

主要有三种队列策略:

  1. Direct handoffs 直接握手队列 Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。 此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致线程数量会无限增长问题。
  2. Unbounded queues 无界队列 当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。
  3. Bounded queues 有界队列 一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡
  • 使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。
  • 使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。 这里主要为了说明有界队列大小和maximumPoolSizes的大小控制,若何降低资源消耗的同时,提高吞吐量

六、Rejected tasks 拒绝任务

拒绝任务有两种情况:1. 线程池已经被关闭;2. 任务队列已满且maximumPoolSizes已满; 无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:

  1. AbortPolicy:默认测策略,抛出RejectedExecutionException运行时异常;
  2. CallerRunsPolicy:这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;
  3. DiscardPolicy:直接丢弃新提交的任务;
  4. DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程); 我们可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景中。

七、Hook methods 钩子方法 ThreadPoolExecutor为提供了每个任务执行前后提供了钩子方法,重写beforeExecute(Thread,Runnable)afterExecute(Runnable,Throwable)方法来操纵执行环境; 例如,重新初始化ThreadLocals,收集统计信息或记录日志等。此外,terminated()在Executor完全终止后需要完成后会被调用,可以重写此方法,以执行任殊处理。 注意:如果hook或回调方法抛出异常,内部的任务线程将会失败并结束。

八、Queue maintenance 维护队列 getQueue()方法可以访问任务队列,一般用于监控和调试。绝不建议将这个方法用于其他目的。当在大量的队列任务被取消时,remove()purge()方法可用于回收空间。

九、Finalization 关闭

如果程序中不在持有线程池的引用,并且线程池中没有线程时,线程池将会自动关闭。如果您希望确保即使用户忘记调用 shutdown()方法也可以回收未引用的线程池,使未使用线程最终死亡。那么必须通过设置适当的 keep-alive times 并设置allowCoreThreadTimeOut(boolean) 或者 使 corePoolSize下限为0 。 一般情况下,线程池启动后建议手动调用shutdown()关闭。

ThreadPoolExecutor使用

/posts/2021/11/java%E5%92%8Cspringboot%E4%B8%AD%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8B/t3.png

我们以最后一个构造方法(参数最多的那个),对其参数进行解释:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
 public ThreadPoolExecutor(int corePoolSize, // 1
                              int maximumPoolSize,  // 2
                              long keepAliveTime,  // 3
                              TimeUnit unit,  // 4
                              BlockingQueue<Runnable> workQueue, // 5
                              ThreadFactory threadFactory,  // 6
                              RejectedExecutionHandler handler ) { //7
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
序号名称类型含义
1corePoolSizeint核心线程池大小
2maximumPoolSizeint最大线程池大小
3keepAliveTimelong线程最大空闲时间
4unitTimeUnit时间单位
5workQueueBlockingQueue线程等待队列
6threadFactoryThreadFactory线程创建工厂
7handlerRejectedExecutionHandler拒绝策略

如果对这些参数作用有疑惑的请看 ThreadPoolExecutor概述。 知道了各个参数的作用后,我们开始构造符合我们期待的线程池。首先看JDK给我们预定义的几种线程池:

一、预定义线程池

  1. FixedThreadPool
1
2
3
4
5
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
  • keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
  • workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
  • FixedThreadPool的任务执行是无序的;

适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

  1. CachedThreadPool
1
2
3
4
5
     public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
  • keepAliveTime = 60s,线程空闲60s后自动结束。
  • workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;

适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。

  1. SingleThreadExecutor
1
2
3
4
5
6
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

咋一瞅,不就是newFixedThreadPool(1)吗?定眼一看,这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢,写个dome来解释一下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    public static void main(String[] args) {
        ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
        System.out.println(threadPoolExecutor.getMaximumPoolSize());
        threadPoolExecutor.setCorePoolSize(8);
        
        ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
//      运行时异常 java.lang.ClassCastException
//      ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
    }

对比可以看出,FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。

  1. ScheduledThreadPool
1
2
3
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。

1
2
3
4
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

对于ScheduledThreadPool本文不做描述,其特性请关注后续篇章。详细参考附录。

二、自定义线程池

以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolTest {

    public static void main(String[] args) {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
//        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
        ThreadFactory threadFactory = new NameThreadFactory();
        RejectedExecutionHandler rejectedExecutionHandler = new MyIgnorePolicy();
        ThreadPoolExecutor tpexecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, rejectedExecutionHandler);
        // 预启动所有核心线程
        tpexecutor.prestartAllCoreThreads();

        for (int i = 1; i <= 10; i++) {
            MyTask myTask = new MyTask(String.valueOf(i));
            tpexecutor.execute(myTask);
        }

        try {
            //阻塞主线程
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static class NameThreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    public static class MyIgnorePolicy implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            doLog(r, executor);
        }

        private void doLog(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(r.toString() + " rejected");
            System.out.println("getCompletedTaskCount: " + executor.getCompletedTaskCount());

        }
    }

    public static class MyTask implements Runnable {
        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(this.toString() + "is running!");
                //让任务执行慢点
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public String getName() {
            return name;
        }

        public String toString() {
            return "MyTask [name=" + name + "]";
        }

    }
}

输出结果如下:

my-thread-1 has been created my-thread-2 has been created my-thread-3 has been created MyTask [name=2]is running! MyTask [name=1]is running! my-thread-4 has been created MyTask [name=3]is running! MyTask [name=6]is running! getCompletedTaskCount: 0 getCompletedTaskCount: 0 getCompletedTaskCount: 0 getCompletedTaskCount: 0 MyTask [name=7] rejected MyTask [name=8] rejected MyTask [name=9] rejected MyTask [name=10] rejected MyTask [name=4]is running! MyTask [name=5]is running!

其中线程线程1-4先占满了核心线程和最大线程数量[4],然后4、5线程进入等待队列[队列大小为4],7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。

说明:自定义线程池参数可以根据具体业务场景配置。

ThreadPoolExecutor状态控制

读懂ThreadPoolExecutor执行原理,需要先掌握其状态控制的方式,因为使用了大量位运算,读起来有点吃力,所以单独用一篇文章分析。以下是ThreadPoolExecutor状态控制的主要变量和方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    //原子状态控制数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //29比特位
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //实际容量 2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // runState存储在高位中
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl 打包和解压ctl

    // 解压runState
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 解压workerCount
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 打包ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池使用一个AtomicInteger的ctl变量将 workerCount(工作线程数量)和 runState(运行状态)两个字段压缩在一起 ,这种做法在在java源码里经常有出现,如在 ReentrantReadWriteLock 里就将一个int分成高16位和底16位,分别表示读锁状态和写锁状态。ThreadPoolExecutor里也是使用了同样的思想,表现得更加复杂。

ThreadPoolExecutor用3个比特位表示runState, 29个比特位表示workerCount。因此这里需要特别说明的是:

确切的说,当最大线程数量配置为Integer.MXA_VAULE时,ThreadPoolExecutor的线程最大数量依然是2^29-1

目前来看这是完全够用的,但随着计算机的不断发展,真的到了不够用的时候可以改变为AtomicLong。这如同32位系统时间戳会在2038年01月19日03时14分07秒耗尽一样,当以后我们的系统线程能够超过2^29-1时,这些代码就需要调整了。对于未来,无限可能。

思考一下为什么是29:3呢? 这是因为我们的运营状态有5种,向上取2次方数,2^3 = 8。所以必须要3个比特位来表示各种状态。

运行状态解释:

状态解释
RUNNING运行态,可处理新任务并执行队列中的任务
SHUTDOW关闭态,不接受新任务,但处理队列中的任务
STOP停止态,不接受新任务,不处理队列中任务,且打断运行中任务
TIDYING整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法
TERMINATED结束态,terminated() 方法已完成

整个ctl的状态,会在线程池的不同运行阶段进行CAS转换。

ThreadPoolExecutor执行原理

execute()

分析ThreadPoolExecutor的执行原理,直接从execute方法开始

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 1、工作线程 < 核心线程 
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2、运行态,并尝试将任务加入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } // 3、使用尝试使用最大线程运行
        else if (!addWorker(command, false))
            reject(command);
    }

这三处if判断,还是比较泛的,整体大框框上的流程,可用下图表示。

/posts/2021/11/java%E5%92%8Cspringboot%E4%B8%AD%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8B/t2.png

在execute方法中,用到了double-check的思想,我们看到上述代码中并没有同步控制,都是基于乐观的check,如果任务可以创建则进入addWorker(Runnable firstTask, boolean core)方法,注意上述代码中的三种传参方式:

  • addWorker(command, true): 创建核心线程执行任务;
  • addWorker(command, false):创建非核心线程执行任务;
  • addWorker(null, false): 创建非核心线程,当前任务为空;

addWorker的返回值是boolean,不保证操作成功。下面详看addWorker方法(代码稍微有点长):

addWorker()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private boolean addWorker(Runnable firstTask, boolean core) {
    // 第一部分:自旋、CAS、重读ctl 等结合,直到确定是否可以创建worker,
    // 可以则跳出循环继续操作,否则返回false
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c)) // CAS增长workerCount,成功则跳出循环
                break retry;
            c = ctl.get();  // Re-read ctl 重新获取ctl
            if (runStateOf(c) != rs) // 状态改变则继续外层循环,否则在内层循环
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 第二部分:创建worker,这部分使用ReentrantLock锁
    boolean workerStarted = false; // 线程启动标志位
    boolean workerAdded = false;  // 线程是否加入workers 标志位
    Worker w = null;
    try {
        w = new Worker(firstTask); //创建worker
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 获取到锁以后仍需检查ctl,可能在上一个获取到锁处理的线程可能会改变runState
                // 如 ThreadFactory 创建失败 或线程池被 shut down等
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // 启动线程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w); // 失败操作
    }
    return workerStarted;
}

addWorker的工作可分为两个部分:

  • 第一部分:原子操作,判断是否可以创建worker。通过自旋、CAS、ctl 等操作,判断继续创建还是返回false,自旋周期一般很短。
  • 第二部分:同步创建workder,并启动线程。

Class Worker

第一部分思路理清楚,就可以理解了。下面详解第二部分的Worker:

Worker是ThreadPoolExecutor的内部类,实现了 AbstractQueuedSynchronizer 并继承了 Runnable。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    /** 每个worker有自己的内部线程,ThreadFactory创建失败时是null */
    final Thread thread;
    /** 初始化任务,可能是null */
    Runnable firstTask;
    /** 每个worker的完成任务数 */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // 禁止线程在启动前被打断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** 重要的执行方法  */
    public void run() {
        runWorker(this);
    }

    // state = 0 代表未锁;state = 1 代表已锁

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    // interrupt已启动线程
    void interruptIfStarted() {
        Thread t;
        // 初始化是 state = -1,不会被interrupt
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker 实现了简单的 非重入互斥锁,互斥容易理解,非重入是为了避免线程池的一些控制方法获得重入锁,比如setCorePoolSize操作。注意 Worker 实现锁的目的与传统锁的意义不太一样。其主要是为了控制线程是否可interrupt,以及其他的监控,如线程是否 active(正在执行任务)。

线程池里线程是否处于运行状态与普通线程不一样,普通线程可以调用 Thread.currentThread().isAlive() 方法来判断,而线程池,在run方法中可能在等待获取新任务,这期间线程线程是 alive 但是却不是 active。

runWorker代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允许被 interrupt
        boolean completedAbruptly = true;
        try {
            // loop 直至 task = null (线程池关闭、超时等)
            // 注意这里的getTask()方法,我们配置的阻塞队列会在这里起作用
            while (task != null || (task = getTask()) != null) {
                w.lock();  // 执行任务前上锁
                // 如果线程池停止,确保线程中断; 如果没有,确保线程不中断。这需要在第二种情况下进行重新获取ctl,以便在清除中断时处理shutdownNow竞争
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); // 扩展点
                    Throwable thrown = null;
                    try {
                        task.run(); // 真正执行run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); // 扩展点
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly); // 线程退出工作
        }
    }

runWorker的主要任务就是一直loop循环,来一个任务处理一个任务,没有任务就去getTask(),getTask()可能会阻塞,代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
    boolean timedOut = false; // 上一次 poll() 是否超时

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 是否继续处理任务 可以参见上一篇的状态控制
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 是否允许超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask()方法里面主要用我们配置的workQueue来工作,其阻塞原理与超时原理基于阻塞队列实现,这里不做详解。

总结,ThreadPoolExecutor的执行主要围绕Worker,Worker 实现了 AbstractQueuedSynchronizer 并继承了 Runnable,其对锁的妙运用,值得思考。

ScheduledThreadPoolExecutor

参考如下:

线程池之ScheduledThreadPoolExecutor概述 线程池之ScheduledThreadPoolExecutor调度原理


Springboot 多线程

在Springboot中对其进行了简化处理,只需要配置一个类型为java.util.concurrent.TaskExecutor或其子类的bean,并在配置类或直接在程序入口类上声明注解@EnableAsync

调用也简单,在由Spring管理的对象的方法上标注注解@Async,显式调用即可生效。

一般使用Spring提供的ThreadPoolTaskExecutor类。

配置实例快速使用

SpringBoot应用中需要添加@EnableAsync注解,来开启异步调用,一般还会配置一个线程池,异步的方法交给特定的线程池完成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
@EnableAsync
public class AsyncConfiguration {

    @Bean("doSomethingExecutor")
    public Executor doSomethingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数:线程池创建时候初始化的线程数
        executor.setCorePoolSize(10);
        // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(20);
        // 缓冲队列:用来缓冲执行任务的队列
        executor.setQueueCapacity(500);
        // 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(60);
        // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix("do-something-");
        // 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.initialize();
        return executor;
    }

}

使用的方式非常简单,在需要异步的方法上加@Async注解

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@RestController
public class AsyncController {

    @Autowired
    private AsyncService asyncService;

    @GetMapping("/open/something")
    public String something() {
        int count = 10;
        for (int i = 0; i < count; i++) {
            asyncService.doSomething("index = " + i);
        }
        lon
        return "success";
    }
}


@Slf4j
@Service
public class AsyncService {

    // 指定使用beanname为doSomethingExecutor的线程池
    @Async("doSomethingExecutor")
    public String doSomething(String message) {
        log.info("do something, message={}", message);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("do something error: ", e);
        }
        return message;
    }
}

访问:127.0.0.1:8080/open/something,日志如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
2020-04-19 23:42:42.486  INFO 21168 --- [io-8200-exec-17] x.g.b.system.controller.AsyncController  : do something end, time 8 milliseconds
2020-04-19 23:42:42.488  INFO 21168 --- [ do-something-1] x.gits.boot.system.service.AsyncService  : do something, message=index = 0
2020-04-19 23:42:42.488  INFO 21168 --- [ do-something-5] x.gits.boot.system.service.AsyncService  : do something, message=index = 4
2020-04-19 23:42:42.488  INFO 21168 --- [ do-something-4] x.gits.boot.system.service.AsyncService  : do something, message=index = 3
2020-04-19 23:42:42.488  INFO 21168 --- [ do-something-6] x.gits.boot.system.service.AsyncService  : do something, message=index = 5
2020-04-19 23:42:42.488  INFO 21168 --- [ do-something-9] x.gits.boot.system.service.AsyncService  : do something, message=index = 8
2020-04-19 23:42:42.488  INFO 21168 --- [ do-something-8] x.gits.boot.system.service.AsyncService  : do something, message=index = 7
2020-04-19 23:42:42.488  INFO 21168 --- [do-something-10] x.gits.boot.system.service.AsyncService  : do something, message=index = 9
2020-04-19 23:42:42.488  INFO 21168 --- [ do-something-7] x.gits.boot.system.service.AsyncService  : do something, message=index = 6
2020-04-19 23:42:42.488  INFO 21168 --- [ do-something-2] x.gits.boot.system.service.AsyncService  : do something, message=index = 1
2020-04-19 23:42:42.488  INFO 21168 --- [ do-something-3] x.gits.boot.system.service.AsyncService  : do something, message=index = 2

由此可见已经达到异步执行的效果了,并且使用到了咱们配置的线程池。

获取异步方法返回值

当异步方法有返回值时,如何获取异步方法执行的返回结果呢?这时需要异步调用的方法带有返回值CompletableFuture。

CompletableFuture是对Feature的增强,Feature只能处理简单的异步任务,而CompletableFuture可以将多个异步任务进行复杂的组合。如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@RestController
public class AsyncController {

    @Autowired
    private AsyncService asyncService;

    @SneakyThrows
    @ApiOperation("异步 有返回值")
    @GetMapping("/open/somethings")
    public String somethings() {
        CompletableFuture<String> createOrder = asyncService.doSomething1("create order");
        CompletableFuture<String> reduceAccount = asyncService.doSomething2("reduce account");
        CompletableFuture<String> saveLog = asyncService.doSomething3("save log");

        // 等待所有任务都执行完
        CompletableFuture.allOf(createOrder, reduceAccount, saveLog).join();
        // 获取每个任务的返回结果
        String result = createOrder.get() + reduceAccount.get() + saveLog.get();
        return result;
    }
}


@Slf4j
@Service
public class AsyncService {

    @Async("doSomethingExecutor")
    public CompletableFuture<String> doSomething1(String message) throws InterruptedException {
        log.info("do something1: {}", message);
        Thread.sleep(1000);
        return CompletableFuture.completedFuture("do something1: " + message);
    }

    @Async("doSomethingExecutor")
    public CompletableFuture<String> doSomething2(String message) throws InterruptedException {
        log.info("do something2: {}", message);
        Thread.sleep(1000);
        return CompletableFuture.completedFuture("; do something2: " + message);
    }

    @Async("doSomethingExecutor")
    public CompletableFuture<String> doSomething3(String message) throws InterruptedException {
        log.info("do something3: {}", message);
        Thread.sleep(1000);
        return CompletableFuture.completedFuture("; do something3: " + message);
    }
}

访问接口

1
2
C:\Users\Administrator>curl -X GET "http://localhost:8200/open/somethings" -H "accept: */*"
do something1: create order; do something2: reduce account; do something3: save log

控制台上关键日志如下:

1
2
3
2020-04-20 00:27:42.238  INFO 5672 --- [ do-something-3] x.gits.boot.system.service.AsyncService  : do something3: save log
2020-04-20 00:27:42.238  INFO 5672 --- [ do-something-2] x.gits.boot.system.service.AsyncService  : do something2: reduce account
2020-04-20 00:27:42.238  INFO 5672 --- [ do-something-1] x.gits.boot.system.service.AsyncService  : do something1: create order

以上多线程之间并无执行和完成先后顺序

注意事项

@Async注解会在以下几个场景失效,也就是说明明使用了@Async注解,但就没有走多线程。

  • 异步方法使用static关键词修饰;
  • 异步类不是一个Spring容器的bean(一般使用注解@Component@Service,并且能被Spring扫描到);
  • SpringBoot应用中没有添加@EnableAsync注解;
  • 在同一个类中,一个方法调用另外一个有@Async注解的方法,注解不会生效。原因是@Async注解的方法,是在代理类中执行的。

需要注意的是: 异步方法使用注解@Async的返回值只能为void或者Future及其子类,当返回结果为其他类型时,方法还是会异步执行,但是返回值都是null

附录

线程启动原理

线程中断机制

多线程实现方式

FutureTask实现原理

线程池之ThreadPoolExecutor概述

线程池之ThreadPoolExecutor使用

线程池之ThreadPoolExecutor状态控制

线程池之ThreadPoolExecutor执行原理

线程池之ScheduledThreadPoolExecutor概述

线程池之ScheduledThreadPoolExecutor调度原理

线程池的优雅关闭实践

SpringBoot中如何优雅的使用多线程

Java多线程看这一篇就足够了(吐血超详细总结)