Partage de technologie

Apprentissage Spring SimpleAsyncTaskExecutor

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Introduction

  1. SimpleAsyncTaskExecutor n'est pas un véritable pool de threads. Cette classe ne réutilise pas les threads. Chaque appel créera un nouveau thread. Il n'y a pas de nombre maximum de threads défini ; de sérieux problèmes de performances se produiront lorsque la concurrence est importante ;
  2. La création de threads en Java n'est pas bon marché, les objets thread occupent beaucoup de mémoire et dans les grandes applications, l'allocation et la désallocation de nombreux objets thread créent une surcharge de gestion de la mémoire ;
  3. Il démarrera une nouvelle tâche pour chaque personnage et l'exécutera de manière asynchrone ;
  4. Prend en charge la limitation des threads simultanés via l'attribut concurrencyLimit, c'est-à-dire le contrôle de flux ; par défaut, le contrôle de flux ne sera pas effectué, ce qui signifie que le nombre de threads simultanés est illimité ;

Tel qu'il est actuellement compris, KafkaListener de Spring Kafka utilise SimpleAsyncTaskExecutor, mais il n'est pas utilisé dans d'autres scénarios ;

2. Utiliser

1. Aucune limitation de courant simultanée

Sans limitation de courant simultanée, chaque fois que SimpleAsyncTaskExecutor.execute(runnable) est exécuté, un nouveau thread sera créé pour exécuter la tâche de manière asynchrone ;

/**
 * 不带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Imprimez comme suit :

当前线程: my-test-4
当前线程: my-test-10
当前线程: my-test-5
当前线程: my-test-3
当前线程: my-test-9
当前线程: my-test-7
当前线程: my-test-2
当前线程: my-test-6
当前线程: my-test-8
当前线程: my-test-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2. Effectuer une limitation de courant simultanée

Effectuer une limitation de courant simultanée Chaque fois que SimpleAsyncTaskExecutor.execute(runnable) est exécuté, un nouveau thread sera créé pour exécuter les tâches de manière asynchrone ;

Alors, quelle est la différence entre la limitation de courant simultanée et la limitation de courant non simultanée ?

  • Aucune limite de courant simultané : 10 threads s'exécutent simultanément ;
  • Concurrency Throttle : si les threads simultanés ConcurrencyThrottle sont définis sur 3, seuls 3 threads peuvent s'exécuter simultanément à un certain moment ;
/**
 * 带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    // 并发线程限制
    executor.setConcurrencyLimit(3);

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
    
    // 会发现主线程被卡住,因为在 SimpleAsyncTaskExecutor 中会阻塞等待
    System.out.println("主线程执行完成");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

Imprimez comme suit :

当前线程: my-test-3
当前线程: my-test-1
当前线程: my-test-2
------------------------------------------
当前线程: my-test-6
当前线程: my-test-5
当前线程: my-test-4
------------------------------------------
当前线程: my-test-8
当前线程: my-test-7
当前线程: my-test-9
------------------------------------------
主线程执行完成
当前线程: my-test-10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Analyse du code source

Le code source de SimpleAsyncTaskExecutor est relativement petit, regardons directement cette classe ;

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
    implements AsyncListenableTaskExecutor, Serializable {

    /**
    * -1 表示不进行并发限流
    */
    public static final int UNBOUNDED_CONCURRENCY = -1;

    /**
    * 0 表示其他线程等待其他线程执行完
    */
    public static final int NO_CONCURRENCY = 0;


    // 并发限流的实现对象
    // 并发限流就是靠这个类实现的
    private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

    @Nullable
    private ThreadFactory threadFactory;

    @Nullable
    private TaskDecorator taskDecorator;

    public SimpleAsyncTaskExecutor() {
        super();
    }

    public SimpleAsyncTaskExecutor(String threadNamePrefix) {
        super(threadNamePrefix);
    }

    /**
    * 设置并发线程数
    * 给 concurrencyThrottle 的 concurrencyLimit 字段设值
    * 默认 concurrencyThrottle 的 concurrencyLimit 的值为 -1,表示不进行并发限流
    */
    public void setConcurrencyLimit(int concurrencyLimit) {
        this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
    }

    /**
    * 当前是否是并发限流的
    * 其实就是看 concurrencyThrottle 的 concurrencyLimit >= 0 ?
    */
    public final boolean isThrottleActive() {
        return this.concurrencyThrottle.isThrottleActive();
    }


    /**
    * 常用的 execute()
    */
    @SuppressWarnings("deprecation")
    @Override
    public void execute(Runnable task) {
        execute(task, TIMEOUT_INDEFINITE);
    }

    /**
    * 执行给定的 task
    */
    @Deprecated
    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = (this.taskDecorator != null ? 
                              this.taskDecorator.decorate(task) : task);
        
        // 1. 如果需要进行并发限流,走下面的逻辑
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
        }
        else {
            // 2. 不需要并发限流,直接执行 doExecute(task)
            doExecute(taskToUse);
        }
    }


    protected void doExecute(Runnable task) {
        // 1. 直接创建一个新的线程!!!
        // 这就是为什么 SimpleAsyncTaskExecutor 每次执行 execute() 都会创建一个新线程的原因
        Thread thread = (this.threadFactory != null ? 
                         this.threadFactory.newThread(task) : createThread(task));
        
        // 2. 调用 thread.start() 异步执行任务
        thread.start();
    }


    /**
    * ConcurrencyThrottleAdapter 只有两个方法,都是由父类实现的
    */
    private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

        @Override
        protected void beforeAccess() {
            super.beforeAccess();
        }

        @Override
        protected void afterAccess() {
            super.afterAccess();
        }
    }


    /**
    * 包装了 Runnable 对象,并且本身也是 Runnable 对象
    * 装饰器模式
    */
    private class ConcurrencyThrottlingRunnable implements Runnable {

        private final Runnable target;

        public ConcurrencyThrottlingRunnable(Runnable target) {
            this.target = target;
        }

        @Override
        public void run() {
            try {
                this.target.run();
            }
            finally {
                // 主要是在 finally 块中执行 concurrencyThrottle.afterAccess()
                concurrencyThrottle.afterAccess();
            }
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134

Grâce à la description ci-dessus, nous avons une compréhension simple de SimpleAsyncTaskExecutor ;

Sans contrôle de flux simultané, il est facile de comprendre qu'un nouveau thread sera créé à chaque fois que SimpleAsyncTaskExecutor.execute() est exécuté ;

Nous examinons principalement comment il effectue le contrôle de flux lorsqu'un contrôle de flux simultané est effectué ;

// ---------------------- SimpleAsyncTaskExecutor ------------------------
public void execute(Runnable task, long startTimeout) {
    Runnable taskToUse = (this.taskDecorator != null ? 
                          this.taskDecorator.decorate(task) : task);
    
    // 1. 如果需要进行并发限流,走下面的逻辑
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        
        // 1.1 执行 this.concurrencyThrottle.beforeAccess()
        // 如果被并发限流的话会阻塞等待
        this.concurrencyThrottle.beforeAccess();
        
        // 1.2 此时没有被限流住
        // 将 task 包装为 ConcurrencyThrottlingRunnable
        // ConcurrencyThrottlingRunnable 的 run() 的 finally 块会释放资源
        // 使其他线程能通过限流
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    }
    else {
        // 2. 不需要并发限流,直接执行 doExecute(task)
        doExecute(taskToUse);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

this.concurrencyThrottle.beforeAccess() et ConcurrencyThrottlingRunnable.run() sont deux points clés. Analysons-les séparément ci-dessous ;

1.concurrencyThrottle.beforeAccess()

Vous pouvez voir qu'il contrôle la limite de concurrence via synchronisé et concurrencyLimit ;

// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void beforeAccess() {
    if (this.concurrencyLimit == 0) {
        // 不允许 concurrencyLimit == 0
        throw new IllegalStateException();
    }
    
    // 1. 存在并发限流的场景,this.concurrencyLimit > 0
    if (this.concurrencyLimit > 0) {
        
        // 2. 尝试获取 monitor 对象锁,获取不到的话在这里阻塞,等其他线程释放锁
        synchronized (this.monitor) {
            
            // 3. 如果当前并发线程 >= this.concurrencyLimit
            // 当前线程 wait 等待,直到其他线程唤醒它
            while (this.concurrencyCount >= this.concurrencyLimit) {
                this.monitor.wait();
            }
            
            // 4. 当前并发线程数 concurrencyCount++
            this.concurrencyCount++;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

2. ConcurrencyThrottlingRunnable

Jetons un coup d'œil à run() de cette classe ;

// --------------------- ConcurrencyThrottlingRunnable -----------------------
private class ConcurrencyThrottlingRunnable implements Runnable {

    private final Runnable target;

    public ConcurrencyThrottlingRunnable(Runnable target) {
        this.target = target;
    }

    @Override
    public void run() {
        try {
            // 1. 执行目标 target.run()
            this.target.run();
        }
        finally {
            
            // 2. 执行 concurrencyThrottle.afterAccess()
            concurrencyThrottle.afterAccess();
        }
    }
}



// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void afterAccess() {
    // 并发限流场景下
    // 先获取 monitor 对象锁,执行 concurrencyCount--,再唤醒 wait 中的线程
    if (this.concurrencyLimit >= 0) {
        synchronized (this.monitor) {
            this.concurrencyCount--;
            this.monitor.notify();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

À ce stade, l’analyse de SimpleAsyncTaskExecutor est terminée ;

Dans le développement réel, nous ne devrions pas utiliser SimpleAsyncTaskExecutor pour éviter des problèmes catastrophiques ;