Κοινή χρήση τεχνολογίας

Εκμάθηση Spring SimpleAsyncTaskExecutor

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Εισαγωγή

  1. Το SimpleAsyncTaskExecutor δεν είναι ένα πραγματικό νήμα.
  2. Η δημιουργία νημάτων σε Java δεν είναι φθηνή, τα αντικείμενα νήματος καταλαμβάνουν πολλή μνήμη και σε μεγάλες εφαρμογές, η κατανομή και η εκχώρηση πολλών αντικειμένων νημάτων δημιουργεί μεγάλο κόστος διαχείρισης μνήμης.
  3. Θα ξεκινήσει μια νέα εργασία για κάθε χαρακτήρα και θα την εκτελέσει ασύγχρονα.
  4. Υποστηρίζει τον περιορισμό ταυτόχρονων νημάτων μέσω του χαρακτηριστικού concurrencyLimit, δηλαδή, έλεγχος ροής από προεπιλογή, δεν θα εκτελείται έλεγχος ροής, πράγμα που σημαίνει ότι ο αριθμός των ταυτόχρονων νημάτων είναι απεριόριστος.

Όπως είναι κατανοητό επί του παρόντος, το KafkaListener του Spring Kafka χρησιμοποιεί SimpleAsyncTaskExecutor, αλλά δεν χρησιμοποιείται σε άλλα σενάρια.

2. Χρήση

1. Χωρίς περιορισμό ταυτόχρονου ρεύματος

Χωρίς ταυτόχρονο περιορισμό ρεύματος, κάθε φορά που εκτελείται το SimpleAsyncTaskExecutor.execute(runnable), θα δημιουργείται ένα νέο νήμα για την ασύγχρονη εκτέλεση της εργασίας.

/**
 * 不带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Εκτυπώστε ως εξής:

当前线程: my-test-4
当前线程: my-test-10
当前线程: my-test-5
当前线程: my-test-3
当前线程: my-test-9
当前线程: my-test-7
当前线程: my-test-2
当前线程: my-test-6
当前线程: my-test-8
当前线程: my-test-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2. Εκτελέστε ταυτόχρονο περιορισμό ρεύματος

Εκτελέστε ταυτόχρονο περιορισμό ρεύματος Κάθε φορά που εκτελείται το SimpleAsyncTaskExecute(runnable), θα δημιουργείται ένα νέο νήμα για την ασύγχρονη εκτέλεση εργασιών.

Ποια είναι λοιπόν η διαφορά μεταξύ περιορισμού ταυτόχρονου ρεύματος και περιορισμού μη ταυτόχρονου ρεύματος;

  • Χωρίς όριο ταυτόχρονου ρεύματος: 10 νήματα εκτελούνται ταυτόχρονα.
  • Concurrency Throttle: Εάν το concurrencyThrottle concurrent threads έχει οριστεί σε 3, μόνο 3 threads μπορούν να εκτελεστούν ταυτόχρονα σε μια συγκεκριμένη χρονική στιγμή.
/**
 * 带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    // 并发线程限制
    executor.setConcurrencyLimit(3);

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
    
    // 会发现主线程被卡住,因为在 SimpleAsyncTaskExecutor 中会阻塞等待
    System.out.println("主线程执行完成");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

Εκτυπώστε ως εξής:

当前线程: my-test-3
当前线程: my-test-1
当前线程: my-test-2
------------------------------------------
当前线程: my-test-6
当前线程: my-test-5
当前线程: my-test-4
------------------------------------------
当前线程: my-test-8
当前线程: my-test-7
当前线程: my-test-9
------------------------------------------
主线程执行完成
当前线程: my-test-10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Ανάλυση πηγαίου κώδικα

Ο πηγαίος κώδικας του SimpleAsyncTaskExecutor είναι σχετικά μικρός, ας δούμε απευθείας αυτήν την κλάση.

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
    implements AsyncListenableTaskExecutor, Serializable {

    /**
    * -1 表示不进行并发限流
    */
    public static final int UNBOUNDED_CONCURRENCY = -1;

    /**
    * 0 表示其他线程等待其他线程执行完
    */
    public static final int NO_CONCURRENCY = 0;


    // 并发限流的实现对象
    // 并发限流就是靠这个类实现的
    private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

    @Nullable
    private ThreadFactory threadFactory;

    @Nullable
    private TaskDecorator taskDecorator;

    public SimpleAsyncTaskExecutor() {
        super();
    }

    public SimpleAsyncTaskExecutor(String threadNamePrefix) {
        super(threadNamePrefix);
    }

    /**
    * 设置并发线程数
    * 给 concurrencyThrottle 的 concurrencyLimit 字段设值
    * 默认 concurrencyThrottle 的 concurrencyLimit 的值为 -1,表示不进行并发限流
    */
    public void setConcurrencyLimit(int concurrencyLimit) {
        this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
    }

    /**
    * 当前是否是并发限流的
    * 其实就是看 concurrencyThrottle 的 concurrencyLimit >= 0 ?
    */
    public final boolean isThrottleActive() {
        return this.concurrencyThrottle.isThrottleActive();
    }


    /**
    * 常用的 execute()
    */
    @SuppressWarnings("deprecation")
    @Override
    public void execute(Runnable task) {
        execute(task, TIMEOUT_INDEFINITE);
    }

    /**
    * 执行给定的 task
    */
    @Deprecated
    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = (this.taskDecorator != null ? 
                              this.taskDecorator.decorate(task) : task);
        
        // 1. 如果需要进行并发限流,走下面的逻辑
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
        }
        else {
            // 2. 不需要并发限流,直接执行 doExecute(task)
            doExecute(taskToUse);
        }
    }


    protected void doExecute(Runnable task) {
        // 1. 直接创建一个新的线程!!!
        // 这就是为什么 SimpleAsyncTaskExecutor 每次执行 execute() 都会创建一个新线程的原因
        Thread thread = (this.threadFactory != null ? 
                         this.threadFactory.newThread(task) : createThread(task));
        
        // 2. 调用 thread.start() 异步执行任务
        thread.start();
    }


    /**
    * ConcurrencyThrottleAdapter 只有两个方法,都是由父类实现的
    */
    private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

        @Override
        protected void beforeAccess() {
            super.beforeAccess();
        }

        @Override
        protected void afterAccess() {
            super.afterAccess();
        }
    }


    /**
    * 包装了 Runnable 对象,并且本身也是 Runnable 对象
    * 装饰器模式
    */
    private class ConcurrencyThrottlingRunnable implements Runnable {

        private final Runnable target;

        public ConcurrencyThrottlingRunnable(Runnable target) {
            this.target = target;
        }

        @Override
        public void run() {
            try {
                this.target.run();
            }
            finally {
                // 主要是在 finally 块中执行 concurrencyThrottle.afterAccess()
                concurrencyThrottle.afterAccess();
            }
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134

Μέσα από την παραπάνω περιγραφή, έχουμε μια απλή κατανόηση του SimpleAsyncTaskExecutor.

Χωρίς έλεγχο ταυτόχρονης ροής, είναι εύκολο να γίνει κατανοητό ότι ένα νέο νήμα θα δημιουργείται κάθε φορά που εκτελείται η SimpleAsyncTaskExecutor.execute().

Εξετάζουμε κυρίως πώς εκτελεί τον έλεγχο ροής όταν εκτελείται ταυτόχρονος έλεγχος ροής.

// ---------------------- SimpleAsyncTaskExecutor ------------------------
public void execute(Runnable task, long startTimeout) {
    Runnable taskToUse = (this.taskDecorator != null ? 
                          this.taskDecorator.decorate(task) : task);
    
    // 1. 如果需要进行并发限流,走下面的逻辑
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        
        // 1.1 执行 this.concurrencyThrottle.beforeAccess()
        // 如果被并发限流的话会阻塞等待
        this.concurrencyThrottle.beforeAccess();
        
        // 1.2 此时没有被限流住
        // 将 task 包装为 ConcurrencyThrottlingRunnable
        // ConcurrencyThrottlingRunnable 的 run() 的 finally 块会释放资源
        // 使其他线程能通过限流
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    }
    else {
        // 2. 不需要并发限流,直接执行 doExecute(task)
        doExecute(taskToUse);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

Το this.concurrencyThrottle.beforeAccess() και το ConcurrencyThrottlingRunnable.run() είναι και τα δύο βασικά σημεία.

1. concurrencyThrottle.beforeAccess()

Μπορείτε να δείτε ότι ελέγχει το όριο συγχρονισμού μέσω συγχρονισμένου και concurrencyLimit.

// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void beforeAccess() {
    if (this.concurrencyLimit == 0) {
        // 不允许 concurrencyLimit == 0
        throw new IllegalStateException();
    }
    
    // 1. 存在并发限流的场景,this.concurrencyLimit > 0
    if (this.concurrencyLimit > 0) {
        
        // 2. 尝试获取 monitor 对象锁,获取不到的话在这里阻塞,等其他线程释放锁
        synchronized (this.monitor) {
            
            // 3. 如果当前并发线程 >= this.concurrencyLimit
            // 当前线程 wait 等待,直到其他线程唤醒它
            while (this.concurrencyCount >= this.concurrencyLimit) {
                this.monitor.wait();
            }
            
            // 4. 当前并发线程数 concurrencyCount++
            this.concurrencyCount++;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

2. ConcurrencyThrottlingRunnable

Ας ρίξουμε μια ματιά στο run() αυτής της κλάσης.

// --------------------- ConcurrencyThrottlingRunnable -----------------------
private class ConcurrencyThrottlingRunnable implements Runnable {

    private final Runnable target;

    public ConcurrencyThrottlingRunnable(Runnable target) {
        this.target = target;
    }

    @Override
    public void run() {
        try {
            // 1. 执行目标 target.run()
            this.target.run();
        }
        finally {
            
            // 2. 执行 concurrencyThrottle.afterAccess()
            concurrencyThrottle.afterAccess();
        }
    }
}



// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void afterAccess() {
    // 并发限流场景下
    // 先获取 monitor 对象锁,执行 concurrencyCount--,再唤醒 wait 中的线程
    if (this.concurrencyLimit >= 0) {
        synchronized (this.monitor) {
            this.concurrencyCount--;
            this.monitor.notify();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

Σε αυτό το σημείο, ολοκληρώνεται η ανάλυση του SimpleAsyncTaskExecutor.

Στην πραγματική ανάπτυξη, δεν πρέπει να χρησιμοποιούμε το SimpleAsyncTaskExecutor για να αποφύγουμε καταστροφικά προβλήματα.