プライベートな連絡先の最初の情報
送料メール:
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
コルーチンは、独立したスタック領域を持ち、プログラムのヒープ領域を共有するユーザーレベルの軽量スレッドです。
シングルスレッドをベースにしたアルゴリズムで実装されたマイクロスレッドであり、マルチスレッドプログラミングと比較して次のような利点があります。
チャネルは、コルーチン間の通信に使用されるデータ構造です。キューと同様に、一方の端が送信者で、もう一方の端が受信者です。チャネルを使用すると、データの同期と順序を確保できます。
チャネルはバッファ付きチャネルとバッファなしチャネルに分けられ、次のように宣言されます。
intChan := make(chan int,<缓冲容量>)
intChan := make(chan int)
バッファー付きチャネルとバッファーなしチャネルの違い:
バッファなしチャネルの実装で注意する必要があるのは、チャネルの両端に送信者と受信者が存在する必要があることです。そうでないとデッドロックが発生します。
質問の意味: coroutine-channel を使用して、数字 1 ~ 10 と文字 A~J を交互に出力します。
コード:
- package main
-
- import (
- "fmt"
- "sync"
- )
-
- /*
- 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
- 解决办法:
- 避免死锁的发生:
- 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
- */
-
- func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
- defer wg.Done()
-
- for i := 1; i <= 10; i++ {
- <-alpCh // 等待字母goroutine发信号
- fmt.Print(i, " ")
- //避免死锁发生
- if i < 10 {
- numCh <- struct{}{} // 发信号给字母goroutine
- }
- if i == 10 {
- close(numCh)
- }
- }
-
- }
-
- func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
- defer wg.Done()
-
- for i := 'A'; i <= 'J'; i++ {
- <-numCh // 等待数字goroutine发信号
- fmt.Printf("%c", i)
- alpCh <- struct{}{} // 发信号给数字goroutine
- }
- close(alpCh)
- }
-
- func main() {
- numCh := make(chan struct{}) // 用于数字goroutine的信号通道
- alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
- var wg sync.WaitGroup
-
- wg.Add(2)
-
- go printAlp(&wg, numCh, alpCh)
- go printNum(&wg, numCh, alpCh)
-
- // 启动时先给数字goroutine发送一个信号
- numCh <- struct{}{}
-
- wg.Wait()
-
- }
トピック分析:
この質問では、文字と数字を交互に出力する必要があるため、バッファなしチャネルのアプリケーション シナリオと一致する 2 つのコルーチンの厳密な順序を確保する必要があります。数字と文字をそれぞれ保存する 2 つのチャネルを設定します。数字と文字を出力する 2 つのコルーチンは、それぞれ 2 つのチャネルの送信側と受信側として機能します。ループ内で 1 回印刷し、別のコルーチンに印刷を促す信号を 1 回送信します。
最後の文字「10」が印刷されると、文字を印刷するためのコルーチンが終了し、numCh チャネルには受信側が存在しないことに注意してください。この時点では、バッファなしチャネルの実装条件は満たされていなければなりません。そうしないと、シグナルを再度送信するとブロッキング デッドロックが発生します。したがって、10 回目に再度信号を送信する必要はありません。
タイトル: マルチコルーチン + チャネル プログラミング モデルを使用して、マルチタスクの同時処理のビジネス シナリオを実装するタスク スケジューラーを設計します。タスクの追加順序がスケジュール順序である必要があります。
コード:
- type scheduler struct {
- taskChan chan func()
- wt sync.WaitGroup
- }
-
- func (td *scheduler) AddTask(task func()) {
- td.taskChan <- task
- }
-
- func (td *scheduler) Executer() {
- defer td.wt.Done()
- for {
- task, ok := <-td.taskChan
- task()
- if ok && len(td.taskChan) == 0 {
- break
- }
- }
- }
-
- func (td *scheduler) Start() {
- td.wt.Add(4)
- //假设四个消费者
- for i := 0; i < 4; i++ {
- go td.Executer()
- }
-
- td.wt.Wait()
- }
-
- func main() {
- sd := scheduler{
- taskChan: make(chan func(), 5),
- }
-
- go func() {
- sd.AddTask(func() {
- fmt.Println("任务1")
- })
- sd.AddTask(func() {
- fmt.Println("任务2")
- })
- sd.AddTask(func() {
- fmt.Println("任务3")
- })
- sd.AddTask(func() {
- fmt.Println("任务4")
- })
- sd.AddTask(func() {
- fmt.Println("任务5")
- })
- sd.AddTask(func() {
- fmt.Println("任务6")
- })
- close(sd.taskChan)
- }()
-
- sd.Start()
-
- }
問題分析:
追加するタスクはマルチタスクのため複数存在し、これらのタスクを実行するには非同期処理が必要です。バッファされたチャネルに準拠するには、スループットと非同期処理の向上が必要です。
次に、タスクをチャネルに配置する必要があり、複数の受信者がチャネルからタスクを順番に取得して実行できます。
注意が必要な問題は、追加されたタスクの数がチャネルのバッファーを超えると、タスクを追加するときにブロッキングが発生することです。コンシューマーの通常の起動に影響を与えないようにするには、別のコルーチンを開いてタスクを追加する必要があります。
このようにして、コンシューマーが消費すると、ブロックしているプロデューサーが起動されてタスクの追加を続行します。
コルーチン + チャネル プログラミング モデルを学習した後は、タイトルで述べたことに加えて、次の問題にも注意する必要があります。
まず、チャネルを閉じる最も基本的な原則は、閉じたチャネルを閉じないことです。次に、Go チャネルの使用には別の原則があります。データ受信側のチャネル、または送信側が複数ある場合はチャネルを閉じないでください。言い換えると,チャネルの唯一の送信者のみがこのチャネルを閉じられるようにする必要があります。
失礼な方法は、例外回復によってチャネルを閉じることですが、これは明らかに上記の原則に違反し、データ競合を引き起こす可能性があります。別の方法は、sync.Once または sync.Mutex を使用してチャネルを閉じることですが、これは同時終了が発生することが保証されていません。チャネル上の操作と送信操作によってデータ競合が発生することはありません。どちらの方法にも問題があるため、詳細は紹介しません。 ここでは、チャネルを正常に閉じる方法について説明します。
最も対処しやすい状況の 1 つです。送信者が送信を終了する必要がある場合は、チャネルを閉じてください。上記の 2 つのプログラミング例がこれに当てはまります。
Go チャネルの基本原則によれば、チャネルの唯一の送信者でのみチャネルを閉じることができます。したがって、この場合、どこかでチャネルを直接閉じることはできません。しかし、受信者が追加の信号チャネルを閉じて、送信者にこれ以上データを送信しないよう指示することができます。。
- package main
-
- import (
- "log"
- "sync"
- )
-
- func main() {
-
- cosnt N := 5
- cosnt Max := 60000
- count := 0
-
- dataCh := make(chan int)
- stopCh := make(chan bool)
-
- var wt sync.WaitGroup
- wt.Add(1)
-
- //发送者
- for i := 0; i < N; i++ {
- go func() {
- for {
- select {
- case <-stopCh:
- return
- default:
- count += 1
- dataCh <- count
- }
- }
- }()
- }
-
- //接收者
- go func() {
- defer wt.Done()
- for value := range dataCh {
- if value == Max {
- // 此唯一的接收者同时也是stopCh通道的
- // 唯一发送者。尽管它不能安全地关闭dataCh数
- // 据通道,但它可以安全地关闭stopCh通道。
- close(stopCh)
- return
- }
- log.Println(value)
- }
- }()
-
- wt.Wait()
- }
このメソッドでは、追加の信号チャネル stopCh を追加します。これは、受信側がデータを受信する必要がなくなったことを送信側に伝えるために使用されます。さらに、このメソッドは dataCh を閉じません。チャネルがコルーチンによって使用されなくなった場合、チャネルが閉じられているかどうかに関係なく、徐々にガベージ コレクションが行われます。
この方法の優れた点は、1 つのチャネルを閉じることで別のチャネルの使用を停止し、それによって間接的に他のチャネルも閉じることになることです。
受信者または送信者のいずれかがデータの送信に使用されるチャネルを閉じることはできません。また、複数の受信者の 1 つが追加のシグナリング チャネルを閉じることもできません。これらの実践はどちらもチャネル閉鎖原則に違反します。
ただし、ご紹介できるのは、中間調停者の役割と、すべての受信者と送信者に作業の終了を通知する追加の信号チャネルを閉じさせる。
コード例:
- package main
-
- import (
- "log"
- "math/rand"
- "strconv"
- "sync"
- )
-
- func main() {
-
- const Max = 100000
- const NumReceivers = 10
- const NumSenders = 1000
-
- var wt sync.WaitGroup
- wt.Add(NumReceivers)
-
- dataCh := make(chan int)
- stopCh := make(chan struct{})
- // stopCh是一个额外的信号通道。它的发送
- // 者为中间调解者。它的接收者为dataCh
- // 数据通道的所有的发送者和接收者。
- toStop := make(chan string, 1)
- // toStop是一个用来通知中间调解者让其
- // 关闭信号通道stopCh的第二个信号通道。
- // 此第二个信号通道的发送者为dataCh数据
- // 通道的所有的发送者和接收者,它的接收者
- // 为中间调解者。它必须为一个缓冲通道。
-
- var stoppedBy string
-
- // 中间调解者
- go func() {
- stoppedBy = <-toStop
- close(stopCh)
- }()
-
- // 发送者
- for i := 0; i < NumSenders; i++ {
- go func(id string) {
- for {
- value := rand.Intn(Max)
- if value == 0 {
- // 为了防止阻塞,这里使用了一个尝试
- // 发送操作来向中间调解者发送信号。
- select {
- case toStop <- "发送者#" + id:
- default:
- }
- return
- }
-
- select {
- case <-stopCh:
- return
- case dataCh <- value:
- }
- }
- }(strconv.Itoa(i))
- }
-
- // 接收者
- for i := 0; i < NumReceivers; i++ {
- go func(id string) {
- defer wt.Done()
-
- for {
- select {
- case <-stopCh:
- return
- case value := <-dataCh:
- if value == Max {
- // 为了防止阻塞,这里使用了一个尝试
- // 发送操作来向中间调解者发送信号。
- select {
- case toStop <- "接收者:" + id:
- default:
- }
- return
- }
-
- log.Println(value)
- }
- }
- }(strconv.Itoa(i))
- }
-
- wt.Wait()
- log.Println("被" + stoppedBy + "终止了")
-
- }