技術共有

Go コルーチンとチャネルの包括的なアプリケーションの問題

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. コルーチンとチャネルとは何かを簡単に理解する

コルーチンとは

コルーチンは、独立したスタック領域を持ち、プログラムのヒープ領域を共有するユーザーレベルの軽量スレッドです。

シングルスレッドをベースにしたアルゴリズムで実装されたマイクロスレッドであり、マルチスレッドプログラミングと比較して次のような利点があります。

  • コルーチンのコンテキスト切り替えはユーザーによって決定されるため、システム カーネルのコンテキスト切り替えは必要なく、オーバーヘッドが削減されます。
  • デフォルトでは、コルーチンは中断を防ぐために完全に保護されています。アトミック操作ロックは必要ありません
  • 単一スレッドでも高い同時実行性を実現でき、シングルコア CPU でも数万のコルーチンをサポートできます。

チャンネルとは何ですか

チャネルは、コルーチン間の通信に使用されるデータ構造です。キューと同様に、一方の端が送信者で、もう一方の端が受信者です。チャネルを使用すると、データの同期と順序を確保できます。

チャネルはバッファ付きチャネルとバッファなしチャネルに分けられ、次のように宣言されます。

  • バッファチャネルがあります
intChan := make(chan int,<缓冲容量>)
  • バッファリングされていないチャネル
intChan := make(chan int)

バッファー付きチャネルとバッファーなしチャネルの違い:

  • ブロッキング: バッファなしチャネルの送信側はデータが受信されるまでブロックし、バッファ付きチャネルの送信側はバッファがいっぱいになるまでブロックし、受信側はバッファが空でなくなるまでブロックします。
  • データの同期と順序: バッファなしチャネルはデータの同期と順序を保証しますが、バッファ付きパイプはデータの同期と順序を保証しません。
  • アプリケーション シナリオ: バッファなしチャネルには厳密な同期とシーケンスが必要ですが、バッファ付きチャネルは非同期通信を行うことができ、スループットを向上させることができます。

バッファなしチャネルの実装で注意する必要があるのは、チャネルの両端に送信者と受信者が存在する必要があることです。そうでないとデッドロックが発生します。

2.コルーチンチャネル同時プログラミングのケース

(1) 文字と数字を交互に印刷する

質問の意味: coroutine-channel を使用して、数字 1 ~ 10 と文字 A~J を交互に出力します。

コード:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. /*
  7. 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
  8. 解决办法:
  9. 避免死锁的发生:
  10. 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
  11. */
  12. func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  13. defer wg.Done()
  14. for i := 1; i <= 10; i++ {
  15. <-alpCh // 等待字母goroutine发信号
  16. fmt.Print(i, " ")
  17. //避免死锁发生
  18. if i < 10 {
  19. numCh <- struct{}{} // 发信号给字母goroutine
  20. }
  21. if i == 10 {
  22. close(numCh)
  23. }
  24. }
  25. }
  26. func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  27. defer wg.Done()
  28. for i := 'A'; i <= 'J'; i++ {
  29. <-numCh // 等待数字goroutine发信号
  30. fmt.Printf("%c", i)
  31. alpCh <- struct{}{} // 发信号给数字goroutine
  32. }
  33. close(alpCh)
  34. }
  35. func main() {
  36. numCh := make(chan struct{}) // 用于数字goroutine的信号通道
  37. alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
  38. var wg sync.WaitGroup
  39. wg.Add(2)
  40. go printAlp(&wg, numCh, alpCh)
  41. go printNum(&wg, numCh, alpCh)
  42. // 启动时先给数字goroutine发送一个信号
  43. numCh <- struct{}{}
  44. wg.Wait()
  45. }

トピック分析:

この質問では、文字と数字を交互に出力する必要があるため、バッファなしチャネルのアプリケーション シナリオと一致する 2 つのコルーチンの厳密な順序を確保する必要があります。数字と文字をそれぞれ保存する 2 つのチャネルを設定します。数字と文字を出力する 2 つのコルーチンは、それぞれ 2 つのチャネルの送信側と受信側として機能します。ループ内で 1 回印刷し、別のコルーチンに印刷を促す信号を 1 回送信します。

最後の文字「10」が印刷されると、文字を印刷するためのコルーチンが終了し、numCh チャネルには受信側が存在しないことに注意してください。この時点では、バッファなしチャネルの実装条件は満たされていなければなりません。そうしないと、シグナルを再度送信するとブロッキング デッドロックが発生します。したがって、10 回目に再度信号を送信する必要はありません。

(2) タスクスケジューラの設計

タイトル: マルチコルーチン + チャネル プログラミング モデルを使用して、マルチタスクの同時処理のビジネス シナリオを実装するタスク スケジューラーを設計します。タスクの追加順序がスケジュール順序である必要があります。

コード:

  1. type scheduler struct {
  2. taskChan chan func()
  3. wt sync.WaitGroup
  4. }
  5. func (td *scheduler) AddTask(task func()) {
  6. td.taskChan <- task
  7. }
  8. func (td *scheduler) Executer() {
  9. defer td.wt.Done()
  10. for {
  11. task, ok := <-td.taskChan
  12. task()
  13. if ok && len(td.taskChan) == 0 {
  14. break
  15. }
  16. }
  17. }
  18. func (td *scheduler) Start() {
  19. td.wt.Add(4)
  20. //假设四个消费者
  21. for i := 0; i < 4; i++ {
  22. go td.Executer()
  23. }
  24. td.wt.Wait()
  25. }
  26. func main() {
  27. sd := scheduler{
  28. taskChan: make(chan func(), 5),
  29. }
  30. go func() {
  31. sd.AddTask(func() {
  32. fmt.Println("任务1")
  33. })
  34. sd.AddTask(func() {
  35. fmt.Println("任务2")
  36. })
  37. sd.AddTask(func() {
  38. fmt.Println("任务3")
  39. })
  40. sd.AddTask(func() {
  41. fmt.Println("任务4")
  42. })
  43. sd.AddTask(func() {
  44. fmt.Println("任务5")
  45. })
  46. sd.AddTask(func() {
  47. fmt.Println("任务6")
  48. })
  49. close(sd.taskChan)
  50. }()
  51. sd.Start()
  52. }

問題分析:

追加するタスクはマルチタスクのため複数存在し、これらのタスクを実行するには非同期処理が必要です。バッファされたチャネルに準拠するには、スループットと非同期処理の向上が必要です。

次に、タスクをチャネルに配置する必要があり、複数の受信者がチャネルからタスクを順番に取得して実行できます。

注意が必要な問題は、追加されたタスクの数がチャネルのバッファーを超えると、タスクを追加するときにブロッキングが発生することです。コンシューマーの通常の起動に影響を与えないようにするには、別のコルーチンを開いてタスクを追加する必要があります。

このようにして、コンシューマーが消費すると、ブロックしているプロデューサーが起動されてタスクの追加を続行します。

3. まとめ

コルーチン + チャネル プログラミング モデルを学習した後は、タイトルで述べたことに加えて、次の問題にも注意する必要があります。

1. チャネルを使い切った後、チャネルを閉じる必要があるのはなぜですか? チャネルを閉じないとどのようなリスクがありますか?

  • デッドロックを避けるため。チャネルを閉じると、送信者から送信するデータがもうないため、データを待ち続ける必要がないことが受信者に通知されます。チャネル閉鎖情報を受信した後、受信機はデータの受信を停止します。チャネルが閉鎖されていない場合、受信機はブロックされたままになり、デッドロックの危険性があります。
  • リソースを解放し、リソースのリークを回避します。チャネルを閉じた後、システムは対応するリソースを解放します。チャネルを適切なタイミングで閉じると、リソースの無駄や漏洩を回避できます。

2. チャネルを正常に閉じるにはどうすればよいですか?

まず、チャネルを閉じる最も基本的な原則は、閉じたチャネルを閉じないことです。次に、Go チャネルの使用には別の原則があります。データ受信側のチャネル、または送信側が複数ある場合はチャネルを閉じないでください。言い換えるとチャネルの唯一の送信者のみがこのチャネルを閉じられるようにする必要があります。

失礼な方法は、例外回復によってチャネルを閉じることですが、これは明らかに上記の原則に違反し、データ競合を引き起こす可能性があります。別の方法は、sync.Once または sync.Mutex を使用してチャネルを閉じることですが、これは同時終了が発生することが保証されていません。チャネル上の操作と送信操作によってデータ競合が発生することはありません。どちらの方法にも問題があるため、詳細は紹介しません。 ここでは、チャネルを正常に閉じる方法について説明します。

シナリオ 1: M 個の受信者と 1 人の送信者

最も対処しやすい状況の 1 つです。送信者が送信を終了する必要がある場合は、チャネルを閉じてください。上記の 2 つのプログラミング例がこれに当てはまります。

シナリオ 2: 1 人の受信者と N 人の送信者

Go チャネルの基本原則によれば、チャネルの唯一の送信者でのみチャネルを閉じることができます。したがって、この場合、どこかでチャネルを直接閉じることはできません。しかし、受信者が追加の信号チャネルを閉じて、送信者にこれ以上データを送信しないよう指示することができます。

  1. package main
  2. import (
  3. "log"
  4. "sync"
  5. )
  6. func main() {
  7. cosnt N := 5
  8. cosnt Max := 60000
  9. count := 0
  10. dataCh := make(chan int)
  11. stopCh := make(chan bool)
  12. var wt sync.WaitGroup
  13. wt.Add(1)
  14. //发送者
  15. for i := 0; i < N; i++ {
  16. go func() {
  17. for {
  18. select {
  19. case <-stopCh:
  20. return
  21. default:
  22. count += 1
  23. dataCh <- count
  24. }
  25. }
  26. }()
  27. }
  28. //接收者
  29. go func() {
  30. defer wt.Done()
  31. for value := range dataCh {
  32. if value == Max {
  33. // 此唯一的接收者同时也是stopCh通道的
  34. // 唯一发送者。尽管它不能安全地关闭dataCh数
  35. // 据通道,但它可以安全地关闭stopCh通道。
  36. close(stopCh)
  37. return
  38. }
  39. log.Println(value)
  40. }
  41. }()
  42. wt.Wait()
  43. }

このメソッドでは、追加の信号チャネル stopCh を追加します。これは、受信側がデータを受信する必要がなくなったことを送信側に伝えるために使用されます。さらに、このメソッドは dataCh を閉じません。チャネルがコルーチンによって使用されなくなった場合、チャネルが閉じられているかどうかに関係なく、徐々にガベージ コレクションが行われます。

この方法の優れた点は、1 つのチャネルを閉じることで別のチャネルの使用を停止し、それによって間接的に他のチャネルも閉じることになることです。

シナリオ 3: M 個の受信者と N 個の送信者

受信者または送信者のいずれかがデータの送信に使用されるチャネルを閉じることはできません。また、複数の受信者の 1 つが追加のシグナリング チャネルを閉じることもできません。これらの実践はどちらもチャネル閉鎖原則に違反します。

ただし、ご紹介できるのは、中間調停者の役割と、すべての受信者と送信者に作業の終了を通知する追加の信号チャネルを閉じさせる

コード例:

  1. package main
  2. import (
  3. "log"
  4. "math/rand"
  5. "strconv"
  6. "sync"
  7. )
  8. func main() {
  9. const Max = 100000
  10. const NumReceivers = 10
  11. const NumSenders = 1000
  12. var wt sync.WaitGroup
  13. wt.Add(NumReceivers)
  14. dataCh := make(chan int)
  15. stopCh := make(chan struct{})
  16. // stopCh是一个额外的信号通道。它的发送
  17. // 者为中间调解者。它的接收者为dataCh
  18. // 数据通道的所有的发送者和接收者。
  19. toStop := make(chan string, 1)
  20. // toStop是一个用来通知中间调解者让其
  21. // 关闭信号通道stopCh的第二个信号通道。
  22. // 此第二个信号通道的发送者为dataCh数据
  23. // 通道的所有的发送者和接收者,它的接收者
  24. // 为中间调解者。它必须为一个缓冲通道。
  25. var stoppedBy string
  26. // 中间调解者
  27. go func() {
  28. stoppedBy = <-toStop
  29. close(stopCh)
  30. }()
  31. // 发送者
  32. for i := 0; i < NumSenders; i++ {
  33. go func(id string) {
  34. for {
  35. value := rand.Intn(Max)
  36. if value == 0 {
  37. // 为了防止阻塞,这里使用了一个尝试
  38. // 发送操作来向中间调解者发送信号。
  39. select {
  40. case toStop <- "发送者#" + id:
  41. default:
  42. }
  43. return
  44. }
  45. select {
  46. case <-stopCh:
  47. return
  48. case dataCh <- value:
  49. }
  50. }
  51. }(strconv.Itoa(i))
  52. }
  53. // 接收者
  54. for i := 0; i < NumReceivers; i++ {
  55. go func(id string) {
  56. defer wt.Done()
  57. for {
  58. select {
  59. case <-stopCh:
  60. return
  61. case value := <-dataCh:
  62. if value == Max {
  63. // 为了防止阻塞,这里使用了一个尝试
  64. // 发送操作来向中间调解者发送信号。
  65. select {
  66. case toStop <- "接收者:" + id:
  67. default:
  68. }
  69. return
  70. }
  71. log.Println(value)
  72. }
  73. }
  74. }(strconv.Itoa(i))
  75. }
  76. wt.Wait()
  77. log.Println("被" + stoppedBy + "终止了")
  78. }