Partage de technologie

Problèmes d'application complets des coroutines et des canaux Go

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Comprendre brièvement ce que sont les coroutines et les canaux

Qu'est-ce que la coroutine

Une coroutine est un thread léger au niveau utilisateur qui dispose d'un espace de pile indépendant et partage l'espace de tas du programme.

Il s'agit d'un micro-thread implémenté via des algorithmes basés sur un seul thread. Par rapport à la programmation multi-thread, il présente les avantages suivants :

  • Le changement de contexte de la coroutine est décidé par l'utilisateur, sans qu'il soit nécessaire de changer de contexte du noyau système, ce qui réduit la surcharge.
  • Par défaut, les coroutines sont entièrement protégées pour éviter les interruptions.Aucun verrouillage de fonctionnement atomique requis
  • Un seul thread peut également atteindre une concurrence élevée, et même un processeur monocœur peut prendre en charge des dizaines de milliers de coroutines.

qu'est-ce que la chaîne

Le canal est une structure de données utilisée pour la communication entre les coroutines. Semblable à une file d’attente, une extrémité est l’expéditeur et l’autre extrémité est le destinataire. L'utilisation de canaux peut garantir la synchronisation et l'ordre des données.

Les canaux sont divisés en canaux tamponnés et canaux non tamponnés, déclarés comme suit :

  • Il y a un canal tampon
intChan := make(chan int,<缓冲容量>)
  • canal sans tampon
intChan := make(chan int)

La différence entre les canaux tamponnés et les canaux non tamponnés :

  • Blocage : l'expéditeur d'un canal sans tampon bloquera jusqu'à ce que les données soient reçues ; l'expéditeur d'un canal avec tampon bloquera jusqu'à ce que le tampon soit plein, et le récepteur bloquera jusqu'à ce que le tampon ne soit pas vide.
  • Synchronisation et séquence des données : les canaux sans tampon garantissent la synchronisation et la séquence des données ; les canaux mis en mémoire tampon ne garantissent pas la synchronisation et la séquence des données.
  • Scénarios d'application : les canaux sans tampon nécessitent une synchronisation et une séquence strictes ; les canaux avec tampon peuvent communiquer de manière asynchrone et améliorer le débit.

Ce qu'il faut noter dans la mise en œuvre de canaux sans tampon, c'est qu'il doit y avoir un émetteur et un récepteur aux deux extrémités du canal, sinon un blocage se produira.

2. Cas de programmation simultanée par canal Coroutine

(1) Imprimez alternativement les lettres et les chiffres

Signification de la question : utilisez le canal coroutine pour imprimer alternativement les chiffres 1 à 10 et les lettres AJ.

Code:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. /*
  7. 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
  8. 解决办法:
  9. 避免死锁的发生:
  10. 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
  11. */
  12. func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  13. defer wg.Done()
  14. for i := 1; i <= 10; i++ {
  15. <-alpCh // 等待字母goroutine发信号
  16. fmt.Print(i, " ")
  17. //避免死锁发生
  18. if i < 10 {
  19. numCh <- struct{}{} // 发信号给字母goroutine
  20. }
  21. if i == 10 {
  22. close(numCh)
  23. }
  24. }
  25. }
  26. func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  27. defer wg.Done()
  28. for i := 'A'; i <= 'J'; i++ {
  29. <-numCh // 等待数字goroutine发信号
  30. fmt.Printf("%c", i)
  31. alpCh <- struct{}{} // 发信号给数字goroutine
  32. }
  33. close(alpCh)
  34. }
  35. func main() {
  36. numCh := make(chan struct{}) // 用于数字goroutine的信号通道
  37. alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
  38. var wg sync.WaitGroup
  39. wg.Add(2)
  40. go printAlp(&wg, numCh, alpCh)
  41. go printNum(&wg, numCh, alpCh)
  42. // 启动时先给数字goroutine发送一个信号
  43. numCh <- struct{}{}
  44. wg.Wait()
  45. }

Analyse du sujet :

La question nous oblige à imprimer alternativement des lettres et des chiffres, nous devons donc garantir l'ordre strict des deux coroutines, ce qui est cohérent avec le scénario d'application des canaux sans tampon. Configurez deux canaux pour stocker respectivement des chiffres et des lettres. Deux coroutines qui impriment des chiffres et des lettres servent respectivement d'expéditeur et de récepteur des deux canaux. Imprimez une fois en boucle et envoyez un signal une fois pour rappeler à une autre coroutine d'imprimer.

Il convient de noter que lorsque le dernier caractère « 10 » est imprimé, la coroutine d'impression des lettres est terminée et le canal numCh n'a pas de récepteur. A ce moment, les conditions de mise en œuvre du canal non tamponné ne sont plus remplies - il doit y en avoir. un émetteur et un récepteur, sinon l'envoi à nouveau du signal entraînera un blocage. Il n’est donc pas nécessaire de renvoyer le signal une dixième fois.

(2) Concevoir un planificateur de tâches

Titre : Concevoir un planificateur de tâches qui utilise le modèle de programmation multi-coroutine + canal pour mettre en œuvre des scénarios commerciaux de traitement simultané de multi-tâches, et nécessite que l'ordre de planification soit dans l'ordre dans lequel les tâches sont ajoutées.

Code:

  1. type scheduler struct {
  2. taskChan chan func()
  3. wt sync.WaitGroup
  4. }
  5. func (td *scheduler) AddTask(task func()) {
  6. td.taskChan <- task
  7. }
  8. func (td *scheduler) Executer() {
  9. defer td.wt.Done()
  10. for {
  11. task, ok := <-td.taskChan
  12. task()
  13. if ok && len(td.taskChan) == 0 {
  14. break
  15. }
  16. }
  17. }
  18. func (td *scheduler) Start() {
  19. td.wt.Add(4)
  20. //假设四个消费者
  21. for i := 0; i < 4; i++ {
  22. go td.Executer()
  23. }
  24. td.wt.Wait()
  25. }
  26. func main() {
  27. sd := scheduler{
  28. taskChan: make(chan func(), 5),
  29. }
  30. go func() {
  31. sd.AddTask(func() {
  32. fmt.Println("任务1")
  33. })
  34. sd.AddTask(func() {
  35. fmt.Println("任务2")
  36. })
  37. sd.AddTask(func() {
  38. fmt.Println("任务3")
  39. })
  40. sd.AddTask(func() {
  41. fmt.Println("任务4")
  42. })
  43. sd.AddTask(func() {
  44. fmt.Println("任务5")
  45. })
  46. sd.AddTask(func() {
  47. fmt.Println("任务6")
  48. })
  49. close(sd.taskChan)
  50. }()
  51. sd.Start()
  52. }

analyse du problème:

Étant donné que les tâches ajoutées sont multitâches, il y en a plusieurs et un traitement asynchrone est requis pour exécuter ces tâches. Le respect des canaux mis en mémoire tampon nécessite un débit amélioré et un traitement asynchrone.

Ensuite, nous devons placer la tâche dans le canal, et plusieurs récepteurs peuvent prendre les tâches du canal dans l'ordre et les exécuter.

Le problème auquel il faut prêter attention est que si le nombre de tâches ajoutées est supérieur au tampon du canal, cela provoquera un blocage lors de l'ajout de tâches. Afin de ne pas affecter le démarrage normal du consommateur, celui-ci doit ouvrir une coroutine distincte pour ajouter des tâches.

De cette façon, lorsque le consommateur consommera, le producteur bloquant sera réveillé pour continuer à ajouter des tâches.

3. Résumé

Après avoir étudié le modèle de programmation coroutine + canal, en plus de ce qui vient d'être mentionné dans le titre, nous devons également prêter attention aux problèmes suivants :

1. Pourquoi le canal devrait-il être fermé une fois épuisé ? Quels sont les risques de ne pas le fermer ?

  • Pour éviter une impasse. La fermeture du canal indique également au récepteur qu'il n'y a plus de données à envoyer de la part de l'expéditeur et qu'il n'est pas nécessaire de continuer à attendre les données. Après avoir reçu l'information de fermeture du canal, le récepteur cesse de recevoir des données ; si le canal n'est pas fermé, le récepteur restera bloqué et il existe un risque de blocage.
  • Libérez des ressources et évitez les fuites de ressources. Après avoir fermé le canal, le système libérera les ressources correspondantes. La fermeture du canal à temps peut éviter le gaspillage et les fuites de ressources.

2. Comment fermer la chaîne en douceur ?

Tout d’abord, le principe le plus fondamental de la fermeture des chaînes est de ne pas fermer les chaînes qui ont été fermées. Deuxièmement, il existe un autre principe d'utilisation des canaux Go :Ne fermez pas le canal sur le récepteur de données ou lorsqu'il y a plusieurs expéditeurs.autrement ditNous devrions uniquement laisser le seul expéditeur d'un canal fermer ce canal.

Une méthode grossière consiste à fermer le canal via une récupération d'exception, mais cela viole évidemment les principes ci-dessus et peut provoquer une concurrence en matière de données. Une autre méthode consiste à fermer le canal avec sync.Once ou sync.Mutex, ce qui n'est pas garanti. les opérations et les opérations d'envoi sur un canal ne créent pas de courses de données. Les deux méthodes présentent certains problèmes, je ne les présenterai donc pas en détail. Voici une méthode permettant de fermer le canal en douceur.

Scénario 1 : M récepteurs et un expéditeur

Une des situations les plus faciles à gérer. Lorsque l'expéditeur doit terminer l'envoi, laissez-le simplement fermer le canal. C'est le cas dans les deux exemples de programmation ci-dessus.

Scénario 2 : un récepteur et N expéditeurs

Selon les principes de base des chaînes Go, nous ne pouvons fermer la chaîne qu'au niveau du seul émetteur de la chaîne. Donc, dans ce cas, nous ne pouvons pas fermer directement le canal quelque part.Mais nous pouvons laisser le récepteur fermer un canal de signal supplémentaire pour indiquer à l'expéditeur de ne plus envoyer de données.

  1. package main
  2. import (
  3. "log"
  4. "sync"
  5. )
  6. func main() {
  7. cosnt N := 5
  8. cosnt Max := 60000
  9. count := 0
  10. dataCh := make(chan int)
  11. stopCh := make(chan bool)
  12. var wt sync.WaitGroup
  13. wt.Add(1)
  14. //发送者
  15. for i := 0; i < N; i++ {
  16. go func() {
  17. for {
  18. select {
  19. case <-stopCh:
  20. return
  21. default:
  22. count += 1
  23. dataCh <- count
  24. }
  25. }
  26. }()
  27. }
  28. //接收者
  29. go func() {
  30. defer wt.Done()
  31. for value := range dataCh {
  32. if value == Max {
  33. // 此唯一的接收者同时也是stopCh通道的
  34. // 唯一发送者。尽管它不能安全地关闭dataCh数
  35. // 据通道,但它可以安全地关闭stopCh通道。
  36. close(stopCh)
  37. return
  38. }
  39. log.Println(value)
  40. }
  41. }()
  42. wt.Wait()
  43. }

Dans cette méthode, nous ajoutons un canal de signal supplémentaire stopCh, que le récepteur utilise pour indiquer à l'expéditeur qu'il n'a plus besoin de recevoir de données. De plus, cette méthode ne ferme pas dataCh Lorsqu'un canal n'est plus utilisé par aucune coroutine, il sera progressivement récupéré, qu'il ait été fermé ou non.

L'élégance de cette méthode est qu'en fermant un canal, vous arrêtez d'utiliser un autre canal, fermant ainsi indirectement l'autre canal.

Scénario 3 : M récepteurs et N émetteurs

Nous ne pouvons pas demander au récepteur ou à l'expéditeur de fermer le canal utilisé pour transmettre les données, ni à l'un des multiples récepteurs de fermer un canal de signalisation supplémentaire. Ces deux pratiques violent le principe de fermeture des canaux.

On peut cependant introduireUn rôle de médiateur intermédiaire et lui faire fermer des canaux de signalisation supplémentaires pour notifier tous les destinataires et expéditeurs de la fin des travaux

Exemple de code :

  1. package main
  2. import (
  3. "log"
  4. "math/rand"
  5. "strconv"
  6. "sync"
  7. )
  8. func main() {
  9. const Max = 100000
  10. const NumReceivers = 10
  11. const NumSenders = 1000
  12. var wt sync.WaitGroup
  13. wt.Add(NumReceivers)
  14. dataCh := make(chan int)
  15. stopCh := make(chan struct{})
  16. // stopCh是一个额外的信号通道。它的发送
  17. // 者为中间调解者。它的接收者为dataCh
  18. // 数据通道的所有的发送者和接收者。
  19. toStop := make(chan string, 1)
  20. // toStop是一个用来通知中间调解者让其
  21. // 关闭信号通道stopCh的第二个信号通道。
  22. // 此第二个信号通道的发送者为dataCh数据
  23. // 通道的所有的发送者和接收者,它的接收者
  24. // 为中间调解者。它必须为一个缓冲通道。
  25. var stoppedBy string
  26. // 中间调解者
  27. go func() {
  28. stoppedBy = <-toStop
  29. close(stopCh)
  30. }()
  31. // 发送者
  32. for i := 0; i < NumSenders; i++ {
  33. go func(id string) {
  34. for {
  35. value := rand.Intn(Max)
  36. if value == 0 {
  37. // 为了防止阻塞,这里使用了一个尝试
  38. // 发送操作来向中间调解者发送信号。
  39. select {
  40. case toStop <- "发送者#" + id:
  41. default:
  42. }
  43. return
  44. }
  45. select {
  46. case <-stopCh:
  47. return
  48. case dataCh <- value:
  49. }
  50. }
  51. }(strconv.Itoa(i))
  52. }
  53. // 接收者
  54. for i := 0; i < NumReceivers; i++ {
  55. go func(id string) {
  56. defer wt.Done()
  57. for {
  58. select {
  59. case <-stopCh:
  60. return
  61. case value := <-dataCh:
  62. if value == Max {
  63. // 为了防止阻塞,这里使用了一个尝试
  64. // 发送操作来向中间调解者发送信号。
  65. select {
  66. case toStop <- "接收者:" + id:
  67. default:
  68. }
  69. return
  70. }
  71. log.Println(value)
  72. }
  73. }
  74. }(strconv.Itoa(i))
  75. }
  76. wt.Wait()
  77. log.Println("被" + stoppedBy + "终止了")
  78. }