2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
Eine Coroutine ist ein leichter Thread auf Benutzerebene, der über einen unabhängigen Stapelspeicherplatz verfügt und den Heapspeicherplatz des Programms gemeinsam nutzt.
Es handelt sich um einen Mikro-Thread, der durch Algorithmen auf der Basis eines einzelnen Threads implementiert wird. Im Vergleich zur Multi-Thread-Programmierung bietet er die folgenden Vorteile:
Kanal ist eine Datenstruktur, die für die Kommunikation zwischen Coroutinen verwendet wird. Ähnlich wie bei einer Warteschlange ist ein Ende der Sender und das andere Ende der Empfänger. Durch die Verwendung von Kanälen kann die Synchronisierung und Reihenfolge der Daten sichergestellt werden.
Kanäle werden in gepufferte Kanäle und ungepufferte Kanäle unterteilt, die wie folgt deklariert werden:
intChan := make(chan int,<缓冲容量>)
intChan := make(chan int)
Der Unterschied zwischen gepufferten Kanälen und ungepufferten Kanälen:
Bei der Implementierung ungepufferter Kanäle ist zu beachten, dass an beiden Enden des Kanals ein Sender und ein Empfänger vorhanden sein müssen, da es sonst zu einem Deadlock kommt.
Bedeutung der Frage: Verwenden Sie den Coroutine-Kanal, um abwechselnd die Zahlen 1-10 und die Buchstaben AJ zu drucken.
Code:
- package main
-
- import (
- "fmt"
- "sync"
- )
-
- /*
- 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
- 解决办法:
- 避免死锁的发生:
- 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
- */
-
- func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
- defer wg.Done()
-
- for i := 1; i <= 10; i++ {
- <-alpCh // 等待字母goroutine发信号
- fmt.Print(i, " ")
- //避免死锁发生
- if i < 10 {
- numCh <- struct{}{} // 发信号给字母goroutine
- }
- if i == 10 {
- close(numCh)
- }
- }
-
- }
-
- func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
- defer wg.Done()
-
- for i := 'A'; i <= 'J'; i++ {
- <-numCh // 等待数字goroutine发信号
- fmt.Printf("%c", i)
- alpCh <- struct{}{} // 发信号给数字goroutine
- }
- close(alpCh)
- }
-
- func main() {
- numCh := make(chan struct{}) // 用于数字goroutine的信号通道
- alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
- var wg sync.WaitGroup
-
- wg.Add(2)
-
- go printAlp(&wg, numCh, alpCh)
- go printNum(&wg, numCh, alpCh)
-
- // 启动时先给数字goroutine发送一个信号
- numCh <- struct{}{}
-
- wg.Wait()
-
- }
Themenanalyse:
Die Frage erfordert, dass wir abwechselnd Buchstaben und Zahlen drucken, daher müssen wir die strikte Reihenfolge der beiden Coroutinen sicherstellen, die mit dem Anwendungsszenario ungepufferter Kanäle übereinstimmt. Richten Sie zwei Kanäle zum Speichern von Zahlen bzw. Buchstaben ein, die als Sender und Empfänger der beiden Kanäle dienen. Einmal in einer Schleife drucken und einmal ein Signal senden, um eine andere Coroutine an das Drucken zu erinnern.
Es ist zu beachten, dass beim Drucken des letzten Zeichens „10“ die Coroutine zum Drucken von Buchstaben beendet ist und der numCh-Kanal keinen Empfänger mehr hat. Zu diesem Zeitpunkt sind die Implementierungsbedingungen des ungepufferten Kanals nicht mehr erfüllt – sie müssen vorhanden sein Andernfalls führt das erneute Senden des Signals zu einem blockierenden Deadlock. Es ist also nicht erforderlich, das Signal zum zehnten Mal erneut zu senden.
Titel: Entwerfen Sie einen Aufgabenplaner, der das Multi-Coroutine + Kanal-Programmiermodell verwendet, um Geschäftsszenarien für die gleichzeitige Verarbeitung mehrerer Aufgaben zu implementieren, und erfordert, dass die Planungsreihenfolge der Reihenfolge entspricht, in der Aufgaben hinzugefügt werden.
Code:
- type scheduler struct {
- taskChan chan func()
- wt sync.WaitGroup
- }
-
- func (td *scheduler) AddTask(task func()) {
- td.taskChan <- task
- }
-
- func (td *scheduler) Executer() {
- defer td.wt.Done()
- for {
- task, ok := <-td.taskChan
- task()
- if ok && len(td.taskChan) == 0 {
- break
- }
- }
- }
-
- func (td *scheduler) Start() {
- td.wt.Add(4)
- //假设四个消费者
- for i := 0; i < 4; i++ {
- go td.Executer()
- }
-
- td.wt.Wait()
- }
-
- func main() {
- sd := scheduler{
- taskChan: make(chan func(), 5),
- }
-
- go func() {
- sd.AddTask(func() {
- fmt.Println("任务1")
- })
- sd.AddTask(func() {
- fmt.Println("任务2")
- })
- sd.AddTask(func() {
- fmt.Println("任务3")
- })
- sd.AddTask(func() {
- fmt.Println("任务4")
- })
- sd.AddTask(func() {
- fmt.Println("任务5")
- })
- sd.AddTask(func() {
- fmt.Println("任务6")
- })
- close(sd.taskChan)
- }()
-
- sd.Start()
-
- }
Problemanalyse:
Da es sich bei den hinzugefügten Aufgaben um Multitasks handelt, gibt es mehr als eine, und zur Ausführung dieser Aufgaben ist eine asynchrone Verarbeitung erforderlich. Die Einhaltung gepufferter Kanäle erfordert einen verbesserten Durchsatz und eine asynchrone Verarbeitung.
Dann müssen wir die Aufgabe in den Kanal stellen, und mehrere Empfänger können die Aufgaben aus dem Kanal der Reihe nach übernehmen und ausführen.
Das zu beachtende Problem besteht darin, dass es beim Hinzufügen von Aufgaben zu Blockaden kommt, wenn die Anzahl der hinzugefügten Aufgaben größer als der Puffer des Kanals ist. Um den normalen Start des Verbrauchers nicht zu beeinträchtigen, muss er eine separate Coroutine öffnen, um Aufgaben hinzuzufügen.
Auf diese Weise wird der blockierende Produzent aktiviert, wenn der Verbraucher konsumiert, um weiterhin Aufgaben hinzuzufügen.
Nachdem wir das Coroutine + Channel-Programmiermodell studiert haben, sollten wir zusätzlich zu dem, was gerade im Titel erwähnt wurde, auch auf die folgenden Punkte achten:
Zunächst einmal besteht das grundlegendste Prinzip beim Schließen von Kanälen darin, geschlossene Kanäle nicht zu schließen. Zweitens gibt es ein weiteres Prinzip für die Nutzung von Go-Kanälen:Schließen Sie den Kanal nicht am Datenempfänger oder wenn mehrere Sender vorhanden sind.mit anderen Worten,Wir sollten nur zulassen, dass der einzige Absender eines Kanals diesen Kanal schließt.
Eine unhöfliche Möglichkeit besteht darin, den Kanal durch Ausnahmewiederherstellung zu schließen, aber dies verstößt offensichtlich gegen die oben genannten Grundsätze und kann zu Datenkonkurrenz führen. Eine andere Möglichkeit besteht darin, den Kanal mit sync.Once oder sync.Mutex zu schließen, was nicht garantiert ist Vorgänge und Sendevorgänge auf einem Kanal erzeugen keine Datenrennen. Beide Methoden weisen bestimmte Probleme auf, daher werde ich sie nicht im Detail vorstellen. Hier ist eine Methode zum ordnungsgemäßen Schließen des Kanals.
Eine der am einfachsten zu bewältigenden Situationen. Wenn der Absender mit dem Senden fertig sein muss, lassen Sie ihn einfach den Kanal schließen. Dies ist in den beiden obigen Programmierbeispielen der Fall.
Gemäß den Grundprinzipien von Go-Kanälen können wir den Kanal nur beim einzigen Absender des Kanals schließen. In diesem Fall können wir den Kanal also nicht direkt irgendwo schließen.Aber wir können den Empfänger einen zusätzlichen Signalkanal schließen lassen, um dem Sender mitzuteilen, keine weiteren Daten zu senden.。
- package main
-
- import (
- "log"
- "sync"
- )
-
- func main() {
-
- cosnt N := 5
- cosnt Max := 60000
- count := 0
-
- dataCh := make(chan int)
- stopCh := make(chan bool)
-
- var wt sync.WaitGroup
- wt.Add(1)
-
- //发送者
- for i := 0; i < N; i++ {
- go func() {
- for {
- select {
- case <-stopCh:
- return
- default:
- count += 1
- dataCh <- count
- }
- }
- }()
- }
-
- //接收者
- go func() {
- defer wt.Done()
- for value := range dataCh {
- if value == Max {
- // 此唯一的接收者同时也是stopCh通道的
- // 唯一发送者。尽管它不能安全地关闭dataCh数
- // 据通道,但它可以安全地关闭stopCh通道。
- close(stopCh)
- return
- }
- log.Println(value)
- }
- }()
-
- wt.Wait()
- }
Bei dieser Methode fügen wir einen zusätzlichen Signalkanal stopCh hinzu, mit dem der Empfänger dem Sender mitteilt, dass er keine Daten mehr empfangen muss. Darüber hinaus schließt diese Methode dataCh nicht. Wenn ein Kanal nicht mehr von einer Coroutine verwendet wird, wird er nach und nach durch Müll gesammelt, unabhängig davon, ob er geschlossen wurde.
Die Eleganz dieser Methode besteht darin, dass Sie durch das Schließen eines Kanals die Nutzung eines anderen Kanals beenden und dadurch indirekt den anderen Kanal schließen.
Wir können weder den Empfänger noch den Sender veranlassen, den zur Übertragung der Daten verwendeten Kanal zu schließen, und wir können auch nicht verlangen, dass einer der mehreren Empfänger einen zusätzlichen Signalisierungskanal schließt. Beide Praktiken verstoßen gegen das Prinzip der Kanalschließung.
Wir können es jedoch vorstellenEine Zwischenvermittlerrolle und die Schließung zusätzlicher Signalkanäle, um alle Empfänger und Sender über das Ende der Arbeit zu informieren。
Codebeispiel:
- package main
-
- import (
- "log"
- "math/rand"
- "strconv"
- "sync"
- )
-
- func main() {
-
- const Max = 100000
- const NumReceivers = 10
- const NumSenders = 1000
-
- var wt sync.WaitGroup
- wt.Add(NumReceivers)
-
- dataCh := make(chan int)
- stopCh := make(chan struct{})
- // stopCh是一个额外的信号通道。它的发送
- // 者为中间调解者。它的接收者为dataCh
- // 数据通道的所有的发送者和接收者。
- toStop := make(chan string, 1)
- // toStop是一个用来通知中间调解者让其
- // 关闭信号通道stopCh的第二个信号通道。
- // 此第二个信号通道的发送者为dataCh数据
- // 通道的所有的发送者和接收者,它的接收者
- // 为中间调解者。它必须为一个缓冲通道。
-
- var stoppedBy string
-
- // 中间调解者
- go func() {
- stoppedBy = <-toStop
- close(stopCh)
- }()
-
- // 发送者
- for i := 0; i < NumSenders; i++ {
- go func(id string) {
- for {
- value := rand.Intn(Max)
- if value == 0 {
- // 为了防止阻塞,这里使用了一个尝试
- // 发送操作来向中间调解者发送信号。
- select {
- case toStop <- "发送者#" + id:
- default:
- }
- return
- }
-
- select {
- case <-stopCh:
- return
- case dataCh <- value:
- }
- }
- }(strconv.Itoa(i))
- }
-
- // 接收者
- for i := 0; i < NumReceivers; i++ {
- go func(id string) {
- defer wt.Done()
-
- for {
- select {
- case <-stopCh:
- return
- case value := <-dataCh:
- if value == Max {
- // 为了防止阻塞,这里使用了一个尝试
- // 发送操作来向中间调解者发送信号。
- select {
- case toStop <- "接收者:" + id:
- default:
- }
- return
- }
-
- log.Println(value)
- }
- }
- }(strconv.Itoa(i))
- }
-
- wt.Wait()
- log.Println("被" + stoppedBy + "终止了")
-
- }