Обмен технологиями

Обучение Spring SimpleAsyncTaskExecutor

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Введение

  1. SimpleAsyncTaskExecutor не является настоящим пулом потоков. Каждый вызов создает новый поток. Максимальное количество потоков не установлено; при большом параллелизме могут возникнуть серьезные проблемы с производительностью.
  2. Создание потоков в Java обходится недешево, объекты потоков занимают много памяти, а в больших приложениях выделение и освобождение множества объектов потоков создает большие накладные расходы на управление памятью;
  3. Он запустит новую задачу для каждого персонажа и выполнит ее асинхронно;
  4. Поддерживает ограничение одновременных потоков через атрибут concurrencyLimit, то есть управление потоком по умолчанию выполняться не будет, а это значит, что количество одновременных потоков не ограничено;

Как сейчас понятно, KafkaListener Spring Kafka использует SimpleAsyncTaskExecutor, но он не используется в других сценариях;

2. Используйте

1. Нет одновременного ограничения тока

Без ограничения одновременного тока при каждом выполнении SimpleAsyncTaskExecutor.execute(runnable) будет создаваться новый поток для асинхронного выполнения задачи;

/**
 * 不带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Распечатайте следующим образом:

当前线程: my-test-4
当前线程: my-test-10
当前线程: my-test-5
当前线程: my-test-3
当前线程: my-test-9
当前线程: my-test-7
当前线程: my-test-2
当前线程: my-test-6
当前线程: my-test-8
当前线程: my-test-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2. Выполните одновременное ограничение тока.

Выполнять одновременное ограничение тока. Каждый раз, когда выполняется SimpleAsyncTaskExecutor.execute(runnable), создается новый поток для асинхронного выполнения задач.

Так в чем же разница между одновременным ограничением тока и неодновременным ограничением тока?

  • Нет ограничения по одновременному току: одновременно выполняются 10 потоков;
  • Concurrency Throttle: если для concurrencyThrottle concurrent threads установлено значение 3, только 3 потока могут выполняться одновременно в определенное время;
/**
 * 带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    // 并发线程限制
    executor.setConcurrencyLimit(3);

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
    
    // 会发现主线程被卡住,因为在 SimpleAsyncTaskExecutor 中会阻塞等待
    System.out.println("主线程执行完成");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

Распечатайте следующим образом:

当前线程: my-test-3
当前线程: my-test-1
当前线程: my-test-2
------------------------------------------
当前线程: my-test-6
当前线程: my-test-5
当前线程: my-test-4
------------------------------------------
当前线程: my-test-8
当前线程: my-test-7
当前线程: my-test-9
------------------------------------------
主线程执行完成
当前线程: my-test-10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Анализ исходного кода

Исходный код SimpleAsyncTaskExecutor относительно небольшой, давайте посмотрим непосредственно на этот класс;

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
    implements AsyncListenableTaskExecutor, Serializable {

    /**
    * -1 表示不进行并发限流
    */
    public static final int UNBOUNDED_CONCURRENCY = -1;

    /**
    * 0 表示其他线程等待其他线程执行完
    */
    public static final int NO_CONCURRENCY = 0;


    // 并发限流的实现对象
    // 并发限流就是靠这个类实现的
    private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

    @Nullable
    private ThreadFactory threadFactory;

    @Nullable
    private TaskDecorator taskDecorator;

    public SimpleAsyncTaskExecutor() {
        super();
    }

    public SimpleAsyncTaskExecutor(String threadNamePrefix) {
        super(threadNamePrefix);
    }

    /**
    * 设置并发线程数
    * 给 concurrencyThrottle 的 concurrencyLimit 字段设值
    * 默认 concurrencyThrottle 的 concurrencyLimit 的值为 -1,表示不进行并发限流
    */
    public void setConcurrencyLimit(int concurrencyLimit) {
        this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
    }

    /**
    * 当前是否是并发限流的
    * 其实就是看 concurrencyThrottle 的 concurrencyLimit >= 0 ?
    */
    public final boolean isThrottleActive() {
        return this.concurrencyThrottle.isThrottleActive();
    }


    /**
    * 常用的 execute()
    */
    @SuppressWarnings("deprecation")
    @Override
    public void execute(Runnable task) {
        execute(task, TIMEOUT_INDEFINITE);
    }

    /**
    * 执行给定的 task
    */
    @Deprecated
    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = (this.taskDecorator != null ? 
                              this.taskDecorator.decorate(task) : task);
        
        // 1. 如果需要进行并发限流,走下面的逻辑
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
        }
        else {
            // 2. 不需要并发限流,直接执行 doExecute(task)
            doExecute(taskToUse);
        }
    }


    protected void doExecute(Runnable task) {
        // 1. 直接创建一个新的线程!!!
        // 这就是为什么 SimpleAsyncTaskExecutor 每次执行 execute() 都会创建一个新线程的原因
        Thread thread = (this.threadFactory != null ? 
                         this.threadFactory.newThread(task) : createThread(task));
        
        // 2. 调用 thread.start() 异步执行任务
        thread.start();
    }


    /**
    * ConcurrencyThrottleAdapter 只有两个方法,都是由父类实现的
    */
    private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

        @Override
        protected void beforeAccess() {
            super.beforeAccess();
        }

        @Override
        protected void afterAccess() {
            super.afterAccess();
        }
    }


    /**
    * 包装了 Runnable 对象,并且本身也是 Runnable 对象
    * 装饰器模式
    */
    private class ConcurrencyThrottlingRunnable implements Runnable {

        private final Runnable target;

        public ConcurrencyThrottlingRunnable(Runnable target) {
            this.target = target;
        }

        @Override
        public void run() {
            try {
                this.target.run();
            }
            finally {
                // 主要是在 finally 块中执行 concurrencyThrottle.afterAccess()
                concurrencyThrottle.afterAccess();
            }
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134

Благодаря приведенному выше описанию у нас есть простое понимание SimpleAsyncTaskExecutor;

Без управления параллельным потоком легко понять, что новый поток будет создаваться каждый раз, когда выполняется SimpleAsyncTaskExecutor.execute();

В основном мы смотрим на то, как он осуществляет управление потоком, когда выполняется одновременное управление потоком;

// ---------------------- SimpleAsyncTaskExecutor ------------------------
public void execute(Runnable task, long startTimeout) {
    Runnable taskToUse = (this.taskDecorator != null ? 
                          this.taskDecorator.decorate(task) : task);
    
    // 1. 如果需要进行并发限流,走下面的逻辑
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        
        // 1.1 执行 this.concurrencyThrottle.beforeAccess()
        // 如果被并发限流的话会阻塞等待
        this.concurrencyThrottle.beforeAccess();
        
        // 1.2 此时没有被限流住
        // 将 task 包装为 ConcurrencyThrottlingRunnable
        // ConcurrencyThrottlingRunnable 的 run() 的 finally 块会释放资源
        // 使其他线程能通过限流
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    }
    else {
        // 2. 不需要并发限流,直接执行 doExecute(task)
        doExecute(taskToUse);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

this.concurrencyThrottle.beforeAccess() и ConcurrencyThrottlingRunnable.run() являются ключевыми моментами. Давайте проанализируем их отдельно ниже;

1. concurrencyThrottle.beforeAccess()

Вы можете видеть, что он контролирует лимит параллелизма с помощью Synchronized и concurrencyLimit;

// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void beforeAccess() {
    if (this.concurrencyLimit == 0) {
        // 不允许 concurrencyLimit == 0
        throw new IllegalStateException();
    }
    
    // 1. 存在并发限流的场景,this.concurrencyLimit > 0
    if (this.concurrencyLimit > 0) {
        
        // 2. 尝试获取 monitor 对象锁,获取不到的话在这里阻塞,等其他线程释放锁
        synchronized (this.monitor) {
            
            // 3. 如果当前并发线程 >= this.concurrencyLimit
            // 当前线程 wait 等待,直到其他线程唤醒它
            while (this.concurrencyCount >= this.concurrencyLimit) {
                this.monitor.wait();
            }
            
            // 4. 当前并发线程数 concurrencyCount++
            this.concurrencyCount++;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

2. ConcurrencyThrottlingRunnable

Давайте посмотрим на run() этого класса;

// --------------------- ConcurrencyThrottlingRunnable -----------------------
private class ConcurrencyThrottlingRunnable implements Runnable {

    private final Runnable target;

    public ConcurrencyThrottlingRunnable(Runnable target) {
        this.target = target;
    }

    @Override
    public void run() {
        try {
            // 1. 执行目标 target.run()
            this.target.run();
        }
        finally {
            
            // 2. 执行 concurrencyThrottle.afterAccess()
            concurrencyThrottle.afterAccess();
        }
    }
}



// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void afterAccess() {
    // 并发限流场景下
    // 先获取 monitor 对象锁,执行 concurrencyCount--,再唤醒 wait 中的线程
    if (this.concurrencyLimit >= 0) {
        synchronized (this.monitor) {
            this.concurrencyCount--;
            this.monitor.notify();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

На этом анализ SimpleAsyncTaskExecutor завершен;

В реальной разработке нам не следует использовать SimpleAsyncTaskExecutor, чтобы избежать катастрофических проблем;