技術共有

[オペレーティング システム] ブロッキング キューとプロデューサー/コンシューマー モデル

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

ブロックキュー

1.コンセプト

ブロッキング キューは特殊な種類のキューであり、「先入れ先出し」の原則に従います。

ブロッキング キューはスレッド セーフなデータ構造にすることができ、次の特徴があります。

  • キューがいっぱいの場合、キューに入り続けると、他のスレッドがキューから要素を取得するまでブロックされます。.
  • キューが空の場合、デキューを続行すると、他のスレッドが要素をキューに挿入するまでブロックされます。

ブロッキング キューの典型的なアプリケーション シナリオは、「プロデューサー/コンシューマー モデル」です。これは非常に典型的な開発モデルです。
プロデューサー

コンシューマ モデルは、コンテナを介してプロデューサとコンシューマの間の強力な結合の問題を解決します。

プロデューサーとコンシューマーは相互に直接通信するのではなく、ブロッキング キューを介して通信します。そのため、プロデューサーがデータを生成した後、コンシューマーがデータを処理するのを待つ必要はなく、それをコンシューマーのブロッキング キューに直接スローします。プロデューサーにデータを要求せず、代わりにブロッキング キューから直接取得します。

生産者/消費者モデルの実装には、次の 2 つの主要な手順があります。

  1. プロデューサーを実装する
  2. 消費者を実現する

2. 標準ライブラリのブロッキングキュー

Java 標準ライブラリにはブロッキング キューが組み込まれており、プログラムでブロッキング キューを使用する必要がある場合は、標準ライブラリのブロッキング キューを直接使用できます。
できる。

  • BlockingQueue はインターフェイスです。実際の実装クラスは LinkedBlockingQueue です。
  • put メソッドはキューのエントリをブロックするために使用され、take メソッドはキューの終了をブロックするために使用されます。
  • BlockingQueue には、offer、pol、peek などのメソッドもありますが、これらのメソッドにはブロック特性がありません。

キューをブロックするための疑似コードは次のとおりです。

BlockingQueue<String> queue = new LinkedBlockingQueue<>();

// ⼊队列
queue.put("abc");

// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3. 生産者・消費者モデル

public static void main(String[] args) throws InterruptedException {

	BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
	
	Thread customer = new Thread(() -> {
		while (true) {
			try {
				int value = blockingQueue.take();
				System.out.println("消费元素: " + value);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "消费者");
	
customer.start();

	Thread producer = new Thread(() -> {
		Random random = new Random();
		while (true) {
			try {
				int num = random.nextInt(1000);
				System.out.println("生产元素: " + num);
				blockingQueue.put(num);
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "生产者");
	
	producer.start();
	customer.join();
	producer.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

4. ブロッキングキューの実装

  1. これは、「循環キュー」方式によって実現されます。
  2. ロック制御には同期を使用します。
  3. put は要素を挿入するときに、キューが満杯であるかどうかを判断して待機します (待機はループ内で実行する必要があることに注意してください。ウェイクアップされるとき、複数のスレッドが同時にウェイクアップされる可能性があるため、キューが満杯ではない可能性があります)。 。
  4. takeは要素を取り出すときにキューが空かどうかを判断して待ちます(ループ待ちでもあります)。
public class BlockingQueue {

	private int[] items = new int[1000];
	private volatile int size = 0;
	private volatile int head = 0;
	private volatile int tail = 0;
	
	public void put(int value) throws InterruptedException {
		synchronized (this) {
		// 此处最好使⽤ while.
		// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
		// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能⼜已经队列满了
		// 就只能继续等待
			while (size == items.length) {
				wait();
			}
			items[tail] = value;
			tail = (tail + 1) % items.length;
			size++;
			notifyAll();
		}
	}
	
	public int take() throws InterruptedException {
		int ret = 0;
		synchronized (this) {
			while (size == 0) {
				wait();
			}
			ret = items[head];
			head = (head + 1) % items.length;
			size--;
			notifyAll();
		}
		return ret;
	}
	
	public synchronized int size() {
		return size;
	}
	
	// 测试代码
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue blockingQueue = new BlockingQueue();
		Thread customer = new Thread(() -> {
		while (true) {
			try {
				int value = blockingQueue.take();
				System.out.println(value);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "消费者");	
	
		customer.start();
		Thread producer = new Thread(() -> {
			Random random = new Random();
				while (true) {					try {
						blockingQueue.put(random.nextInt(10000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}, "生产者");
			
		producer.start();
		customer.join();
		producer.join();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

要約する

  1. ブロッキング キューはバッファーに相当し、プロデューサーとコンシューマーの処理能力のバランスをとります (ピーク クリッピングとフィル)。
  2. キューをブロックすると、プロデューサーとコンシューマーが切り離される可能性もあります。