Compartilhamento de tecnologia

Problemas abrangentes de aplicação de corrotinas e canais Go

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Entenda resumidamente o que são corrotinas e canais

O que é co-rotina

Uma corrotina é um thread leve em nível de usuário que possui um espaço de pilha independente e compartilha o espaço de heap do programa.

É um microthread implementado por meio de algoritmos baseados em thread único. Em comparação com a programação multithread, possui as seguintes vantagens:

  • A troca de contexto da corrotina é decidida pelo usuário, sem a necessidade de troca de contexto do kernel do sistema, reduzindo a sobrecarga.
  • Por padrão, as corrotinas são totalmente protegidas para evitar interrupções.Não é necessário bloqueio de operação atômica
  • Um único thread também pode atingir alta simultaneidade, e até mesmo uma CPU de núcleo único pode suportar dezenas de milhares de corrotinas.

o que é canal

Canal é uma estrutura de dados usada para comunicação entre corrotinas. Semelhante a uma fila, uma extremidade é o remetente e a outra extremidade é o destinatário. O uso de canais pode garantir a sincronização e a ordem dos dados.

Os canais são divididos em canais com buffer e canais sem buffer, que são declarados da seguinte forma:

  • Existe um canal de buffer
intChan := make(chan int,<缓冲容量>)
  • canal sem buffer
intChan := make(chan int)

A diferença entre canais com buffer e canais sem buffer:

  • Bloqueio: O remetente de um canal sem buffer bloqueará até que os dados sejam recebidos; o remetente de um canal com buffer bloqueará até que o buffer esteja cheio e o receptor bloqueará até que o buffer não esteja vazio.
  • Sincronização e sequência de dados: canais sem buffer garantem sincronização e sequência de dados;
  • Cenários de aplicação: Canais sem buffer exigem sincronização e sequência rigorosas; canais com buffer podem se comunicar de forma assíncrona e melhorar o rendimento;

O que precisa ser observado na implementação de canais sem buffer é que deve haver um remetente e um receptor em ambas as extremidades do canal, caso contrário ocorrerá um deadlock.

2. Caso de programação simultânea de canal de rotina

(1) Imprima letras e números alternadamente

Significado da pergunta: Use o canal de co-rotina para imprimir alternadamente os números de 1 a 10 e as letras AJ.

Código:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. /*
  7. 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
  8. 解决办法:
  9. 避免死锁的发生:
  10. 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
  11. */
  12. func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  13. defer wg.Done()
  14. for i := 1; i <= 10; i++ {
  15. <-alpCh // 等待字母goroutine发信号
  16. fmt.Print(i, " ")
  17. //避免死锁发生
  18. if i < 10 {
  19. numCh <- struct{}{} // 发信号给字母goroutine
  20. }
  21. if i == 10 {
  22. close(numCh)
  23. }
  24. }
  25. }
  26. func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  27. defer wg.Done()
  28. for i := 'A'; i <= 'J'; i++ {
  29. <-numCh // 等待数字goroutine发信号
  30. fmt.Printf("%c", i)
  31. alpCh <- struct{}{} // 发信号给数字goroutine
  32. }
  33. close(alpCh)
  34. }
  35. func main() {
  36. numCh := make(chan struct{}) // 用于数字goroutine的信号通道
  37. alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
  38. var wg sync.WaitGroup
  39. wg.Add(2)
  40. go printAlp(&wg, numCh, alpCh)
  41. go printNum(&wg, numCh, alpCh)
  42. // 启动时先给数字goroutine发送一个信号
  43. numCh <- struct{}{}
  44. wg.Wait()
  45. }

Análise do tópico:

A questão exige que imprimamos letras e números alternadamente, por isso precisamos garantir a ordem estrita das duas corrotinas, o que é consistente com o cenário de aplicação de canais sem buffer. Configure dois canais para armazenar números e letras, respectivamente. Duas corrotinas que imprimem números e letras servem como remetente e destinatário dos dois canais, respectivamente. Imprima uma vez em um loop e envie um sinal uma vez para lembrar outra corrotina de imprimir.

Deve-se notar que quando o último caractere '10' é impresso, a rotina de impressão de letras terminou e o canal numCh não possui receptor. Neste momento, as condições de implementação do canal sem buffer não são mais atendidas - deve haver. um remetente e um receptor, caso contrário, enviar o sinal novamente causará um impasse de bloqueio. Portanto, não há necessidade de enviar o sinal novamente pela décima vez.

(2) Projete um agendador de tarefas

Título: Projete um agendador de tarefas que use o modelo de programação multi-rotina + canal para implementar cenários de negócios de processamento simultâneo de multitarefas e exija que a ordem de agendamento esteja na ordem em que as tarefas são adicionadas.

Código:

  1. type scheduler struct {
  2. taskChan chan func()
  3. wt sync.WaitGroup
  4. }
  5. func (td *scheduler) AddTask(task func()) {
  6. td.taskChan <- task
  7. }
  8. func (td *scheduler) Executer() {
  9. defer td.wt.Done()
  10. for {
  11. task, ok := <-td.taskChan
  12. task()
  13. if ok && len(td.taskChan) == 0 {
  14. break
  15. }
  16. }
  17. }
  18. func (td *scheduler) Start() {
  19. td.wt.Add(4)
  20. //假设四个消费者
  21. for i := 0; i < 4; i++ {
  22. go td.Executer()
  23. }
  24. td.wt.Wait()
  25. }
  26. func main() {
  27. sd := scheduler{
  28. taskChan: make(chan func(), 5),
  29. }
  30. go func() {
  31. sd.AddTask(func() {
  32. fmt.Println("任务1")
  33. })
  34. sd.AddTask(func() {
  35. fmt.Println("任务2")
  36. })
  37. sd.AddTask(func() {
  38. fmt.Println("任务3")
  39. })
  40. sd.AddTask(func() {
  41. fmt.Println("任务4")
  42. })
  43. sd.AddTask(func() {
  44. fmt.Println("任务5")
  45. })
  46. sd.AddTask(func() {
  47. fmt.Println("任务6")
  48. })
  49. close(sd.taskChan)
  50. }()
  51. sd.Start()
  52. }

analise de problemas:

Como as tarefas adicionadas são multitarefas, há mais de uma e é necessário processamento assíncrono para executar essas tarefas. A conformidade com canais em buffer requer melhor rendimento e processamento assíncrono.

Então, precisamos colocar a tarefa no canal, e vários receptores podem pegar as tarefas do canal em ordem e executá-las.

O problema que precisa de atenção é que se o número de tarefas adicionadas for maior que o buffer do canal, causará bloqueio na adição de tarefas. Para não afetar a inicialização normal do consumidor, ele precisa abrir uma corrotina separada para adicionar tarefas.

Desta forma, quando o consumidor consumir, o produtor bloqueador será despertado para continuar adicionando tarefas.

3. Resumo

Depois de estudar o modelo de programação de corrotina + canal, além do que acabamos de mencionar no título, devemos também ficar atentos às seguintes questões:

1. Por que o canal deve ser fechado depois de esgotado? Quais são os riscos de não fechá-lo?

  • Para evitar impasse. Fechar o canal também informa ao receptor que não há mais dados para enviar do remetente e que não há necessidade de continuar aguardando pelos dados. Após receber a informação de fechamento do canal, o receptor para de receber dados; se o canal não for fechado, o receptor permanecerá bloqueado e há risco de deadlock;
  • Libere recursos e evite vazamentos de recursos. Após fechar o canal, o sistema liberará os recursos correspondentes. Fechar o canal a tempo pode evitar desperdício e vazamento de recursos.

2. Como fechar o canal normalmente?

Em primeiro lugar, o princípio mais básico do encerramento de canais é não fechar canais que foram fechados. Em segundo lugar, existe outro princípio para usar canais Go:Não feche o canal no receptor de dados ou quando houver vários remetentes.em outras palavrasDevemos permitir apenas que o único remetente de um canal feche este canal.

Uma maneira rude é fechar o canal por meio de recuperação de exceção, mas isso obviamente viola os princípios acima e pode causar corrida de dados. Outra maneira é fechar o canal com sync.Once ou sync.Mutex, o que não é garantido. operações e operações de envio em um canal não criam corridas de dados. Ambos os métodos têm alguns problemas, então não vou apresentá-los em detalhes. Aqui está um método sobre como fechar o canal normalmente.

Cenário 1: M receptores e um remetente

Uma das situações mais fáceis de lidar. Quando o remetente precisar finalizar o envio, basta deixá-lo fechar o canal. Este é o caso dos dois exemplos de programação acima.

Cenário 2: Um receptor e N remetentes

De acordo com os princípios básicos dos canais Go, só podemos fechar o canal no único remetente do canal. Portanto, neste caso, não podemos fechar o canal diretamente em algum lugar.Mas podemos deixar o receptor fechar um canal de sinal adicional para dizer ao remetente para não enviar mais dados.

  1. package main
  2. import (
  3. "log"
  4. "sync"
  5. )
  6. func main() {
  7. cosnt N := 5
  8. cosnt Max := 60000
  9. count := 0
  10. dataCh := make(chan int)
  11. stopCh := make(chan bool)
  12. var wt sync.WaitGroup
  13. wt.Add(1)
  14. //发送者
  15. for i := 0; i < N; i++ {
  16. go func() {
  17. for {
  18. select {
  19. case <-stopCh:
  20. return
  21. default:
  22. count += 1
  23. dataCh <- count
  24. }
  25. }
  26. }()
  27. }
  28. //接收者
  29. go func() {
  30. defer wt.Done()
  31. for value := range dataCh {
  32. if value == Max {
  33. // 此唯一的接收者同时也是stopCh通道的
  34. // 唯一发送者。尽管它不能安全地关闭dataCh数
  35. // 据通道,但它可以安全地关闭stopCh通道。
  36. close(stopCh)
  37. return
  38. }
  39. log.Println(value)
  40. }
  41. }()
  42. wt.Wait()
  43. }

Neste método, adicionamos um canal de sinal adicional stopCh, que o receptor usa para informar ao remetente que não precisa mais receber dados. Além disso, este método não fecha o dataCh. Quando um canal não é mais usado por nenhuma corrotina, ele será gradualmente coletado como lixo, independentemente de ter sido fechado.

A elegância desse método é que, ao fechar um canal, você deixa de usar outro canal, fechando indiretamente o outro canal.

Cenário 3: M receptores e N remetentes

Não podemos fazer com que o receptor ou o remetente fechem o canal usado para transmitir os dados, nem podemos fazer com que um dos múltiplos receptores feche um canal de sinalização adicional. Ambas as práticas violam o princípio do encerramento do canal.

Contudo, podemos apresentarUma função de mediador intermediário e fechamento de canais de sinalização adicionais para notificar todos os receptores e remetentes do fim do trabalho

Exemplo de código:

  1. package main
  2. import (
  3. "log"
  4. "math/rand"
  5. "strconv"
  6. "sync"
  7. )
  8. func main() {
  9. const Max = 100000
  10. const NumReceivers = 10
  11. const NumSenders = 1000
  12. var wt sync.WaitGroup
  13. wt.Add(NumReceivers)
  14. dataCh := make(chan int)
  15. stopCh := make(chan struct{})
  16. // stopCh是一个额外的信号通道。它的发送
  17. // 者为中间调解者。它的接收者为dataCh
  18. // 数据通道的所有的发送者和接收者。
  19. toStop := make(chan string, 1)
  20. // toStop是一个用来通知中间调解者让其
  21. // 关闭信号通道stopCh的第二个信号通道。
  22. // 此第二个信号通道的发送者为dataCh数据
  23. // 通道的所有的发送者和接收者,它的接收者
  24. // 为中间调解者。它必须为一个缓冲通道。
  25. var stoppedBy string
  26. // 中间调解者
  27. go func() {
  28. stoppedBy = <-toStop
  29. close(stopCh)
  30. }()
  31. // 发送者
  32. for i := 0; i < NumSenders; i++ {
  33. go func(id string) {
  34. for {
  35. value := rand.Intn(Max)
  36. if value == 0 {
  37. // 为了防止阻塞,这里使用了一个尝试
  38. // 发送操作来向中间调解者发送信号。
  39. select {
  40. case toStop <- "发送者#" + id:
  41. default:
  42. }
  43. return
  44. }
  45. select {
  46. case <-stopCh:
  47. return
  48. case dataCh <- value:
  49. }
  50. }
  51. }(strconv.Itoa(i))
  52. }
  53. // 接收者
  54. for i := 0; i < NumReceivers; i++ {
  55. go func(id string) {
  56. defer wt.Done()
  57. for {
  58. select {
  59. case <-stopCh:
  60. return
  61. case value := <-dataCh:
  62. if value == Max {
  63. // 为了防止阻塞,这里使用了一个尝试
  64. // 发送操作来向中间调解者发送信号。
  65. select {
  66. case toStop <- "接收者:" + id:
  67. default:
  68. }
  69. return
  70. }
  71. log.Println(value)
  72. }
  73. }
  74. }(strconv.Itoa(i))
  75. }
  76. wt.Wait()
  77. log.Println("被" + stoppedBy + "终止了")
  78. }