Обмен технологиями

[Операционная система] Очередь блокировки и модель производитель-потребитель

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

блокирующая очередь

1. Концепция

Блокирующая очередь — это особый вид очереди. Она также соответствует принципу «первым пришел — первым обслужен».

Блокирующая очередь может представлять собой поточно-ориентированную структуру данных и имеет следующие характеристики:

  • Когда очередь заполнена, продолжение входа в очередь будет заблокировано до тех пор, пока другие потоки не возьмут элементы из очереди..
  • Когда очередь пуста, продолжение удаления из очереди также будет заблокировано до тех пор, пока другие потоки не вставят элементы в очередь.

Типичным сценарием применения блокировки очереди является «модель производитель-потребитель». Это очень типичная модель разработки.
режиссер

Потребительская модель решает проблему сильной связи между производителями и потребителями через контейнер.

Производители и потребители не общаются друг с другом напрямую, а общаются через блокирующие очереди. Поэтому после того, как производитель производит данные, ему не нужно ждать, пока потребитель их обработает, а напрямую бросает их в блокирующую очередь. не запрашивает данные у производителя. Вместо этого они берутся непосредственно из очереди блокировки.

Внедрение модели производитель-потребитель состоит из двух основных этапов:

  1. производитель реализации
  2. реализовывать потребителя

2. Блокировка очереди в стандартной библиотеке

В стандартную библиотеку Java встроена очередь блокировки. Если нам нужно использовать очереди блокировки в некоторых программах, мы можем напрямую использовать очередь блокировки в стандартной библиотеке.
Может.

  • BlockingQueue — это интерфейс. Настоящий класс реализации — LinkedBlockingQueue.
  • Метод put используется для блокировки входа в очередь, а метод take — для блокировки выхода из очереди.
  • BlockingQueue также имеет такие методы, как предложение, опрос и просмотр, но эти методы не обладают характеристиками блокировки.

Псевдокод блокировки очереди выглядит следующим образом:

BlockingQueue<String> queue = new LinkedBlockingQueue<>();

// ⼊队列
queue.put("abc");

// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3. Модель производитель-потребитель

public static void main(String[] args) throws InterruptedException {

	BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
	
	Thread customer = new Thread(() -> {
		while (true) {
			try {
				int value = blockingQueue.take();
				System.out.println("消费元素: " + value);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "消费者");
	
customer.start();

	Thread producer = new Thread(() -> {
		Random random = new Random();
		while (true) {
			try {
				int num = random.nextInt(1000);
				System.out.println("生产元素: " + num);
				blockingQueue.put(num);
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "生产者");
	
	producer.start();
	customer.join();
	producer.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

4. Блокировка реализации очереди

  1. Это достигается за счет метода «круговой очереди».
  2. Используйте синхронизацию для управления блокировкой.
  3. Когда put вставляет элемент, он определяет, заполнена ли очередь, и ожидает (Обратите внимание, что ожидание должно выполняться в цикле. Когда он пробуждается, очередь может быть не заполнена, поскольку одновременно могут быть пробуждены несколько потоков.) .
  4. Когда take извлекает элемент, он определяет, пуста ли очередь, и ожидает (это также ожидание цикла).
public class BlockingQueue {

	private int[] items = new int[1000];
	private volatile int size = 0;
	private volatile int head = 0;
	private volatile int tail = 0;
	
	public void put(int value) throws InterruptedException {
		synchronized (this) {
		// 此处最好使⽤ while.
		// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
		// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能⼜已经队列满了
		// 就只能继续等待
			while (size == items.length) {
				wait();
			}
			items[tail] = value;
			tail = (tail + 1) % items.length;
			size++;
			notifyAll();
		}
	}
	
	public int take() throws InterruptedException {
		int ret = 0;
		synchronized (this) {
			while (size == 0) {
				wait();
			}
			ret = items[head];
			head = (head + 1) % items.length;
			size--;
			notifyAll();
		}
		return ret;
	}
	
	public synchronized int size() {
		return size;
	}
	
	// 测试代码
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue blockingQueue = new BlockingQueue();
		Thread customer = new Thread(() -> {
		while (true) {
			try {
				int value = blockingQueue.take();
				System.out.println(value);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "消费者");	
	
		customer.start();
		Thread producer = new Thread(() -> {
			Random random = new Random();
				while (true) {					try {
						blockingQueue.put(random.nextInt(10000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}, "生产者");
			
		producer.start();
		customer.join();
		producer.join();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

Подведем итог

  1. Очередь блокировки эквивалентна буферу, балансирующему возможности обработки производителей и потребителей (отсечение и заполнение пиков).
  2. Блокирующие очереди также могут разделить производителей и потребителей.