Compartir tecnología

Problemas completos de aplicación de corrutinas y canales de Go

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Comprenda brevemente qué son las corrutinas y los canales.

¿Qué es la corrutina?

Una corrutina es un subproceso liviano a nivel de usuario que tiene un espacio de pila independiente y comparte el espacio de pila del programa.

Es un microhilo implementado mediante algoritmos basados ​​en un solo subproceso. En comparación con la programación multiproceso, tiene las siguientes ventajas:

  • El cambio de contexto de la corrutina lo decide el usuario, sin necesidad de cambiar de contexto del núcleo del sistema, lo que reduce la sobrecarga.
  • De forma predeterminada, las corrutinas están completamente protegidas para evitar interrupciones.No se requiere bloqueo de operación atómica
  • Un solo subproceso también puede lograr una alta concurrencia, e incluso una CPU de un solo núcleo puede admitir decenas de miles de corrutinas.

¿Qué es el canal?

El canal es una estructura de datos utilizada para la comunicación entre corrutinas. Similar a una cola, un extremo es el remitente y el otro extremo es el receptor. El uso de canales puede garantizar la sincronización y el orden de los datos.

Los canales se dividen en canales con búfer y canales sin búfer, que se declaran de la siguiente manera:

  • Hay un canal de buffer
intChan := make(chan int,<缓冲容量>)
  • canal sin búfer
intChan := make(chan int)

La diferencia entre canales con búfer y canales sin búfer:

  • Bloqueo: el remitente de un canal sin búfer se bloqueará hasta que se reciban los datos; el remitente de un canal con búfer se bloqueará hasta que el búfer esté lleno y el receptor se bloqueará hasta que el búfer no esté vacío.
  • Sincronización y secuencia de datos: los canales sin búfer garantizan la sincronización y secuencia de datos; las tuberías con búfer no garantizan la sincronización y secuencia de datos.
  • Escenarios de aplicación: los canales sin búfer requieren una sincronización estricta y los canales con búfer pueden comunicarse de forma asíncrona y mejorar el rendimiento;

Lo que hay que tener en cuenta en la implementación de canales sin búfer es que debe haber un remitente y un receptor en ambos extremos del canal; de lo contrario, se producirá un punto muerto.

2. Caso de programación concurrente de canal de rutina

(1) Imprima letras y números alternativamente

Significado de la pregunta: utilice el canal de rutina para imprimir alternativamente los números del 1 al 10 y las letras AJ.

Código:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. /*
  7. 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
  8. 解决办法:
  9. 避免死锁的发生:
  10. 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
  11. */
  12. func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  13. defer wg.Done()
  14. for i := 1; i <= 10; i++ {
  15. <-alpCh // 等待字母goroutine发信号
  16. fmt.Print(i, " ")
  17. //避免死锁发生
  18. if i < 10 {
  19. numCh <- struct{}{} // 发信号给字母goroutine
  20. }
  21. if i == 10 {
  22. close(numCh)
  23. }
  24. }
  25. }
  26. func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  27. defer wg.Done()
  28. for i := 'A'; i <= 'J'; i++ {
  29. <-numCh // 等待数字goroutine发信号
  30. fmt.Printf("%c", i)
  31. alpCh <- struct{}{} // 发信号给数字goroutine
  32. }
  33. close(alpCh)
  34. }
  35. func main() {
  36. numCh := make(chan struct{}) // 用于数字goroutine的信号通道
  37. alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
  38. var wg sync.WaitGroup
  39. wg.Add(2)
  40. go printAlp(&wg, numCh, alpCh)
  41. go printNum(&wg, numCh, alpCh)
  42. // 启动时先给数字goroutine发送一个信号
  43. numCh <- struct{}{}
  44. wg.Wait()
  45. }

Análisis del tema:

La pregunta requiere que imprimamos letras y números alternativamente, por lo que debemos garantizar el orden estricto de las dos corrutinas, que es consistente con el escenario de aplicación de canales sin búfer. Configure dos canales para almacenar números y letras respectivamente. Dos corrutinas que imprimen números y letras sirven como remitente y receptor de los dos canales respectivamente. Imprima una vez en un bucle y envíe una señal una vez para recordarle a otra rutina que imprima.

Cabe señalar que cuando se imprime el último carácter '10', la rutina para imprimir letras ha finalizado y el canal numCh no tiene receptor. En este momento, las condiciones de implementación del canal sin búfer ya no se cumplen; debe haberlas. un remitente y un receptor, de lo contrario, enviar la señal nuevamente provocará un punto muerto de bloqueo. Por lo tanto, no es necesario volver a enviar la señal por décima vez.

(2) Diseñar un programador de tareas

Título: Diseñe un programador de tareas que utilice el modelo de programación de múltiples corrutinas + canales para implementar escenarios comerciales para el procesamiento simultáneo de múltiples tareas, y el orden de programación debe estar en el orden en que se agregan las tareas.

Código:

  1. type scheduler struct {
  2. taskChan chan func()
  3. wt sync.WaitGroup
  4. }
  5. func (td *scheduler) AddTask(task func()) {
  6. td.taskChan <- task
  7. }
  8. func (td *scheduler) Executer() {
  9. defer td.wt.Done()
  10. for {
  11. task, ok := <-td.taskChan
  12. task()
  13. if ok && len(td.taskChan) == 0 {
  14. break
  15. }
  16. }
  17. }
  18. func (td *scheduler) Start() {
  19. td.wt.Add(4)
  20. //假设四个消费者
  21. for i := 0; i < 4; i++ {
  22. go td.Executer()
  23. }
  24. td.wt.Wait()
  25. }
  26. func main() {
  27. sd := scheduler{
  28. taskChan: make(chan func(), 5),
  29. }
  30. go func() {
  31. sd.AddTask(func() {
  32. fmt.Println("任务1")
  33. })
  34. sd.AddTask(func() {
  35. fmt.Println("任务2")
  36. })
  37. sd.AddTask(func() {
  38. fmt.Println("任务3")
  39. })
  40. sd.AddTask(func() {
  41. fmt.Println("任务4")
  42. })
  43. sd.AddTask(func() {
  44. fmt.Println("任务5")
  45. })
  46. sd.AddTask(func() {
  47. fmt.Println("任务6")
  48. })
  49. close(sd.taskChan)
  50. }()
  51. sd.Start()
  52. }

análisis del problema:

Dado que las tareas agregadas son múltiples, hay más de una y se requiere procesamiento asincrónico para ejecutar estas tareas. Cumplir con los canales almacenados en búfer requiere un rendimiento mejorado y un procesamiento asincrónico.

Luego, debemos colocar la tarea en el canal, y varios receptores pueden tomar las tareas del canal en orden y ejecutarlas.

Una cosa a tener en cuenta es que si la cantidad de tareas agregadas es mayor que el búfer del canal, se bloqueará al agregar tareas. Para no afectar el inicio normal del consumidor, es necesario abrir una rutina separada para agregar tareas.

De esta forma, cuando el consumidor consuma, el productor bloqueador se despertará para continuar agregando tareas.

3. Resumen

Después de estudiar el modelo de programación de canales corrutina +, además de lo que se acaba de mencionar en el título, también debemos prestar atención a las siguientes cuestiones:

1. ¿Por qué debería cerrarse el canal una vez agotado? ¿Cuáles son los riesgos de no cerrarlo?

  • Para evitar un punto muerto. Cerrar el canal también le dice al receptor que no hay más datos para enviar desde el remitente y que no hay necesidad de seguir esperando datos. Después de recibir la información de cierre del canal, el receptor deja de recibir datos; si el canal no está cerrado, el receptor permanecerá bloqueado y existe riesgo de bloqueo.
  • Libere recursos y evite fugas de recursos. Después de cerrar el canal, el sistema liberará los recursos correspondientes. Cerrar el canal a tiempo puede evitar el desperdicio y la fuga de recursos.

2. ¿Cómo cerrar el canal con gracia?

En primer lugar, el principio más básico para cerrar canales es no cerrar canales que ya se han cerrado. En segundo lugar, existe otro principio para utilizar canales Go:No cierre el canal en el receptor de datos o cuando haya varios remitentes.en otras palabrasSólo debemos permitir que el único remitente de un canal cierre este canal.

Una forma grosera es cerrar el canal mediante la recuperación de excepciones, pero esto obviamente viola los principios anteriores y puede causar una carrera de datos. Otra forma es cerrar el canal con sync.Once o sync.Mutex, lo cual no está garantizado que suceda. Las operaciones y las operaciones de envío en un canal no crean carreras de datos. Ambos métodos tienen ciertos problemas, por lo que no los presentaré en detalle. Aquí hay un método sobre cómo cerrar el canal correctamente.

Escenario 1: M receptores y un remitente

Una de las situaciones más fáciles de afrontar. Cuando el remitente necesite terminar de enviar, simplemente déjelo cerrar el canal. Este es el caso de los dos ejemplos de programación anteriores.

Escenario 2: un receptor y N remitentes

De acuerdo con los principios básicos de los canales Go, solo podemos cerrar el canal en el único remitente del canal. Entonces, en este caso, no podemos cerrar el canal directamente en alguna parte.Pero podemos dejar que el receptor cierre un canal de señal adicional para indicarle al remitente que no envíe más datos.

  1. package main
  2. import (
  3. "log"
  4. "sync"
  5. )
  6. func main() {
  7. cosnt N := 5
  8. cosnt Max := 60000
  9. count := 0
  10. dataCh := make(chan int)
  11. stopCh := make(chan bool)
  12. var wt sync.WaitGroup
  13. wt.Add(1)
  14. //发送者
  15. for i := 0; i < N; i++ {
  16. go func() {
  17. for {
  18. select {
  19. case <-stopCh:
  20. return
  21. default:
  22. count += 1
  23. dataCh <- count
  24. }
  25. }
  26. }()
  27. }
  28. //接收者
  29. go func() {
  30. defer wt.Done()
  31. for value := range dataCh {
  32. if value == Max {
  33. // 此唯一的接收者同时也是stopCh通道的
  34. // 唯一发送者。尽管它不能安全地关闭dataCh数
  35. // 据通道,但它可以安全地关闭stopCh通道。
  36. close(stopCh)
  37. return
  38. }
  39. log.Println(value)
  40. }
  41. }()
  42. wt.Wait()
  43. }

En este método, agregamos un canal de señal adicional stopCh, que el receptor usa para decirle al remitente que ya no necesita recibir datos. Además, este método no cierra dataCh. Cuando una rutina ya no utiliza un canal, se recolectará basura gradualmente, independientemente de si se ha cerrado.

La elegancia de este método es que al cerrar un canal, dejas de usar otro canal, cerrando así indirectamente el otro canal.

Escenario 3: M receptores y N remitentes

No podemos hacer que ni el receptor ni el emisor cierren el canal utilizado para transmitir los datos, ni podemos hacer que uno de los múltiples receptores cierre un canal de señalización adicional. Ambas prácticas violan el principio de cierre de canales.

Sin embargo, podemos introducirUn papel de mediador intermedio y que cierre canales de señalización adicionales para avisar a todos los receptores y remitentes del final del trabajo.

Ejemplo de código:

  1. package main
  2. import (
  3. "log"
  4. "math/rand"
  5. "strconv"
  6. "sync"
  7. )
  8. func main() {
  9. const Max = 100000
  10. const NumReceivers = 10
  11. const NumSenders = 1000
  12. var wt sync.WaitGroup
  13. wt.Add(NumReceivers)
  14. dataCh := make(chan int)
  15. stopCh := make(chan struct{})
  16. // stopCh是一个额外的信号通道。它的发送
  17. // 者为中间调解者。它的接收者为dataCh
  18. // 数据通道的所有的发送者和接收者。
  19. toStop := make(chan string, 1)
  20. // toStop是一个用来通知中间调解者让其
  21. // 关闭信号通道stopCh的第二个信号通道。
  22. // 此第二个信号通道的发送者为dataCh数据
  23. // 通道的所有的发送者和接收者,它的接收者
  24. // 为中间调解者。它必须为一个缓冲通道。
  25. var stoppedBy string
  26. // 中间调解者
  27. go func() {
  28. stoppedBy = <-toStop
  29. close(stopCh)
  30. }()
  31. // 发送者
  32. for i := 0; i < NumSenders; i++ {
  33. go func(id string) {
  34. for {
  35. value := rand.Intn(Max)
  36. if value == 0 {
  37. // 为了防止阻塞,这里使用了一个尝试
  38. // 发送操作来向中间调解者发送信号。
  39. select {
  40. case toStop <- "发送者#" + id:
  41. default:
  42. }
  43. return
  44. }
  45. select {
  46. case <-stopCh:
  47. return
  48. case dataCh <- value:
  49. }
  50. }
  51. }(strconv.Itoa(i))
  52. }
  53. // 接收者
  54. for i := 0; i < NumReceivers; i++ {
  55. go func(id string) {
  56. defer wt.Done()
  57. for {
  58. select {
  59. case <-stopCh:
  60. return
  61. case value := <-dataCh:
  62. if value == Max {
  63. // 为了防止阻塞,这里使用了一个尝试
  64. // 发送操作来向中间调解者发送信号。
  65. select {
  66. case toStop <- "接收者:" + id:
  67. default:
  68. }
  69. return
  70. }
  71. log.Println(value)
  72. }
  73. }
  74. }(strconv.Itoa(i))
  75. }
  76. wt.Wait()
  77. log.Println("被" + stoppedBy + "终止了")
  78. }