Condivisione della tecnologia

Problemi applicativi completi delle coroutine e dei canali Go

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Comprendi brevemente cosa sono le coroutine e i canali

Cos'è la coroutine

Una coroutine è un thread leggero a livello utente che ha uno spazio stack indipendente e condivide lo spazio heap del programma.

È un micro-thread implementato tramite algoritmi sulla base del single thread. Rispetto alla programmazione multi-thread presenta i seguenti vantaggi:

  • Il cambio di contesto della coroutine viene deciso dall'utente, senza la necessità di cambiare contesto del kernel del sistema, riducendo il sovraccarico.
  • Per impostazione predefinita, le coroutine sono completamente protette per evitare interruzioni.Non è richiesto alcun blocco del funzionamento atomico
  • Un singolo thread può anche raggiungere un'elevata concorrenza e persino una CPU single-core può supportare decine di migliaia di coroutine.

cos'è il canale

Il canale è una struttura dati utilizzata per la comunicazione tra coroutine. Simile a una coda, un'estremità è il mittente e l'altra estremità è il destinatario. L'utilizzo dei canali può garantire la sincronizzazione e l'ordine dei dati.

I canali sono divisi in canali con buffer e canali senza buffer, che sono dichiarati come segue:

  • C'è un canale buffer
intChan := make(chan int,<缓冲容量>)
  • canale senza buffer
intChan := make(chan int)

La differenza tra canali bufferizzati e canali senza buffer:

  • Blocco: il mittente di un canale senza buffer si bloccherà finché i dati non verranno ricevuti; il mittente di un canale con buffer si bloccherà finché il buffer non sarà pieno e il destinatario si bloccherà finché il buffer non sarà vuoto.
  • Sincronizzazione e sequenza dei dati: i canali senza buffer garantiscono la sincronizzazione e la sequenza dei dati; le pipe bufferizzate non garantiscono la sincronizzazione e la sequenza dei dati.
  • Scenari applicativi: i canali senza buffer richiedono una sincronizzazione e una sequenza rigorose; i canali con buffer possono comunicare in modo asincrono e migliorare la velocità effettiva.

Ciò che è necessario notare nell'implementazione dei canali senza buffer è che devono esserci un mittente e un destinatario su entrambe le estremità del canale, altrimenti si verificherà una situazione di stallo.

2. Caso di programmazione simultanea di canali coroutine

(1) Stampa alternativamente lettere e numeri

Significato della domanda: utilizzare il canale coroutine per stampare alternativamente i numeri 1-10 e le lettere AJ.

Codice:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. /*
  7. 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
  8. 解决办法:
  9. 避免死锁的发生:
  10. 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
  11. */
  12. func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  13. defer wg.Done()
  14. for i := 1; i <= 10; i++ {
  15. <-alpCh // 等待字母goroutine发信号
  16. fmt.Print(i, " ")
  17. //避免死锁发生
  18. if i < 10 {
  19. numCh <- struct{}{} // 发信号给字母goroutine
  20. }
  21. if i == 10 {
  22. close(numCh)
  23. }
  24. }
  25. }
  26. func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  27. defer wg.Done()
  28. for i := 'A'; i <= 'J'; i++ {
  29. <-numCh // 等待数字goroutine发信号
  30. fmt.Printf("%c", i)
  31. alpCh <- struct{}{} // 发信号给数字goroutine
  32. }
  33. close(alpCh)
  34. }
  35. func main() {
  36. numCh := make(chan struct{}) // 用于数字goroutine的信号通道
  37. alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
  38. var wg sync.WaitGroup
  39. wg.Add(2)
  40. go printAlp(&wg, numCh, alpCh)
  41. go printNum(&wg, numCh, alpCh)
  42. // 启动时先给数字goroutine发送一个信号
  43. numCh <- struct{}{}
  44. wg.Wait()
  45. }

Analisi dell'argomento:

La domanda richiede di stampare alternativamente lettere e numeri, quindi dobbiamo garantire l'ordine rigoroso delle due coroutine, che sia coerente con lo scenario applicativo dei canali senza buffer. Imposta due canali per memorizzare rispettivamente numeri e lettere. Due coroutine che stampano numeri e lettere fungono rispettivamente da mittente e destinatario. Stampa una volta in un ciclo e invia un segnale una volta per ricordare a un'altra coroutine di stampare.

Va notato che quando viene stampato l'ultimo carattere '10', la coroutine per la stampa delle lettere è terminata e il canale numCh non ha alcun ricevitore. A questo punto, le condizioni di implementazione del canale senza buffer non sono più soddisfatte - devono esserci un mittente e un destinatario In caso contrario, l'invio successivo del segnale causerà un deadlock bloccante. Quindi non è necessario inviare nuovamente il segnale per la decima volta.

(2) Progettare un pianificatore di attività

Titolo: Progettare un'utilità di pianificazione delle attività che utilizza il modello di programmazione multi-coroutine + canale per implementare scenari aziendali di elaborazione simultanea di più attività e richiede che l'ordine di pianificazione sia nell'ordine in cui vengono aggiunte le attività.

Codice:

  1. type scheduler struct {
  2. taskChan chan func()
  3. wt sync.WaitGroup
  4. }
  5. func (td *scheduler) AddTask(task func()) {
  6. td.taskChan <- task
  7. }
  8. func (td *scheduler) Executer() {
  9. defer td.wt.Done()
  10. for {
  11. task, ok := <-td.taskChan
  12. task()
  13. if ok && len(td.taskChan) == 0 {
  14. break
  15. }
  16. }
  17. }
  18. func (td *scheduler) Start() {
  19. td.wt.Add(4)
  20. //假设四个消费者
  21. for i := 0; i < 4; i++ {
  22. go td.Executer()
  23. }
  24. td.wt.Wait()
  25. }
  26. func main() {
  27. sd := scheduler{
  28. taskChan: make(chan func(), 5),
  29. }
  30. go func() {
  31. sd.AddTask(func() {
  32. fmt.Println("任务1")
  33. })
  34. sd.AddTask(func() {
  35. fmt.Println("任务2")
  36. })
  37. sd.AddTask(func() {
  38. fmt.Println("任务3")
  39. })
  40. sd.AddTask(func() {
  41. fmt.Println("任务4")
  42. })
  43. sd.AddTask(func() {
  44. fmt.Println("任务5")
  45. })
  46. sd.AddTask(func() {
  47. fmt.Println("任务6")
  48. })
  49. close(sd.taskChan)
  50. }()
  51. sd.Start()
  52. }

Analisi del problema:

Poiché le attività aggiunte sono multi-task, ce n'è più di una e per eseguire queste attività è necessaria l'elaborazione asincrona. La conformità ai canali bufferizzati richiede un throughput migliorato e un'elaborazione asincrona.

Quindi, dobbiamo inserire l'attività nel canale e più ricevitori possono prendere le attività dal canale in ordine ed eseguirle.

Il problema che richiede attenzione è che se il numero di attività aggiunte è maggiore del buffer del canale, ciò causerà un blocco durante l'aggiunta di attività. Per non influenzare il normale avvio del consumer, è necessario aprire una coroutine separata per aggiungere attività.

In questo modo, quando il consumatore consuma, il produttore bloccante verrà risvegliato per continuare ad aggiungere compiti.

3. Riepilogo

Dopo aver studiato il modello di programmazione coroutine + canale, oltre a quanto appena accennato nel titolo, dovremmo prestare attenzione anche alle seguenti questioni:

1. Perché il canale dovrebbe essere chiuso una volta esaurito. Quali sono i rischi se non lo si chiude?

  • Per evitare lo stallo. La chiusura del canale indica inoltre al destinatario che non ci sono più dati da inviare dal mittente e non è necessario continuare ad attendere i dati. Dopo aver ricevuto l'informazione di chiusura del canale, il ricevitore smette di ricevere i dati; se il canale non viene chiuso, il ricevitore rimarrà bloccato e c'è il rischio di stallo.
  • Liberare risorse ed evitare perdite di risorse. Dopo aver chiuso il canale, il sistema rilascerà le risorse corrispondenti. La chiusura del canale in tempo può evitare sprechi e perdite di risorse.

2. Come chiudere il canale con garbo?

Innanzitutto, il principio fondamentale per chiudere i canali è non chiudere i canali che sono stati chiusi. In secondo luogo, esiste un altro principio per l'utilizzo dei canali Go:Non chiudere il canale sul ricevitore dati o quando sono presenti più mittenti.in altre paroleDovremmo consentire solo all'unico mittente di un canale di chiudere questo canale.

Un modo scortese è chiudere il canale tramite il ripristino dell'eccezione, ma ciò ovviamente viola i principi di cui sopra e può causare una corsa ai dati. Un altro modo è chiudere il canale con sync.Once o sync.Mutex, cosa che non è garantita che avvenga le operazioni e le operazioni di invio su un canale non creano gare di dati. Entrambi i metodi presentano alcuni problemi, quindi non li presenterò in dettaglio. Ecco un metodo su come chiudere con grazia il canale.

Scenario 1: M ricevitori e un mittente

Una delle situazioni più facili da affrontare. Quando il mittente deve terminare l'invio, lascia che chiuda il canale. Questo è il caso dei due esempi di programmazione sopra.

Scenario 2: un destinatario e N mittenti

Secondo i principi di base dei canali Go, possiamo chiudere il canale solo presso l'unico mittente del canale. Quindi, in questo caso, non possiamo chiudere direttamente il canale da qualche parte.Ma possiamo lasciare che il ricevitore chiuda un canale di segnale aggiuntivo per dire al mittente di non inviare altri dati.

  1. package main
  2. import (
  3. "log"
  4. "sync"
  5. )
  6. func main() {
  7. cosnt N := 5
  8. cosnt Max := 60000
  9. count := 0
  10. dataCh := make(chan int)
  11. stopCh := make(chan bool)
  12. var wt sync.WaitGroup
  13. wt.Add(1)
  14. //发送者
  15. for i := 0; i < N; i++ {
  16. go func() {
  17. for {
  18. select {
  19. case <-stopCh:
  20. return
  21. default:
  22. count += 1
  23. dataCh <- count
  24. }
  25. }
  26. }()
  27. }
  28. //接收者
  29. go func() {
  30. defer wt.Done()
  31. for value := range dataCh {
  32. if value == Max {
  33. // 此唯一的接收者同时也是stopCh通道的
  34. // 唯一发送者。尽管它不能安全地关闭dataCh数
  35. // 据通道,但它可以安全地关闭stopCh通道。
  36. close(stopCh)
  37. return
  38. }
  39. log.Println(value)
  40. }
  41. }()
  42. wt.Wait()
  43. }

In questo metodo aggiungiamo un ulteriore canale di segnale stopCh, che il ricevitore utilizza per comunicare al mittente che non ha più bisogno di ricevere dati. Inoltre, questo metodo non chiude dataCh Quando un canale non viene più utilizzato da nessuna coroutine, verrà gradualmente sottoposto a garbage collection, indipendentemente dal fatto che sia stato chiuso.

L'eleganza di questo metodo è che chiudendo un canale si smette di utilizzare un altro canale, chiudendo così indirettamente l'altro canale.

Scenario 3: M ricevitori e N mittenti

Non possiamo fare in modo che né il ricevitore né il mittente chiudano il canale utilizzato per trasmettere i dati, né possiamo fare in modo che uno dei molteplici ricevitori chiuda un canale di segnalazione aggiuntivo. Entrambe queste pratiche violano il principio di chiusura del canale.

Possiamo però presentarciUn ruolo di mediatore intermedio e la possibilità di chiudere ulteriori canali di segnalazione per avvisare tutti i destinatari e i mittenti della fine dei lavori

Esempio di codice:

  1. package main
  2. import (
  3. "log"
  4. "math/rand"
  5. "strconv"
  6. "sync"
  7. )
  8. func main() {
  9. const Max = 100000
  10. const NumReceivers = 10
  11. const NumSenders = 1000
  12. var wt sync.WaitGroup
  13. wt.Add(NumReceivers)
  14. dataCh := make(chan int)
  15. stopCh := make(chan struct{})
  16. // stopCh是一个额外的信号通道。它的发送
  17. // 者为中间调解者。它的接收者为dataCh
  18. // 数据通道的所有的发送者和接收者。
  19. toStop := make(chan string, 1)
  20. // toStop是一个用来通知中间调解者让其
  21. // 关闭信号通道stopCh的第二个信号通道。
  22. // 此第二个信号通道的发送者为dataCh数据
  23. // 通道的所有的发送者和接收者,它的接收者
  24. // 为中间调解者。它必须为一个缓冲通道。
  25. var stoppedBy string
  26. // 中间调解者
  27. go func() {
  28. stoppedBy = <-toStop
  29. close(stopCh)
  30. }()
  31. // 发送者
  32. for i := 0; i < NumSenders; i++ {
  33. go func(id string) {
  34. for {
  35. value := rand.Intn(Max)
  36. if value == 0 {
  37. // 为了防止阻塞,这里使用了一个尝试
  38. // 发送操作来向中间调解者发送信号。
  39. select {
  40. case toStop <- "发送者#" + id:
  41. default:
  42. }
  43. return
  44. }
  45. select {
  46. case <-stopCh:
  47. return
  48. case dataCh <- value:
  49. }
  50. }
  51. }(strconv.Itoa(i))
  52. }
  53. // 接收者
  54. for i := 0; i < NumReceivers; i++ {
  55. go func(id string) {
  56. defer wt.Done()
  57. for {
  58. select {
  59. case <-stopCh:
  60. return
  61. case value := <-dataCh:
  62. if value == Max {
  63. // 为了防止阻塞,这里使用了一个尝试
  64. // 发送操作来向中间调解者发送信号。
  65. select {
  66. case toStop <- "接收者:" + id:
  67. default:
  68. }
  69. return
  70. }
  71. log.Println(value)
  72. }
  73. }
  74. }(strconv.Itoa(i))
  75. }
  76. wt.Wait()
  77. log.Println("被" + stoppedBy + "终止了")
  78. }