내 연락처 정보
우편메소피아@프로톤메일.com
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
차단 대기열은 특별한 종류의 대기열이기도 하며 "선입 선출" 원칙을 따릅니다.
차단 큐는 스레드로부터 안전한 데이터 구조일 수 있으며 다음과 같은 특징을 갖습니다.
차단 대기열의 일반적인 응용 시나리오는 "생산자-소비자 모델"입니다. 이는 매우 일반적인 개발 모델입니다.
생산자
소비자 모델은 컨테이너를 통해 생산자와 소비자 간의 강력한 결합 문제를 해결합니다.
생산자와 소비자는 서로 직접 통신하지 않고 차단 대기열을 통해 통신하므로 생산자가 데이터를 생성한 후 소비자가 이를 처리할 때까지 기다릴 필요가 없고 직접 차단 대기열에 전달됩니다. 생산자에게 데이터를 요청하지 않고 차단 대기열에서 직접 가져옵니다.
생산자-소비자 모델 구현에는 두 가지 주요 단계가 있습니다.
Java 표준 라이브러리에는 차단 큐가 내장되어 있습니다. 일부 프로그램에서 차단 큐를 사용해야 하는 경우 표준 라이브러리의 차단 큐를 직접 사용할 수 있습니다.
할 수 있다.
차단 대기열의 의사 코드는 다음과 같습니다.
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// ⼊队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println("消费元素: " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) {
try {
int num = random.nextInt(1000);
System.out.println("生产元素: " + num);
blockingQueue.put(num);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
public class BlockingQueue {
private int[] items = new int[1000];
private volatile int size = 0;
private volatile int head = 0;
private volatile int tail = 0;
public void put(int value) throws InterruptedException {
synchronized (this) {
// 此处最好使⽤ while.
// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能⼜已经队列满了
// 就只能继续等待
while (size == items.length) {
wait();
}
items[tail] = value;
tail = (tail + 1) % items.length;
size++;
notifyAll();
}
}
public int take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
wait();
}
ret = items[head];
head = (head + 1) % items.length;
size--;
notifyAll();
}
return ret;
}
public synchronized int size() {
return size;
}
// 测试代码
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new BlockingQueue();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) { try {
blockingQueue.put(random.nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
}