기술나눔

Go 코루틴 및 채널의 종합적인 적용 문제

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. 코루틴과 채널이 무엇인지 간략하게 이해합니다.

코루틴이란 무엇입니까?

코루틴은 독립적인 스택 공간을 갖고 프로그램의 힙 공간을 공유하는 사용자 수준의 경량 스레드입니다.

단일 스레드 기반의 알고리즘을 통해 구현된 마이크로 스레드입니다. 멀티 스레드 프로그래밍과 비교하면 다음과 같은 장점이 있습니다.

  • 코루틴의 컨텍스트 전환은 시스템 커널의 컨텍스트 전환 없이 사용자가 결정하므로 오버헤드가 줄어듭니다.
  • 기본적으로 코루틴은 중단을 방지하기 위해 완전히 보호됩니다.원자 연산 잠금이 필요하지 않습니다.
  • 단일 스레드는 높은 동시성을 달성할 수도 있으며 단일 코어 CPU라도 수만 개의 코루틴을 지원할 수 있습니다.

채널이 뭐야?

채널은 코루틴 간의 통신에 사용되는 데이터 구조입니다. 대기열과 유사하게 한쪽 끝은 발신자이고 다른 쪽 끝은 수신자입니다. 채널을 사용하면 데이터의 동기화 및 순서를 보장할 수 있습니다.

채널은 버퍼링된 채널과 버퍼링되지 않은 채널로 구분되며 다음과 같이 선언됩니다.

  • 버퍼 채널이 있습니다
intChan := make(chan int,<缓冲容量>)
  • 버퍼링되지 않은 채널
intChan := make(chan int)

버퍼링된 채널과 버퍼링되지 않은 채널의 차이점:

  • 차단: 버퍼링되지 않은 채널의 송신자는 데이터가 수신될 때까지 차단합니다. 버퍼링된 채널의 송신자는 버퍼가 가득 찰 때까지 차단하고 수신자는 버퍼가 비어 있지 않을 때까지 차단합니다.
  • 데이터 동기화 및 순서: 버퍼링되지 않은 채널은 데이터 동기화 및 순서를 보장하지만 버퍼링된 파이프는 데이터 동기화 및 순서를 보장하지 않습니다.
  • 애플리케이션 시나리오: 버퍼링되지 않은 채널에는 엄격한 동기화 및 순서가 필요합니다. 버퍼링된 채널은 비동기식으로 통신하고 처리량을 향상시킬 수 있습니다.

버퍼링되지 않은 채널을 구현할 때 주의해야 할 점은 채널 양쪽 끝에 송신자와 수신자가 있어야 하며, 그렇지 않으면 교착 상태가 발생한다는 것입니다.

2. 코루틴 채널 동시 프로그래밍 사례

(1) 문자와 숫자를 교대로 인쇄

질문 의미: 코루틴 채널을 사용하여 숫자 1-10과 문자 AJ를 번갈아 인쇄합니다.

암호:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. /*
  7. 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
  8. 解决办法:
  9. 避免死锁的发生:
  10. 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
  11. */
  12. func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  13. defer wg.Done()
  14. for i := 1; i <= 10; i++ {
  15. <-alpCh // 等待字母goroutine发信号
  16. fmt.Print(i, " ")
  17. //避免死锁发生
  18. if i < 10 {
  19. numCh <- struct{}{} // 发信号给字母goroutine
  20. }
  21. if i == 10 {
  22. close(numCh)
  23. }
  24. }
  25. }
  26. func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  27. defer wg.Done()
  28. for i := 'A'; i <= 'J'; i++ {
  29. <-numCh // 等待数字goroutine发信号
  30. fmt.Printf("%c", i)
  31. alpCh <- struct{}{} // 发信号给数字goroutine
  32. }
  33. close(alpCh)
  34. }
  35. func main() {
  36. numCh := make(chan struct{}) // 用于数字goroutine的信号通道
  37. alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
  38. var wg sync.WaitGroup
  39. wg.Add(2)
  40. go printAlp(&wg, numCh, alpCh)
  41. go printNum(&wg, numCh, alpCh)
  42. // 启动时先给数字goroutine发送一个信号
  43. numCh <- struct{}{}
  44. wg.Wait()
  45. }

주제 분석:

이 질문에서는 문자와 숫자를 교대로 인쇄해야 하므로 버퍼링되지 않은 채널의 애플리케이션 시나리오와 일치하는 두 코루틴의 엄격한 순서를 보장해야 합니다. 숫자와 문자를 각각 저장하는 두 개의 채널을 설정합니다. 숫자와 문자를 인쇄하는 두 개의 코루틴은 각각 두 채널의 발신자와 수신자 역할을 합니다. 루프에서 한 번 인쇄하고 신호를 한 번 보내 다른 코루틴에 인쇄하도록 상기시킵니다.

마지막 문자 '10'이 인쇄되면 문자 인쇄를 위한 코루틴이 종료되고, 이때 numCh 채널에는 수신자가 없습니다. 이때 unbuffered 채널의 구현 조건은 더 이상 충족되지 않습니다. 그렇지 않으면 신호를 다시 보내면 차단 교착 상태가 발생합니다. 따라서 10번째로 다시 신호를 보낼 필요가 없습니다.

(2) 작업 스케줄러 설계

제목: 다중 코루틴 + 채널 프로그래밍 모델을 사용하여 다중 작업 동시 처리의 비즈니스 시나리오를 구현하고 작업이 추가되는 순서대로 예약 순서를 지정하는 작업 스케줄러를 설계합니다.

암호:

  1. type scheduler struct {
  2. taskChan chan func()
  3. wt sync.WaitGroup
  4. }
  5. func (td *scheduler) AddTask(task func()) {
  6. td.taskChan <- task
  7. }
  8. func (td *scheduler) Executer() {
  9. defer td.wt.Done()
  10. for {
  11. task, ok := <-td.taskChan
  12. task()
  13. if ok && len(td.taskChan) == 0 {
  14. break
  15. }
  16. }
  17. }
  18. func (td *scheduler) Start() {
  19. td.wt.Add(4)
  20. //假设四个消费者
  21. for i := 0; i < 4; i++ {
  22. go td.Executer()
  23. }
  24. td.wt.Wait()
  25. }
  26. func main() {
  27. sd := scheduler{
  28. taskChan: make(chan func(), 5),
  29. }
  30. go func() {
  31. sd.AddTask(func() {
  32. fmt.Println("任务1")
  33. })
  34. sd.AddTask(func() {
  35. fmt.Println("任务2")
  36. })
  37. sd.AddTask(func() {
  38. fmt.Println("任务3")
  39. })
  40. sd.AddTask(func() {
  41. fmt.Println("任务4")
  42. })
  43. sd.AddTask(func() {
  44. fmt.Println("任务5")
  45. })
  46. sd.AddTask(func() {
  47. fmt.Println("任务6")
  48. })
  49. close(sd.taskChan)
  50. }()
  51. sd.Start()
  52. }

문제 분석:

추가된 작업은 다중 작업이므로 둘 이상이고 이러한 작업을 실행하려면 비동기 처리가 필요합니다. 버퍼링된 채널을 준수하려면 향상된 처리량과 비동기 처리가 필요합니다.

그런 다음 작업을 채널에 넣어야 하며, 여러 수신자가 채널에서 작업을 순서대로 가져와 실행할 수 있습니다.

주의가 필요한 문제는 추가된 작업 개수가 채널의 버퍼보다 ​​많으면 작업 추가 시 차단이 발생한다는 점이다. 컨슈머의 정상적인 시작에 영향을 주지 않기 위해서는 별도의 코루틴을 열어 작업을 추가해야 합니다.

이런 식으로 소비자가 소비하면 차단 생산자가 깨어나 작업을 계속 추가합니다.

3. 요약

코루틴 + 채널 프로그래밍 모델을 연구한 후에는 제목에서 방금 언급한 것 외에도 다음 문제에도 주의를 기울여야 합니다.

1. 채널을 모두 사용한 후에 채널을 폐쇄해야 하는 이유는 무엇입니까?

  • 교착상태를 피하기 위해. 또한 채널을 닫으면 송신자로부터 더 이상 전송할 데이터가 없으며 데이터를 계속 기다릴 필요가 없음을 수신자에게 알립니다. 채널 폐쇄 정보를 수신한 후, 채널이 폐쇄되지 않으면 수신기는 데이터 수신을 중단하고, 수신기는 차단된 상태로 유지되며 교착 상태가 발생할 위험이 있습니다.
  • 리소스를 해제하고 리소스 누출을 방지하세요. 채널을 닫은 후 시스템은 해당 리소스를 해제합니다. 시간에 맞춰 채널을 닫으면 리소스 낭비와 누출을 피할 수 있습니다.

2. 채널을 정상적으로 닫는 방법은 무엇입니까?

우선, 채널 폐쇄의 가장 기본적인 원칙은 이미 폐쇄된 채널을 폐쇄하지 않는 것입니다. 둘째, Go 채널을 사용하는 또 다른 원칙이 있습니다.데이터 수신자 또는 송신자가 여러 명인 경우 채널을 닫지 마십시오.다시 말해서채널의 유일한 발신자만 이 채널을 닫도록 해야 합니다.

무례한 방법은 예외 복구를 통해 채널을 닫는 것이지만 이는 분명히 위의 원칙을 위반하고 데이터 경쟁을 일으킬 수 있습니다. 또 다른 방법은 동시 닫기가 보장되지 않는 sync.Once 또는 sync.Mutex를 사용하여 채널을 닫는 것입니다. 채널에서의 작업 및 전송 작업은 데이터 경합을 생성하지 않습니다. 두 가지 방법 모두 문제가 있기 때문에 여기서는 채널을 정상적으로 닫는 방법에 대해 자세히 소개하지 않겠습니다.

시나리오 1: M명의 수신자와 1명의 발신자

대처하기 가장 쉬운 상황 중 하나입니다. 발신자가 전송을 완료해야 할 경우 채널을 닫도록 하세요. 이는 위의 두 프로그래밍 예제의 경우입니다.

시나리오 2: 수신자 1명과 발신자 N명

Go 채널의 기본 원칙에 따르면 채널의 유일한 발신자에서만 채널을 닫을 수 있습니다. 따라서 이 경우 어딘가에서 직접 채널을 닫을 수는 없습니다.그러나 송신자에게 더 이상 데이터를 보내지 말라고 알리기 위해 수신자가 추가 신호 채널을 닫도록 할 수 있습니다.

  1. package main
  2. import (
  3. "log"
  4. "sync"
  5. )
  6. func main() {
  7. cosnt N := 5
  8. cosnt Max := 60000
  9. count := 0
  10. dataCh := make(chan int)
  11. stopCh := make(chan bool)
  12. var wt sync.WaitGroup
  13. wt.Add(1)
  14. //发送者
  15. for i := 0; i < N; i++ {
  16. go func() {
  17. for {
  18. select {
  19. case <-stopCh:
  20. return
  21. default:
  22. count += 1
  23. dataCh <- count
  24. }
  25. }
  26. }()
  27. }
  28. //接收者
  29. go func() {
  30. defer wt.Done()
  31. for value := range dataCh {
  32. if value == Max {
  33. // 此唯一的接收者同时也是stopCh通道的
  34. // 唯一发送者。尽管它不能安全地关闭dataCh数
  35. // 据通道,但它可以安全地关闭stopCh通道。
  36. close(stopCh)
  37. return
  38. }
  39. log.Println(value)
  40. }
  41. }()
  42. wt.Wait()
  43. }

이 방법에서는 수신자가 더 이상 데이터를 수신할 필요가 없음을 발신자에게 알리기 위해 사용하는 추가 신호 채널 stopCh를 추가합니다. 또한 이 메서드는 dataCh를 닫지 않습니다. 코루틴에서 더 이상 채널을 사용하지 않으면 채널이 닫혔는지 여부에 관계없이 점차적으로 가비지 수집됩니다.

이 방법의 장점은 한 채널을 닫으면 다른 채널 사용을 중지하여 간접적으로 다른 채널을 닫는다는 것입니다.

시나리오 3: M명의 수신자와 N명의 발신자

수신자나 발신자가 데이터 전송에 사용되는 채널을 닫도록 할 수 없으며, 여러 수신자 중 하나가 추가 신호 채널을 닫도록 할 수도 없습니다. 이 두 가지 관행 모두 채널 폐쇄 원칙을 위반합니다.

그러나 우리가 소개할 수 있는 것은중간 중재자 역할 및 모든 수신자와 발신자에게 작업 종료를 알리기 위해 추가 신호 채널을 닫습니다.

코드 예:

  1. package main
  2. import (
  3. "log"
  4. "math/rand"
  5. "strconv"
  6. "sync"
  7. )
  8. func main() {
  9. const Max = 100000
  10. const NumReceivers = 10
  11. const NumSenders = 1000
  12. var wt sync.WaitGroup
  13. wt.Add(NumReceivers)
  14. dataCh := make(chan int)
  15. stopCh := make(chan struct{})
  16. // stopCh是一个额外的信号通道。它的发送
  17. // 者为中间调解者。它的接收者为dataCh
  18. // 数据通道的所有的发送者和接收者。
  19. toStop := make(chan string, 1)
  20. // toStop是一个用来通知中间调解者让其
  21. // 关闭信号通道stopCh的第二个信号通道。
  22. // 此第二个信号通道的发送者为dataCh数据
  23. // 通道的所有的发送者和接收者,它的接收者
  24. // 为中间调解者。它必须为一个缓冲通道。
  25. var stoppedBy string
  26. // 中间调解者
  27. go func() {
  28. stoppedBy = <-toStop
  29. close(stopCh)
  30. }()
  31. // 发送者
  32. for i := 0; i < NumSenders; i++ {
  33. go func(id string) {
  34. for {
  35. value := rand.Intn(Max)
  36. if value == 0 {
  37. // 为了防止阻塞,这里使用了一个尝试
  38. // 发送操作来向中间调解者发送信号。
  39. select {
  40. case toStop <- "发送者#" + id:
  41. default:
  42. }
  43. return
  44. }
  45. select {
  46. case <-stopCh:
  47. return
  48. case dataCh <- value:
  49. }
  50. }
  51. }(strconv.Itoa(i))
  52. }
  53. // 接收者
  54. for i := 0; i < NumReceivers; i++ {
  55. go func(id string) {
  56. defer wt.Done()
  57. for {
  58. select {
  59. case <-stopCh:
  60. return
  61. case value := <-dataCh:
  62. if value == Max {
  63. // 为了防止阻塞,这里使用了一个尝试
  64. // 发送操作来向中间调解者发送信号。
  65. select {
  66. case toStop <- "接收者:" + id:
  67. default:
  68. }
  69. return
  70. }
  71. log.Println(value)
  72. }
  73. }
  74. }(strconv.Itoa(i))
  75. }
  76. wt.Wait()
  77. log.Println("被" + stoppedBy + "终止了")
  78. }