
[Operating System] Blocking Queue and Producer Consumer Model

2024-07-12


Blocking Queue

1. Concept

A blocking queue is a special kind of queue. Like an ordinary queue, it follows the "first in, first out" principle.

A blocking queue is a thread-safe data structure with the following properties:

  • When the queue is full, a thread that tries to enqueue blocks until another thread takes an element from the queue.
  • When the queue is empty, a thread that tries to dequeue blocks until another thread inserts an element into the queue.
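
To make the first property concrete, here is a minimal sketch that measures how long a put call stays blocked on a full queue. It assumes ArrayBlockingQueue from java.util.concurrent with a capacity of 1 (a bounded implementation chosen only so the queue fills up immediately); the class name BlockingDemo, the helper measureBlockedPut, and the 500 ms delay are illustrative choices, not part of the original article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: shows that put() blocks while the queue is full.
class BlockingDemo {

    // Returns roughly how long (in milliseconds) the second put() was blocked.
    static long measureBlockedPut(long consumerDelayMillis) throws InterruptedException {
        // Capacity 1, so the queue is full after a single element.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first");

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(consumerDelayMillis);
                queue.take(); // frees one slot, which unblocks the producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        long start = System.nanoTime();
        queue.put("second"); // blocks here until the consumer calls take()
        long blockedMillis = (System.nanoTime() - start) / 1_000_000;

        consumer.join();
        return blockedMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("put() was blocked for about "
                + measureBlockedPut(500) + " ms");
    }
}
```

The printed time should be close to the consumer's delay, because the second put cannot complete until take has removed the first element. The empty-queue case is symmetric: take blocks until some thread calls put.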

A typical application of blocking queues is the "producer-consumer model", a very common development pattern.

The producer-consumer model uses a container to solve the problem of strong coupling between producers and consumers.

Producers and consumers do not communicate with each other directly; they communicate through the blocking queue. After producing data, the producer does not wait for a consumer to process it but simply puts it into the blocking queue. The consumer does not ask the producer for data but takes it directly from the blocking queue.

There are two major steps to implementing the producer-consumer model:

  1. Implement the producer
  2. Implement the consumer

2. Blocking queues in the standard library

The Java standard library has blocking queues built in. If a program needs a blocking queue, it can use the one from the standard library directly.

  • BlockingQueue is an interface. A commonly used implementation class is LinkedBlockingQueue.
  • The put method enqueues and blocks while the queue is full; the take method dequeues and blocks while the queue is empty.
  • BlockingQueue also has methods such as offer, poll, and peek, but the basic forms of these methods do not block. offer and poll additionally have overloads that take a timeout and wait at most that long.
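
The difference between the blocking and non-blocking methods can be sketched as follows. This example assumes ArrayBlockingQueue with a capacity of 1 so that the "full" case is easy to reach; the class name NonBlockingMethodsDemo and the 100 ms timeouts are illustrative only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: offer/poll/peek return immediately instead of blocking.
class NonBlockingMethodsDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        System.out.println(queue.poll());     // null: queue is empty, returns at once
        System.out.println(queue.peek());     // null: nothing to look at
        System.out.println(queue.offer("a")); // true: element accepted
        System.out.println(queue.offer("b")); // false: queue is full, returns at once
        System.out.println(queue.peek());     // a: head element, not removed

        // Timed overloads wait up to the given timeout, then give up.
        System.out.println(queue.offer("b", 100, TimeUnit.MILLISECONDS)); // false after about 100 ms
        System.out.println(queue.poll(100, TimeUnit.MILLISECONDS));       // a
        System.out.println(queue.poll(100, TimeUnit.MILLISECONDS));       // null after about 100 ms
    }
}
```

Where put and take wait indefinitely, these methods report failure through their return value (false or null), so the caller decides what to do when the queue is full or empty.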

Basic usage of the blocking queue looks like this (a fragment; put and take can both throw InterruptedException):

BlockingQueue<String> queue = new LinkedBlockingQueue<>();

// Enqueue
queue.put("abc");

// Dequeue. Calling take without a prior put would block.
String elem = queue.take();

3. Producer Consumer Model

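import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// The class name is arbitrary; it only wraps main so the example compiles.
public class ProducerConsumerDemo {
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();

		// Consumer: take() blocks while the queue is empty.
		Thread customer = new Thread(() -> {
			while (true) {
				try {
					int value = blockingQueue.take();
					System.out.println("Consumed element: " + value);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}, "consumer");
		customer.start();

		// Producer: generates one random number per second.
		Thread producer = new Thread(() -> {
			Random random = new Random();
			while (true) {
				try {
					int num = random.nextInt(1000);
					System.out.println("Produced element: " + num);
					blockingQueue.put(num);
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}, "producer");
		producer.start();

		// Both threads loop forever, so the program runs until it is killed.
		customer.join();
		producer.join();
	}
}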

4. Blocking Queue Implementation

  1. Use a "circular queue" (a fixed-size array whose indices wrap around) as the underlying storage.
  2. Use synchronized for locking control.
  3. When put inserts an element and the queue is full, call wait. (Note that wait must be called in a loop. The queue may be full again by the time the thread wakes up, because multiple threads may be awakened at the same time.)
  4. When take removes an element and the queue is empty, call wait. (This wait must also be called in a loop.)

import java.util.Random;

public class BlockingQueue {

	private int[] items = new int[1000];
	private volatile int size = 0;
	private volatile int head = 0;
	private volatile int tail = 0;
	
	public void put(int value) throws InterruptedException {
		synchronized (this) {
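			// It is best to use while here rather than if.
			// After notifyAll, this thread is woken from wait but may not
			// acquire the lock immediately. By the time it does get the lock,
			// the queue may already be full again, so it must keep waiting.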
			while (size == items.length) {
				wait();
			}
			items[tail] = value;
			tail = (tail + 1) % items.length;
			size++;
			notifyAll();
		}
	}
	
	public int take() throws InterruptedException {
		int ret = 0;
		synchronized (this) {
			while (size == 0) {
				wait();
			}
			ret = items[head];
			head = (head + 1) % items.length;
			size--;
			notifyAll();
		}
		return ret;
	}
	
	public synchronized int size() {
		return size;
	}
	
	// Test code
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue blockingQueue = new BlockingQueue();

		Thread customer = new Thread(() -> {
			while (true) {
				try {
					int value = blockingQueue.take();
					System.out.println(value);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}, "consumer");
		customer.start();

		Thread producer = new Thread(() -> {
			Random random = new Random();
			while (true) {
				try {
					blockingQueue.put(random.nextInt(10000));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}, "producer");
		producer.start();

		customer.join();
		producer.join();
	}
}
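
The index arithmetic behind the "circular queue" is easier to follow with a tiny capacity. The sketch below strips out all locking and waiting and keeps only the head/tail bookkeeping from the implementation above; the class name RingIndexDemo and the capacity of 3 are illustrative assumptions, and it throws an exception where the real queue would wait.

```java
// Hypothetical single-threaded demo of the circular index arithmetic only.
class RingIndexDemo {
    private final int[] items = new int[3]; // tiny capacity so wrap-around is visible
    private int head = 0; // index of the next element to take
    private int tail = 0; // index of the next free slot
    private int size = 0;

    void put(int value) {
        if (size == items.length) throw new IllegalStateException("full");
        items[tail] = value;
        tail = (tail + 1) % items.length; // wraps from the last index back to 0
        size++;
    }

    int take() {
        if (size == 0) throw new IllegalStateException("empty");
        int value = items[head];
        head = (head + 1) % items.length;
        size--;
        return value;
    }

    public static void main(String[] args) {
        RingIndexDemo q = new RingIndexDemo();
        q.put(1);
        q.put(2);
        q.put(3); // tail wraps around to 0, queue is now full
        System.out.println("head=" + q.head + " tail=" + q.tail); // head=0 tail=0
        System.out.println(q.take()); // 1, head moves to 1
        q.put(4);                     // reuses slot 0, tail moves to 1
        System.out.println("head=" + q.head + " tail=" + q.tail); // head=1 tail=1
        System.out.println(q.take() + " " + q.take() + " " + q.take()); // 2 3 4
    }
}
```

Note that head equals tail both when the queue is full and when it is empty, which is why the implementation keeps a separate size field instead of comparing the two indices.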

Summary

  1. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers ("peak shaving and valley filling").
  2. The blocking queue also decouples producers from consumers.
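
Both points can be seen in a small, terminating variant of the producer-consumer program. This is only a sketch under stated assumptions: it uses ArrayBlockingQueue with a small capacity as the buffer, a fast producer, and a consumer slowed down with a 1 ms sleep; the class name BufferDemo, the helper produceAndConsume, and all the numbers are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: a bounded queue buffers a fast producer for a slow consumer.
class BufferDemo {

    // The producer puts 1..count; the consumer takes count elements and sums them.
    static long produceAndConsume(int count, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        long[] sum = new long[1]; // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks whenever the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += queue.take(); // blocks whenever the buffer is empty
                    Thread.sleep(1);        // simulate slower processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join, the consumer's writes to sum are visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndConsume(100, 10)); // 5050
    }
}
```

Neither thread refers to the other; each only knows the queue. The producer runs ahead by at most the queue's capacity and is then throttled by put, so a burst of production is absorbed by the buffer without any element being lost.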