私の連絡先情報
郵便メール:
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
ブロッキング キューは特殊な種類のキューであり、「先入れ先出し」の原則に従います。
ブロッキング キューはスレッド セーフなデータ構造にすることができ、次の特徴があります。
ブロッキング キューの典型的なアプリケーション シナリオは、「プロデューサー/コンシューマー モデル」です。これは非常に典型的な開発モデルです。
プロデューサー
コンシューマ モデルは、コンテナを介してプロデューサとコンシューマの間の強力な結合の問題を解決します。
プロデューサーとコンシューマーは相互に直接通信するのではなく、ブロッキング キューを介して通信します。そのため、プロデューサーがデータを生成した後、コンシューマーがデータを処理するのを待つ必要はなく、それをコンシューマーのブロッキング キューに直接スローします。プロデューサーにデータを要求せず、代わりにブロッキング キューから直接取得します。
生産者/消費者モデルの実装には、次の 2 つの主要な手順があります。
Java 標準ライブラリにはブロッキング キューが組み込まれており、プログラムでブロッキング キューを使用する必要がある場合は、標準ライブラリのブロッキング キューを直接使用できます。
できる。
キューをブロックするための疑似コードは次のとおりです。
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// ⼊队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println("消费元素: " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) {
try {
int num = random.nextInt(1000);
System.out.println("生产元素: " + num);
blockingQueue.put(num);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
public class BlockingQueue {
private int[] items = new int[1000];
private volatile int size = 0;
private volatile int head = 0;
private volatile int tail = 0;
public void put(int value) throws InterruptedException {
synchronized (this) {
// 此处最好使⽤ while.
// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能⼜已经队列满了
// 就只能继续等待
while (size == items.length) {
wait();
}
items[tail] = value;
tail = (tail + 1) % items.length;
size++;
notifyAll();
}
}
public int take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
wait();
}
ret = items[head];
head = (head + 1) % items.length;
size--;
notifyAll();
}
return ret;
}
public synchronized int size() {
return size;
}
// 测试代码
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new BlockingQueue();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) { try {
blockingQueue.put(random.nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
}