기술나눔

Spring SimpleAsyncTaskExecutor 학습

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. 소개

  1. SimpleAsyncTaskExecutor는 실제 스레드 풀이 아닙니다. 각 호출은 새 스레드를 생성합니다. 동시성이 클 경우 심각한 성능 문제가 발생합니다.
  2. Java에서 스레드를 생성하는 것은 저렴하지 않으며 스레드 개체는 많은 메모리를 차지하며 대규모 응용 프로그램에서는 많은 스레드 개체를 할당하고 할당 해제하면 많은 메모리 관리 오버헤드가 발생합니다.
  3. 각 캐릭터에 대해 새 작업을 시작하고 비동기적으로 실행합니다.
  4. concurrencyLimit 속성을 통해 동시 스레드 제한을 지원합니다. 즉, 기본적으로 흐름 제어가 수행되지 않습니다. 즉, 동시 스레드 수가 무제한입니다.

현재 이해되는 바와 같이 Spring Kafka의 KafkaListener는 SimpleAsyncTaskExecutor를 사용하지만 다른 시나리오에서는 사용되지 않습니다.

2. 사용

1. 동시 전류 제한 없음

동시 전류 제한이 없으면 SimpleAsyncTaskExecutor.execute(runnable)가 실행될 때마다 작업을 비동기적으로 실행하기 위해 새 스레드가 생성됩니다.

/**
 * 不带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

다음과 같이 인쇄하십시오:

当前线程: my-test-4
当前线程: my-test-10
当前线程: my-test-5
当前线程: my-test-3
当前线程: my-test-9
当前线程: my-test-7
当前线程: my-test-2
当前线程: my-test-6
当前线程: my-test-8
当前线程: my-test-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2. 동시 전류 제한 수행

동시 전류 제한을 수행합니다. SimpleAsyncTaskExecutor.execute(runnable)가 실행될 때마다 작업을 비동기적으로 실행하기 위해 새 스레드가 생성됩니다.

그렇다면 동시 전류 제한과 비동시 전류 제한의 차이점은 무엇입니까?

  • 동시 전류 제한 없음: 10개의 스레드가 동시에 실행됩니다.
  • Concurrency Throttle: concurrencyThrottle 동시 스레드가 3으로 설정된 경우 특정 시간에 3개의 스레드만 동시에 실행할 수 있습니다.
/**
 * 带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    // 并发线程限制
    executor.setConcurrencyLimit(3);

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
    
    // 会发现主线程被卡住,因为在 SimpleAsyncTaskExecutor 中会阻塞等待
    System.out.println("主线程执行完成");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

다음과 같이 인쇄하십시오:

当前线程: my-test-3
当前线程: my-test-1
当前线程: my-test-2
------------------------------------------
当前线程: my-test-6
当前线程: my-test-5
当前线程: my-test-4
------------------------------------------
当前线程: my-test-8
当前线程: my-test-7
当前线程: my-test-9
------------------------------------------
主线程执行完成
当前线程: my-test-10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. 소스코드 분석

SimpleAsyncTaskExecutor의 소스 코드는 상대적으로 작습니다. 이 클래스를 직접 살펴보겠습니다.

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
    implements AsyncListenableTaskExecutor, Serializable {

    /**
    * -1 表示不进行并发限流
    */
    public static final int UNBOUNDED_CONCURRENCY = -1;

    /**
    * 0 表示其他线程等待其他线程执行完
    */
    public static final int NO_CONCURRENCY = 0;


    // 并发限流的实现对象
    // 并发限流就是靠这个类实现的
    private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

    @Nullable
    private ThreadFactory threadFactory;

    @Nullable
    private TaskDecorator taskDecorator;

    public SimpleAsyncTaskExecutor() {
        super();
    }

    public SimpleAsyncTaskExecutor(String threadNamePrefix) {
        super(threadNamePrefix);
    }

    /**
    * 设置并发线程数
    * 给 concurrencyThrottle 的 concurrencyLimit 字段设值
    * 默认 concurrencyThrottle 的 concurrencyLimit 的值为 -1,表示不进行并发限流
    */
    public void setConcurrencyLimit(int concurrencyLimit) {
        this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
    }

    /**
    * 当前是否是并发限流的
    * 其实就是看 concurrencyThrottle 的 concurrencyLimit >= 0 ?
    */
    public final boolean isThrottleActive() {
        return this.concurrencyThrottle.isThrottleActive();
    }


    /**
    * 常用的 execute()
    */
    @SuppressWarnings("deprecation")
    @Override
    public void execute(Runnable task) {
        execute(task, TIMEOUT_INDEFINITE);
    }

    /**
    * 执行给定的 task
    */
    @Deprecated
    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = (this.taskDecorator != null ? 
                              this.taskDecorator.decorate(task) : task);
        
        // 1. 如果需要进行并发限流,走下面的逻辑
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
        }
        else {
            // 2. 不需要并发限流,直接执行 doExecute(task)
            doExecute(taskToUse);
        }
    }


    protected void doExecute(Runnable task) {
        // 1. 直接创建一个新的线程!!!
        // 这就是为什么 SimpleAsyncTaskExecutor 每次执行 execute() 都会创建一个新线程的原因
        Thread thread = (this.threadFactory != null ? 
                         this.threadFactory.newThread(task) : createThread(task));
        
        // 2. 调用 thread.start() 异步执行任务
        thread.start();
    }


    /**
    * ConcurrencyThrottleAdapter 只有两个方法,都是由父类实现的
    */
    private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

        @Override
        protected void beforeAccess() {
            super.beforeAccess();
        }

        @Override
        protected void afterAccess() {
            super.afterAccess();
        }
    }


    /**
    * 包装了 Runnable 对象,并且本身也是 Runnable 对象
    * 装饰器模式
    */
    private class ConcurrencyThrottlingRunnable implements Runnable {

        private final Runnable target;

        public ConcurrencyThrottlingRunnable(Runnable target) {
            this.target = target;
        }

        @Override
        public void run() {
            try {
                this.target.run();
            }
            finally {
                // 主要是在 finally 块中执行 concurrencyThrottle.afterAccess()
                concurrencyThrottle.afterAccess();
            }
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134

위의 설명을 통해 SimpleAsyncTaskExecutor에 대해 간단하게 이해했습니다.

동시 흐름 제어가 없으면 SimpleAsyncTaskExecutor.execute()가 실행될 때마다 새 스레드가 생성된다는 것을 쉽게 이해할 수 있습니다.

동시 흐름 제어를 수행할 때 흐름 제어를 어떻게 수행하는지 주로 살펴봅니다.

// ---------------------- SimpleAsyncTaskExecutor ------------------------
public void execute(Runnable task, long startTimeout) {
    Runnable taskToUse = (this.taskDecorator != null ? 
                          this.taskDecorator.decorate(task) : task);
    
    // 1. 如果需要进行并发限流,走下面的逻辑
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        
        // 1.1 执行 this.concurrencyThrottle.beforeAccess()
        // 如果被并发限流的话会阻塞等待
        this.concurrencyThrottle.beforeAccess();
        
        // 1.2 此时没有被限流住
        // 将 task 包装为 ConcurrencyThrottlingRunnable
        // ConcurrencyThrottlingRunnable 的 run() 的 finally 块会释放资源
        // 使其他线程能通过限流
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    }
    else {
        // 2. 不需要并发限流,直接执行 doExecute(task)
        doExecute(taskToUse);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

this.concurrencyThrottle.beforeAccess() 및 ConcurrencyThrottlingRunnable.run()은 모두 아래에서 별도로 분석해 보겠습니다.

1. 동시성Throttle.beforeAccess()

동기화 및 concurrencyLimit를 통해 동시성 제한을 제어하는 ​​것을 볼 수 있습니다.

// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void beforeAccess() {
    if (this.concurrencyLimit == 0) {
        // 不允许 concurrencyLimit == 0
        throw new IllegalStateException();
    }
    
    // 1. 存在并发限流的场景,this.concurrencyLimit > 0
    if (this.concurrencyLimit > 0) {
        
        // 2. 尝试获取 monitor 对象锁,获取不到的话在这里阻塞,等其他线程释放锁
        synchronized (this.monitor) {
            
            // 3. 如果当前并发线程 >= this.concurrencyLimit
            // 当前线程 wait 等待,直到其他线程唤醒它
            while (this.concurrencyCount >= this.concurrencyLimit) {
                this.monitor.wait();
            }
            
            // 4. 当前并发线程数 concurrencyCount++
            this.concurrencyCount++;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

2. 동시성 조절 실행 가능

이 클래스의 run()을 살펴보겠습니다.

// --------------------- ConcurrencyThrottlingRunnable -----------------------
private class ConcurrencyThrottlingRunnable implements Runnable {

    private final Runnable target;

    public ConcurrencyThrottlingRunnable(Runnable target) {
        this.target = target;
    }

    @Override
    public void run() {
        try {
            // 1. 执行目标 target.run()
            this.target.run();
        }
        finally {
            
            // 2. 执行 concurrencyThrottle.afterAccess()
            concurrencyThrottle.afterAccess();
        }
    }
}



// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void afterAccess() {
    // 并发限流场景下
    // 先获取 monitor 对象锁,执行 concurrencyCount--,再唤醒 wait 中的线程
    if (this.concurrencyLimit >= 0) {
        synchronized (this.monitor) {
            this.concurrencyCount--;
            this.monitor.notify();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

이 시점에서 SimpleAsyncTaskExecutor 분석이 완료됩니다.

실제 개발에서는 심각한 문제를 피하기 위해 SimpleAsyncTaskExecutor를 사용해서는 안 됩니다.