2024-07-12
As far as I know, Spring Kafka's KafkaListener uses SimpleAsyncTaskExecutor, but I have not seen it used in other scenarios.
Without a concurrency limit, each call to SimpleAsyncTaskExecutor.execute(runnable) creates a new thread to run the task asynchronously.
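import lombok.SneakyThrows;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * SimpleAsyncTaskExecutor without a concurrency limit
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");
    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("Current thread: " + Thread.currentThread().getName());
        }
    };
    // Submit 10 tasks; each one is run on its own new thread
    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
}
Prints as follows (the order varies between runs):
Current thread: my-test-4
Current thread: my-test-10
Current thread: my-test-5
Current thread: my-test-3
Current thread: my-test-9
Current thread: my-test-7
Current thread: my-test-2
Current thread: my-test-6
Current thread: my-test-8
Current thread: my-test-1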
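To make the thread-per-task behavior concrete without a Spring dependency, here is a minimal JDK-only sketch. The class name ThreadPerTaskDemo and the method runAndCountThreads are hypothetical names made up for this illustration; the lambda-based Executor is a stand-in for the behavior described above, not Spring's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical demo class: a JDK-only stand-in for a thread-per-task executor.
final class ThreadPerTaskDemo {

    /** Runs taskCount tasks, one new thread each, and returns how many distinct threads ran them. */
    static int runAndCountThreads(int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<Thread> started = new ArrayList<>();

        // Never reuses a thread: every execute() call builds and starts a new one.
        Executor threadPerTask = task -> {
            Thread thread = new Thread(task, "my-test-" + (started.size() + 1));
            started.add(thread);
            thread.start();
        };

        for (int i = 0; i < taskCount; i++) {
            threadPerTask.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }

        // Wait for every thread so the set of names is complete.
        for (Thread thread : started) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for tasks", e);
            }
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads: " + runAndCountThreads(10)); // prints "distinct threads: 10"
    }
}
```

Ten submissions yield ten distinct thread names, which matches the ten my-test-N names in the output above.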
With a concurrency limit, each call to SimpleAsyncTaskExecutor.execute(runnable) still creates a new thread to run the task asynchronously.
So what is the difference from running without a limit? In the example below the limit is 3: at most three tasks run at the same time, and execute() blocks the calling thread until a running task finishes.
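import lombok.SneakyThrows;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * SimpleAsyncTaskExecutor with a concurrency limit
 */
public static void main(String[] args) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("my-test-");
    // Limit the number of concurrently running threads
    executor.setConcurrencyLimit(3);
    Runnable runnable = new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000L);
            System.out.println("Current thread: " + Thread.currentThread().getName());
        }
    };
    for (int i = 0; i < 10; i++) {
        executor.execute(runnable);
    }
    // The main thread is held up in the loop above, because
    // SimpleAsyncTaskExecutor blocks and waits once the limit is reached
    System.out.println("Main thread finished");
}
Prints as follows (the dashed lines are not printed; they only mark each batch of three):
Current thread: my-test-3
Current thread: my-test-1
Current thread: my-test-2
------------------------------------------
Current thread: my-test-6
Current thread: my-test-5
Current thread: my-test-4
------------------------------------------
Current thread: my-test-8
Current thread: my-test-7
Current thread: my-test-9
------------------------------------------
Main thread finished
Current thread: my-test-10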
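The point of the output above is that the submitting thread, not the worker, pays for the limit. The following JDK-only sketch reproduces that effect under stated assumptions: java.util.concurrent.Semaphore stands in for Spring's internal throttle, and CallerBlockingDemo and run are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; Semaphore is a stand-in for Spring's internal throttle.
final class CallerBlockingDemo {

    /** Returns {peak number of tasks running at once, submissions that had to wait}. */
    static int[] run(int limit, int taskCount) {
        Semaphore slots = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        int waited = 0;

        for (int i = 0; i < taskCount; i++) {
            // The submitter cannot start the next thread until a slot is free.
            if (!slots.tryAcquire()) {
                waited++;
                slots.acquireUninterruptibly();
            }
            Thread worker = new Thread(() -> {
                try {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    slots.release(); // frees the slot for a blocked submitter
                }
            }, "my-test-" + (i + 1));
            workers.add(worker);
            worker.start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for tasks", e);
            }
        }
        return new int[] {peak.get(), waited};
    }

    public static void main(String[] args) {
        int[] result = run(3, 10);
        System.out.println("peak concurrency: " + result[0] + ", submissions that waited: " + result[1]);
    }
}
```

The peak never exceeds the limit because a slot is taken before each thread starts and returned only after the task has been counted as finished. How many submissions wait depends on timing, so the sketch reports that number rather than promising one.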
The source code of SimpleAsyncTaskExecutor is fairly short, so let's read the class directly:
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
        implements AsyncListenableTaskExecutor, Serializable {
    /**
     * -1 means no concurrency limit is applied
     */
    public static final int UNBOUNDED_CONCURRENCY = -1;
    /**
     * 0 means no concurrent execution is allowed at all
     * (beforeAccess() throws IllegalStateException in this case)
     */
    public static final int NO_CONCURRENCY = 0;
    // The object that implements the concurrency limit;
    // all throttling is delegated to this class
    private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();
    @Nullable
    private ThreadFactory threadFactory;
    @Nullable
    private TaskDecorator taskDecorator;
    public SimpleAsyncTaskExecutor() {
        super();
    }
    public SimpleAsyncTaskExecutor(String threadNamePrefix) {
        super(threadNamePrefix);
    }
    /**
     * Sets the maximum number of concurrent threads
     * by assigning the concurrencyLimit field of concurrencyThrottle.
     * The default concurrencyLimit is -1, which means no concurrency limit
     */
    public void setConcurrencyLimit(int concurrencyLimit) {
        this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
    }
    /**
     * Whether a concurrency limit is currently active;
     * this simply checks whether concurrencyThrottle's concurrencyLimit >= 0
     */
    public final boolean isThrottleActive() {
        return this.concurrencyThrottle.isThrottleActive();
    }
    /**
     * The commonly used execute()
     */
    @SuppressWarnings("deprecation")
    @Override
    public void execute(Runnable task) {
        execute(task, TIMEOUT_INDEFINITE);
    }
    /**
     * Executes the given task
     */
    @Deprecated
    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = (this.taskDecorator != null ?
                this.taskDecorator.decorate(task) : task);
        // 1. If a concurrency limit is active, take this branch
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
        }
        else {
            // 2. No concurrency limit: call doExecute(task) directly
            doExecute(taskToUse);
        }
    }
    protected void doExecute(Runnable task) {
        // 1. Create a brand-new thread directly!
        // This is why every SimpleAsyncTaskExecutor.execute() call creates a new thread
        Thread thread = (this.threadFactory != null ?
                this.threadFactory.newThread(task) : createThread(task));
        // 2. Call thread.start() to run the task asynchronously
        thread.start();
    }
    /**
     * ConcurrencyThrottleAdapter has only two methods,
     * both implemented by the parent class
     */
    private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
        @Override
        protected void beforeAccess() {
            super.beforeAccess();
        }
        @Override
        protected void afterAccess() {
            super.afterAccess();
        }
    }
    /**
     * Wraps a Runnable and is itself a Runnable
     * (decorator pattern)
     */
    private class ConcurrencyThrottlingRunnable implements Runnable {
        private final Runnable target;
        public ConcurrencyThrottlingRunnable(Runnable target) {
            this.target = target;
        }
        @Override
        public void run() {
            try {
                this.target.run();
            }
            finally {
                // The key point: concurrencyThrottle.afterAccess() runs in the finally block
                concurrencyThrottle.afterAccess();
            }
        }
    }
}
That gives us a basic picture of SimpleAsyncTaskExecutor.
Without a concurrency limit the behavior is easy to follow: every call to SimpleAsyncTaskExecutor.execute() creates a new thread.
The interesting part is how it throttles when a concurrency limit is set:
// ---------------------- SimpleAsyncTaskExecutor ------------------------
public void execute(Runnable task, long startTimeout) {
    Runnable taskToUse = (this.taskDecorator != null ?
            this.taskDecorator.decorate(task) : task);
    // 1. If a concurrency limit is active, take this branch
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        // 1.1 Call this.concurrencyThrottle.beforeAccess();
        //     if the limit has been reached, this blocks and waits
        this.concurrencyThrottle.beforeAccess();
        // 1.2 At this point the call has passed the throttle.
        //     Wrap the task in a ConcurrencyThrottlingRunnable, whose run()
        //     releases the slot in its finally block
        //     so that other waiting callers can pass the throttle
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    }
    else {
        // 2. No concurrency limit: call doExecute(task) directly
        doExecute(taskToUse);
    }
}
this.concurrencyThrottle.beforeAccess() and ConcurrencyThrottlingRunnable.run() are the two key points, so let's analyze them separately.
First, beforeAccess(): it enforces the limit with a synchronized block on a monitor object together with the concurrencyLimit and concurrencyCount fields.
// ---------------------- ConcurrencyThrottleSupport ------------------------
// (simplified excerpt: the real method also handles InterruptedException around wait())
protected void beforeAccess() {
    if (this.concurrencyLimit == 0) {
        // concurrencyLimit == 0 is not allowed
        throw new IllegalStateException();
    }
    // 1. A concurrency limit is set: this.concurrencyLimit > 0
    if (this.concurrencyLimit > 0) {
        // 2. Try to take the monitor lock; block here until another thread releases it
        synchronized (this.monitor) {
            // 3. While the number of running threads >= this.concurrencyLimit,
            //    the current thread wait()s until another thread wakes it
            while (this.concurrencyCount >= this.concurrencyLimit) {
                this.monitor.wait();
            }
            // 4. Count the current thread as running: concurrencyCount++
            this.concurrencyCount++;
        }
    }
}
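The wait-in-a-loop pattern above can be tried in isolation. What follows is a minimal sketch, assuming a stripped-down throttle of our own: MonitorThrottle, acquire, release, slotsInUse and demo are hypothetical names, and the release half is included only so the example can run. It is not Spring's ConcurrencyThrottleSupport.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class: a stripped-down monitor-based throttle, not Spring's implementation.
final class MonitorThrottle {
    private final Object monitor = new Object();
    private final int limit;
    private int inUse; // guarded by monitor

    MonitorThrottle(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    /** Blocks the caller until fewer than limit slots are taken, then takes one. */
    void acquire() throws InterruptedException {
        synchronized (monitor) {
            // Loop rather than "if": a woken thread must re-check the condition.
            while (inUse >= limit) {
                monitor.wait();
            }
            inUse++;
        }
    }

    /** Frees one slot and wakes a single waiting caller, if any. */
    void release() {
        synchronized (monitor) {
            inUse--;
            monitor.notify();
        }
    }

    int slotsInUse() {
        synchronized (monitor) {
            return inUse;
        }
    }

    /** Runs taskCount short tasks through the throttle; returns {peak concurrency, slots still in use}. */
    static int[] demo(int limit, int taskCount) {
        MonitorThrottle throttle = new MonitorThrottle(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                throttle.acquire(); // the submitting thread waits here once the limit is reached
                Thread thread = new Thread(() -> {
                    try {
                        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                        Thread.sleep(20L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        throttle.release();
                    }
                });
                threads.add(thread);
                thread.start();
            }
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
        return new int[] {peak.get(), throttle.slotsInUse()};
    }

    public static void main(String[] args) {
        int[] result = demo(2, 6);
        System.out.println("peak=" + result[0] + ", slotsInUse=" + result[1]);
    }
}
```

The condition is checked in a while loop, not an if, because Object.wait() may return without the condition being true (for example on a spurious wakeup), so the waiter has to test it again before taking a slot.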
Next, look at run() in ConcurrencyThrottlingRunnable and the afterAccess() method it calls:
// --------------------- ConcurrencyThrottlingRunnable -----------------------
private class ConcurrencyThrottlingRunnable implements Runnable {
    private final Runnable target;
    public ConcurrencyThrottlingRunnable(Runnable target) {
        this.target = target;
    }
    @Override
    public void run() {
        try {
            // 1. Run the wrapped target.run()
            this.target.run();
        }
        finally {
            // 2. Always call concurrencyThrottle.afterAccess()
            concurrencyThrottle.afterAccess();
        }
    }
}
// ---------------------- ConcurrencyThrottleSupport ------------------------
protected void afterAccess() {
    // When throttling is active:
    // take the monitor lock, run concurrencyCount--, then wake one waiting thread
    if (this.concurrencyLimit >= 0) {
        synchronized (this.monitor) {
            this.concurrencyCount--;
            this.monitor.notify();
        }
    }
}
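Releasing in a finally block matters most when the task fails: without it, a slot taken before a failing task would never be returned, and waiting callers would block forever. A minimal sketch of that guarantee follows, assuming java.util.concurrent.Semaphore as a stand-in for the throttle; ReleaseInFinallyDemo, releasing and permitsAfterFailingTask are hypothetical names for this illustration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class; Semaphore is a stand-in for the throttle.
final class ReleaseInFinallyDemo {

    /** Wraps a task so that its permit is always returned, even if the task throws. */
    static Runnable releasing(Runnable target, Semaphore permits) {
        return () -> {
            try {
                target.run();
            } finally {
                permits.release();
            }
        };
    }

    /** Takes the only permit, runs a failing task through the wrapper, and reports the permits left. */
    static int permitsAfterFailingTask() {
        Semaphore permits = new Semaphore(1);
        permits.acquireUninterruptibly(); // the submitter takes the slot before running the task

        Runnable wrapped = releasing(() -> {
            throw new IllegalStateException("task failed");
        }, permits);

        try {
            wrapped.run(); // run in the calling thread to keep the demo deterministic
        } catch (IllegalStateException expected) {
            // The failure still propagates; only the permit handling is of interest here.
        }
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("permits available: " + permitsAfterFailingTask()); // prints "permits available: 1"
    }
}
```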
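That completes the analysis of SimpleAsyncTaskExecutor.
In real projects, avoid SimpleAsyncTaskExecutor where you can: it starts a new thread for every task, and with no concurrency limit set nothing bounds how many threads exist at once, which can cause catastrophic problems under load.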
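For contrast, a pool that reuses a fixed set of threads avoids the thread-per-task cost. The sketch below uses the JDK's ThreadPoolExecutor as one possible alternative; the article itself does not name a replacement, so the choice of class, the pool size of 3, the queue capacity, and the names PooledExecutorDemo and distinctThreads are all assumptions made for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a bounded, thread-reusing pool from the JDK.
final class PooledExecutorDemo {

    /** Runs taskCount tasks on a pool of poolSize threads; returns how many distinct threads were used. */
    static int distinctThreads(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);

        // Fixed number of threads plus a bounded queue sized to hold every task in this demo.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(taskCount));
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    threadNames.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            done.await(); // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            pool.shutdown();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads for 10 tasks: " + distinctThreads(3, 10));
    }
}
```

Ten tasks never use more than three threads here, whereas the thread-per-task approach would have started ten. In a real application the queue capacity and the rejection policy need deliberate choices; this sketch sizes the queue to the task count only so that nothing is rejected.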