प्रौद्योगिकी साझेदारी

वसन्त SimpleAsyncTaskExecutor शिक्षण

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. परिचयः

  1. SimpleAsyncTaskExecutor वास्तविकः थ्रेड् पूलः नास्ति प्रत्येकं आह्वानं नूतनं थ्रेड् निर्मास्यति यदा समवर्तीता बृहत् भवति तदा धागानां अधिकतमसंख्या न भविष्यति
  2. जावा-मध्ये थ्रेड्-निर्माणं सस्तो नास्ति, थ्रेड्-वस्तूनाम् बहु स्मृतिः गृह्णाति, बृहत्-अनुप्रयोगेषु च अनेक-थ्रेड्-वस्तूनाम् आवंटनं, वि-आवंटनं च बहु स्मृति-प्रबन्धन-ओवरहेड्-निर्माणं भवति
  3. प्रत्येकस्य वर्णस्य कृते नूतनं कार्यं आरभ्य अतुल्यकालिकरूपेण निष्पादयिष्यति;
  4. concurrencyLimit विशेषतायाः माध्यमेन समवर्तीसूत्राणां सीमितीकरणं समर्थयति, अर्थात् पूर्वनिर्धारितरूपेण, प्रवाहनियन्त्रणं न क्रियते, यस्य अर्थः अस्ति यत् समवर्तीसूत्राणां संख्या असीमितम् अस्ति;

यथा सम्प्रति अवगम्यते, Spring Kafka इत्यस्य KafkaListener इत्यनेन SimpleAsyncTaskExecutor इत्यस्य उपयोगः भवति, परन्तु अन्येषु परिदृश्येषु तस्य उपयोगः न भवति;

2. प्रयोगः

1. समवर्ती धारासीमीकरणं नास्ति

समवर्ती वर्तमानसीमीकरणं विना, प्रत्येकं समये SimpleAsyncTaskExecutor.execute(runnable) निष्पादितं भवति, कार्यं अतुल्यकालिकरूपेण निष्पादयितुं नूतनः थ्रेड् निर्मितः भविष्यति;

/**
 * 不带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

निम्नलिखितरूपेण मुद्रयन्तु : १.

当前线程: my-test-4
当前线程: my-test-10
当前线程: my-test-5
当前线程: my-test-3
当前线程: my-test-9
当前线程: my-test-7
当前线程: my-test-2
当前线程: my-test-6
当前线程: my-test-8
当前线程: my-test-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2. समवर्ती धारासीमीकरणं कुर्वन्तु

समवर्ती वर्तमानसीमीकरणं कुर्वन्तु प्रत्येकं समये SimpleAsyncTaskExecutor.execute(runnable) निष्पादितं भवति, कार्याणि अतुल्यकालिकरूपेण निष्पादयितुं नूतनः थ्रेड् निर्मितः भविष्यति;

अतः समवर्तीधारासीमाकरणस्य असमवर्तीधारासीमीकरणस्य च मध्ये कः अन्तरः अस्ति ?

  • समवर्ती वर्तमानसीमा नास्ति: 10 थ्रेड्स समवर्तीरूपेण निष्पादयन्ति;
  • Concurrency Throttle: यदि concurrencyThrottle concurrent threads 3 इति सेट् भवति तर्हि केवलं 3 threads एकस्मिन् निश्चितसमये समवर्तीरूपेण निष्पादयितुं शक्नुवन्ति;
/**
 * 带并发限流控制的 SimpleAsyncTaskExecutor
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");

    // 并发线程限制
    executor.setConcurrencyLimit(3);

    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("当前线程: " + Thread.currentThread().getName());
        }
    };

    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
    
    // 会发现主线程被卡住,因为在 SimpleAsyncTaskExecutor 中会阻塞等待
    System.out.println("主线程执行完成");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

निम्नलिखितरूपेण मुद्रयन्तु : १.

当前线程: my-test-3
当前线程: my-test-1
当前线程: my-test-2
------------------------------------------
当前线程: my-test-6
当前线程: my-test-5
当前线程: my-test-4
------------------------------------------
当前线程: my-test-8
当前线程: my-test-7
当前线程: my-test-9
------------------------------------------
主线程执行完成
当前线程: my-test-10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. स्रोतसङ्केतविश्लेषणम्

SimpleAsyncTaskExecutor इत्यस्य स्रोतसङ्केतः तुल्यकालिकरूपेण लघुः अस्ति, प्रत्यक्षतया एतत् वर्गं पश्यामः;

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
    implements AsyncListenableTaskExecutor, Serializable {

    /**
    * -1 表示不进行并发限流
    */
    public static final int UNBOUNDED_CONCURRENCY = -1;

    /**
    * 0 表示其他线程等待其他线程执行完
    */
    public static final int NO_CONCURRENCY = 0;


    // 并发限流的实现对象
    // 并发限流就是靠这个类实现的
    private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

    @Nullable
    private ThreadFactory threadFactory;

    @Nullable
    private TaskDecorator taskDecorator;

    public SimpleAsyncTaskExecutor() {
        super();
    }

    public SimpleAsyncTaskExecutor(String threadNamePrefix) {
        super(threadNamePrefix);
    }

    /**
    * 设置并发线程数
    * 给 concurrencyThrottle 的 concurrencyLimit 字段设值
    * 默认 concurrencyThrottle 的 concurrencyLimit 的值为 -1,表示不进行并发限流
    */
    public void setConcurrencyLimit(int concurrencyLimit) {
        this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
    }

    /**
    * 当前是否是并发限流的
    * 其实就是看 concurrencyThrottle 的 concurrencyLimit >= 0 ?
    */
    public final boolean isThrottleActive() {
        return this.concurrencyThrottle.isThrottleActive();
    }


    /**
    * 常用的 execute()
    */
    @SuppressWarnings("deprecation")
    @Override
    public void execute(Runnable task) {
        execute(task, TIMEOUT_INDEFINITE);
    }

    /**
    * 执行给定的 task
    */
    @Deprecated
    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = (this.taskDecorator != null ? 
                              this.taskDecorator.decorate(task) : task);
        
        // 1. 如果需要进行并发限流,走下面的逻辑
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
        }
        else {
            // 2. 不需要并发限流,直接执行 doExecute(task)
            doExecute(taskToUse);
        }
    }


    protected void doExecute(Runnable task) {
        // 1. 直接创建一个新的线程!!!
        // 这就是为什么 SimpleAsyncTaskExecutor 每次执行 execute() 都会创建一个新线程的原因
        Thread thread = (this.threadFactory != null ? 
                         this.threadFactory.newThread(task) : createThread(task));
        
        // 2. 调用 thread.start() 异步执行任务
        thread.start();
    }


    /**
    * ConcurrencyThrottleAdapter 只有两个方法,都是由父类实现的
    */
    private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

        @Override
        protected void beforeAccess() {
            super.beforeAccess();
        }

        @Override
        protected void afterAccess() {
            super.afterAccess();
        }
    }


    /**
    * 包装了 Runnable 对象,并且本身也是 Runnable 对象
    * 装饰器模式
    */
    private class ConcurrencyThrottlingRunnable implements Runnable {

        private final Runnable target;

        public ConcurrencyThrottlingRunnable(Runnable target) {
            this.target = target;
        }

        @Override
        public void run() {
            try {
                this.target.run();
            }
            finally {
                // 主要是在 finally 块中执行 concurrencyThrottle.afterAccess()
                concurrencyThrottle.afterAccess();
            }
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134

उपर्युक्तवर्णनस्य माध्यमेन अस्माकं SimpleAsyncTaskExecutor इत्यस्य सरलबोधः अस्ति;

समवर्तीप्रवाहनियन्त्रणं विना, प्रत्येकं SimpleAsyncTaskExecutor.execute() निष्पादने नूतनः थ्रेड् निर्मितः भविष्यति इति अवगन्तुं सुलभम्;

वयं मुख्यतया पश्यामः यत् समवर्तीप्रवाहनियन्त्रणं क्रियमाणे सति प्रवाहनियन्त्रणं कथं करोति;

// ---------------------- SimpleAsyncTaskExecutor ------------------------
public void execute(Runnable task, long startTimeout) {
    Runnable taskToUse = (this.taskDecorator != null ? 
                          this.taskDecorator.decorate(task) : task);
    
    // 1. 如果需要进行并发限流,走下面的逻辑
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        
        // 1.1 执行 this.concurrencyThrottle.beforeAccess()
        // 如果被并发限流的话会阻塞等待
        this.concurrencyThrottle.beforeAccess();
        
        // 1.2 此时没有被限流住
        // 将 task 包装为 ConcurrencyThrottlingRunnable
        // ConcurrencyThrottlingRunnable 的 run() 的 finally 块会释放资源
        // 使其他线程能通过限流
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    }
    else {
        // 2. 不需要并发限流,直接执行 doExecute(task)
        doExecute(taskToUse);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

this.concurrencyThrottle.beforeAccess() तथा ConcurrencyThrottlingRunnable.run() इत्येतौ द्वौ अपि मुख्यबिन्दवौ स्तः अधः पृथक् पृथक् विश्लेषणं कुर्मः;

1. concurrencyThrottle.beforeAccess () 1.1.

भवान् द्रष्टुं शक्नोति यत् एतत् synchronized तथा concurrencyLimit इत्येतयोः माध्यमेन समवर्तीसीमाम् नियन्त्रयति;

// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void beforeAccess() {
    if (this.concurrencyLimit == 0) {
        // 不允许 concurrencyLimit == 0
        throw new IllegalStateException();
    }
    
    // 1. 存在并发限流的场景,this.concurrencyLimit > 0
    if (this.concurrencyLimit > 0) {
        
        // 2. 尝试获取 monitor 对象锁,获取不到的话在这里阻塞,等其他线程释放锁
        synchronized (this.monitor) {
            
            // 3. 如果当前并发线程 >= this.concurrencyLimit
            // 当前线程 wait 等待,直到其他线程唤醒它
            while (this.concurrencyCount >= this.concurrencyLimit) {
                this.monitor.wait();
            }
            
            // 4. 当前并发线程数 concurrencyCount++
            this.concurrencyCount++;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

2. समवर्तीगलाघातRunnable

अस्य वर्गस्य run() इति अवलोकयामः;

// --------------------- ConcurrencyThrottlingRunnable -----------------------
private class ConcurrencyThrottlingRunnable implements Runnable {

    private final Runnable target;

    public ConcurrencyThrottlingRunnable(Runnable target) {
        this.target = target;
    }

    @Override
    public void run() {
        try {
            // 1. 执行目标 target.run()
            this.target.run();
        }
        finally {
            
            // 2. 执行 concurrencyThrottle.afterAccess()
            concurrencyThrottle.afterAccess();
        }
    }
}



// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void afterAccess() {
    // 并发限流场景下
    // 先获取 monitor 对象锁,执行 concurrencyCount--,再唤醒 wait 中的线程
    if (this.concurrencyLimit >= 0) {
        synchronized (this.monitor) {
            this.concurrencyCount--;
            this.monitor.notify();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

अस्मिन् क्षणे SimpleAsyncTaskExecutor इत्यस्य विश्लेषणं सम्पन्नं भवति;

वास्तविकविकासे अस्माभिः विनाशकारीसमस्यानां परिहाराय SimpleAsyncTaskExecutor इत्यस्य उपयोगः न कर्तव्यः;