기술나눔

[운영체제] 차단 대기열 및 생산자-소비자 모델

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

차단 대기열

1. 컨셉

차단 대기열은 특별한 종류의 대기열이기도 하며 "선입 선출" 원칙을 따릅니다.

차단 큐는 스레드로부터 안전한 데이터 구조일 수 있으며 다음과 같은 특징을 갖습니다.

  • 대기열이 가득 차면 대기열에 계속 들어가면 다른 스레드가 대기열에서 요소를 가져올 때까지 차단됩니다..
  • 대기열이 비어 있을 때 대기열에서 계속 빼면 다른 스레드가 대기열에 요소를 삽입할 때까지 차단됩니다.

차단 대기열의 일반적인 응용 시나리오는 "생산자-소비자 모델"입니다. 이는 매우 일반적인 개발 모델입니다.
생산자

소비자 모델은 컨테이너를 통해 생산자와 소비자 간의 강력한 결합 문제를 해결합니다.

생산자와 소비자는 서로 직접 통신하지 않고 차단 대기열을 통해 통신하므로 생산자가 데이터를 생성한 후 소비자가 이를 처리할 때까지 기다릴 필요가 없고 직접 차단 대기열에 전달됩니다. 생산자에게 데이터를 요청하지 않고 차단 대기열에서 직접 가져옵니다.

생산자-소비자 모델 구현에는 두 가지 주요 단계가 있습니다.

  1. 구현 제작자
  2. 소비자를 실현하다

2. 표준 라이브러리의 블로킹 큐

Java 표준 라이브러리에는 차단 큐가 내장되어 있습니다. 일부 프로그램에서 차단 큐를 사용해야 하는 경우 표준 라이브러리의 차단 큐를 직접 사용할 수 있습니다.
할 수 있다.

  • BlockingQueue는 인터페이스입니다. 실제 구현 클래스는 LinkedBlockingQueue입니다.
  • Put 메소드는 큐 진입을 차단하는 데 사용되고 Take 메소드는 큐 종료를 차단하는 데 사용됩니다.
  • BlockingQueue에는 Offer, Poll, Peek 등의 메서드도 있지만 이러한 메서드에는 차단 특성이 없습니다.

차단 대기열의 의사 코드는 다음과 같습니다.

BlockingQueue<String> queue = new LinkedBlockingQueue<>();

// ⼊队列
queue.put("abc");

// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3. 생산자-소비자 모델

public static void main(String[] args) throws InterruptedException {

	BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
	
	Thread customer = new Thread(() -> {
		while (true) {
			try {
				int value = blockingQueue.take();
				System.out.println("消费元素: " + value);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "消费者");
	
customer.start();

	Thread producer = new Thread(() -> {
		Random random = new Random();
		while (true) {
			try {
				int num = random.nextInt(1000);
				System.out.println("生产元素: " + num);
				blockingQueue.put(num);
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "生产者");
	
	producer.start();
	customer.join();
	producer.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

4. 차단 대기열 구현

  1. 이는 "순환 대기열" 방법을 통해 달성됩니다.
  2. 잠금 제어에는 동기화를 사용하십시오.
  3. Put이 요소를 삽입할 때 큐가 꽉 찼는지 확인하고 대기합니다. (대기는 루프에서 수행되어야 합니다. 깨울 때 여러 스레드가 동시에 깨울 수 있으므로 큐가 가득 차지 않을 수 있습니다.) .
  4. take가 요소를 꺼낼 때 대기열이 비어 있는지 확인하고 대기합니다. (이 역시 루프 대기입니다.)
public class BlockingQueue {

	private int[] items = new int[1000];
	private volatile int size = 0;
	private volatile int head = 0;
	private volatile int tail = 0;
	
	public void put(int value) throws InterruptedException {
		synchronized (this) {
		// 此处最好使⽤ while.
		// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
		// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能⼜已经队列满了
		// 就只能继续等待
			while (size == items.length) {
				wait();
			}
			items[tail] = value;
			tail = (tail + 1) % items.length;
			size++;
			notifyAll();
		}
	}
	
	public int take() throws InterruptedException {
		int ret = 0;
		synchronized (this) {
			while (size == 0) {
				wait();
			}
			ret = items[head];
			head = (head + 1) % items.length;
			size--;
			notifyAll();
		}
		return ret;
	}
	
	public synchronized int size() {
		return size;
	}
	
	// 测试代码
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue blockingQueue = new BlockingQueue();
		Thread customer = new Thread(() -> {
		while (true) {
			try {
				int value = blockingQueue.take();
				System.out.println(value);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "消费者");	
	
		customer.start();
		Thread producer = new Thread(() -> {
			Random random = new Random();
				while (true) {					try {
						blockingQueue.put(random.nextInt(10000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}, "生产者");
			
		producer.start();
		customer.join();
		producer.join();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

요약하다

  1. 차단 대기열은 생산자와 소비자의 처리 기능의 균형을 맞추는 버퍼와 동일합니다(피크 클리핑 및 채우기).
  2. 차단 대기열은 생산자와 소비자를 분리할 수도 있습니다.