Combined applications of Go coroutines and channels

2024-07-12

1. A brief look at coroutines and channels

What is a coroutine?

A coroutine is a lightweight, user-level thread that has its own stack and shares the program's heap. In Go, coroutines are called goroutines.

Goroutines are scheduled by the Go runtime rather than by the operating system kernel, and the runtime multiplexes many of them onto a small number of OS threads. Compared with programming directly against OS threads, this has the following advantages:

  • Switching between coroutines happens in user space, inside the runtime scheduler, without a kernel-level thread context switch, which reduces overhead.
  • Each coroutine starts with a small stack that grows on demand, so creating one is cheap. Note that goroutines can run in parallel and can be preempted, so shared data still needs synchronization through channels, mutexes, or atomic operations.
  • Even a single thread can achieve high concurrency, and even a single-core CPU can support tens of thousands of coroutines.
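
To make the "tens of thousands of coroutines" point concrete, here is a minimal sketch. The helper name countWithGoroutines is made up for this illustration. In Go, goroutines may run in parallel on several OS threads, so the sketch updates the shared counter with an atomic add instead of a plain increment.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWithGoroutines (a hypothetical helper) starts n goroutines that each
// increment a shared counter once, and returns the final value after all of
// them have finished.
func countWithGoroutines(n int) int64 {
	var counter int64
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			// Goroutines may run in parallel, so the shared counter
			// is updated atomically rather than with counter++.
			atomic.AddInt64(&counter, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(countWithGoroutines(10000)) // prints 10000
}
```

Starting ten thousand OS threads would be far more expensive; with goroutines this typically finishes in a few milliseconds.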

What is a channel?

A channel is a data structure used for communication between coroutines. It behaves like a first-in, first-out queue, with senders at one end and receivers at the other. Channels let coroutines synchronize with each other and deliver values in the order they were sent.

Channels are divided into buffered channels and unbuffered channels, which are declared as follows:

  • Buffered channel
      intChan := make(chan int, <buffer capacity>)
  • Unbuffered channel
      intChan := make(chan int)

The difference between buffered channels and unbuffered channels:

  • Blocking: A send on an unbuffered channel blocks until a receiver takes the value. A send on a buffered channel blocks only when the buffer is full, and a receive blocks only when the buffer is empty.
  • Synchronization and order: Both kinds of channel deliver values in the order they were sent. An unbuffered channel additionally synchronizes the sender with the receiver on every send; a buffered channel does not, because a send returns as soon as the value is in the buffer.
  • Application scenarios: Unbuffered channels suit cases that need strict step-by-step synchronization; buffered channels allow asynchronous communication and improve throughput.

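Note that every send on an unbuffered channel needs a matching receive in another coroutine, and vice versa. If no coroutine will ever perform the other half, the operation blocks forever, and once every coroutine is blocked the Go runtime aborts the program with a deadlock error.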
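
The blocking behavior can be observed without actually deadlocking by using a non-blocking send. The sketch below is only an illustration; trySend is a helper invented here, not part of any library.

```go
package main

import "fmt"

// trySend (a hypothetical helper) attempts a non-blocking send and reports
// whether the value was accepted by the channel.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 2)

	// No goroutine is receiving, so a send on the unbuffered channel
	// cannot proceed.
	fmt.Println(trySend(unbuffered, 1)) // false

	// The buffered channel accepts sends until its buffer is full.
	fmt.Println(trySend(buffered, 1)) // true
	fmt.Println(trySend(buffered, 2)) // true
	fmt.Println(trySend(buffered, 3)) // false

	// Values come out in the order they were sent.
	fmt.Println(<-buffered, <-buffered) // 1 2
}
```

A plain `unbuffered <- 1` in place of the first trySend call would block main forever, and the runtime would stop the program with a deadlock error.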

2. Coroutine-channel concurrent programming cases

(1) Print letters and numbers alternately

Topic: Use coroutines and channels to print the numbers 1-10 and the letters A-J alternately.

Code:

    package main

    import (
        "fmt"
        "sync"
    )

    /*
    Unbuffered channel: a send only completes when another goroutine is
    receiving from the channel. Otherwise the sender blocks and the program
    deadlocks.
    How the deadlock is avoided here:
    when i reaches 10, printAlp will not receive from numCh again, so
    printNum must not send on numCh a tenth time.
    */
    func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
        defer wg.Done()
        for i := 1; i <= 10; i++ {
            <-alpCh // wait for the signal from the letter goroutine
            fmt.Print(i, " ")
            // Skip the last send to avoid a deadlock.
            if i < 10 {
                numCh <- struct{}{} // signal the letter goroutine
            }
            if i == 10 {
                close(numCh)
            }
        }
    }

    func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
        defer wg.Done()
        for i := 'A'; i <= 'J'; i++ {
            <-numCh // wait for the signal from the number goroutine
            fmt.Printf("%c", i)
            alpCh <- struct{}{} // signal the number goroutine
        }
        close(alpCh)
    }

    func main() {
        numCh := make(chan struct{}) // signals sent by the number goroutine
        alpCh := make(chan struct{}) // signals sent by the letter goroutine
        var wg sync.WaitGroup
        wg.Add(2)
        go printAlp(&wg, numCh, alpCh)
        go printNum(&wg, numCh, alpCh)
        // Kick things off with one signal on numCh. printAlp is the goroutine
        // waiting on numCh, so the output starts with a letter: A1 B2 ... J10
        numCh <- struct{}{}
        wg.Wait()
    }

Topic Analysis:

The task is to print letters and numbers alternately, so the two coroutines must take turns in a strict order, which is exactly the scenario unbuffered channels are suited to. Two signal channels are set up: numCh carries the signal that a number has been printed, and alpCh carries the signal that a letter has been printed. The values sent are empty structs; the channels carry no numbers or letters themselves. Each coroutine waits for the other's signal, prints once, then signals back. Because main sends the first signal on numCh, the letter coroutine goes first and the output is A1 B2 ... J10.

Note that by the time the last number, 10, is printed, the letter coroutine has already sent its final signal and will never receive from numCh again. A send on an unbuffered channel needs a receiver, so sending a tenth signal would block forever and cause a deadlock. That is why the tenth signal is skipped.

(2) Design a task scheduler

Topic: Design a task scheduler that uses the multi-coroutine + channel programming model to process multiple tasks concurrently, scheduling the tasks in the order in which they were added.

Code:

    package main

    import (
        "fmt"
        "sync"
    )

    type scheduler struct {
        taskChan chan func()
        wt       sync.WaitGroup
    }

    // AddTask queues a task; it blocks while the buffer is full.
    func (td *scheduler) AddTask(task func()) {
        td.taskChan <- task
    }

    // Executer is one consumer: it runs tasks until taskChan is closed
    // and drained.
    func (td *scheduler) Executer() {
        defer td.wt.Done()
        for task := range td.taskChan {
            task()
        }
    }

    // Start launches the consumers and waits for all of them to finish.
    func (td *scheduler) Start() {
        // Assume four consumers.
        td.wt.Add(4)
        for i := 0; i < 4; i++ {
            go td.Executer()
        }
        td.wt.Wait()
    }

    func main() {
        sd := scheduler{
            taskChan: make(chan func(), 5),
        }
        // Add tasks from a separate goroutine so that a full buffer
        // does not stop the consumers from starting.
        go func() {
            sd.AddTask(func() { fmt.Println("task 1") })
            sd.AddTask(func() { fmt.Println("task 2") })
            sd.AddTask(func() { fmt.Println("task 3") })
            sd.AddTask(func() { fmt.Println("task 4") })
            sd.AddTask(func() { fmt.Println("task 5") })
            sd.AddTask(func() { fmt.Println("task 6") })
            // The only sender closes the channel once all tasks are queued.
            close(sd.taskChan)
        }()
        sd.Start()
    }

Topic Analysis:

There are multiple tasks, and they should be executed asynchronously, so a buffered channel is used here to decouple the producer from the consumers and improve throughput.

Tasks are put into the channel, and several consumers take them out and execute them. A channel is first-in, first-out, so tasks are handed to consumers in the order they were added; with more than one consumer, however, tasks that run at the same time may finish in a different order.

Note that if more tasks are added than the buffer can hold, AddTask blocks. So that this does not prevent the consumers from starting, the tasks are added from a separate coroutine.

As the consumers take tasks out of the channel, the blocked producer is woken up and continues adding tasks.
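
If tasks must also execute strictly in the order they were added, one option (an assumption of this sketch, not part of the original design) is to use a single consumer. The hypothetical helper runInOrder below also shows a producer that blocks on a full buffer and resumes as the consumer drains it.

```go
package main

import "fmt"

// runInOrder (a hypothetical helper) pushes n tasks through a buffered
// channel of the given capacity, executes them with a single consumer,
// and returns the order in which they ran.
func runInOrder(n, capacity int) []int {
	tasks := make(chan func() int, capacity)

	// Producer: blocks whenever the buffer is full and resumes as the
	// consumer drains it. It closes the channel once every task is queued.
	go func() {
		defer close(tasks)
		for i := 1; i <= n; i++ {
			id := i
			tasks <- func() int { return id }
		}
	}()

	// Single consumer: tasks run in exactly the order they were added.
	var order []int
	for task := range tasks {
		order = append(order, task())
	}
	return order
}

func main() {
	fmt.Println(runInOrder(6, 2)) // [1 2 3 4 5 6]
}
```

The trade-off is that a single consumer gives up parallel execution, so this only fits workloads where ordering matters more than throughput.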

3. Summary

Having covered the coroutine + channel programming model, there are a few issues to keep in mind beyond those raised in the examples above:

1. Why should the channel be closed after use? What are the risks of not closing it?

  • To avoid deadlock. Closing a channel tells the receivers that the sender has no more data, so they need not wait any longer. A receiver that sees the channel is closed stops receiving. If the channel is never closed, a receiver that keeps waiting blocks forever, which can cause a deadlock.
  • To avoid leaking coroutines. A coroutine blocked forever on a channel that will never be closed or written to again is never released, along with everything it references. The channel itself does not have to be closed to be reclaimed: the garbage collector frees it once no coroutine references it.
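
How a receiver learns that a channel has been closed can be sketched as follows. The helper name drain is made up for this example.

```go
package main

import "fmt"

// drain (a hypothetical helper) receives until the channel is closed and
// empty, and returns everything it received.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch { // the loop ends once ch is closed and drained
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // buffered values remain readable after close

	fmt.Println(drain(ch)) // [1 2]

	// A receive on a closed, empty channel returns immediately
	// with the zero value and ok == false.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```

Without the close call, the range loop in drain would block forever after the second value.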

2. How to close the channel gracefully?

First, the most basic rule is never to close a channel that is already closed: doing so panics, as does sending on a closed channel. Second, there is a general principle for using Go channels: do not close a channel from the receiver side, and do not close it when there are multiple senders. In other words, only the sole sender of a channel should close it.
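
The reason behind these rules is that both closing a closed channel and sending on a closed channel cause a runtime panic, while receiving from a closed channel is safe. The sketch below only observes this behavior; panics is a helper invented for the illustration, and using recover this way is not a recommended technique for closing channels.

```go
package main

import "fmt"

// panics (a hypothetical helper) runs f and reports whether it panicked.
func panics(f func()) (didPanic bool) {
	defer func() {
		if r := recover(); r != nil {
			didPanic = true
		}
	}()
	f()
	return false
}

func main() {
	ch := make(chan int, 1)
	close(ch)

	fmt.Println(panics(func() { close(ch) })) // true: close of closed channel
	fmt.Println(panics(func() { ch <- 1 }))   // true: send on closed channel
	fmt.Println(panics(func() { <-ch }))      // false: receiving is still safe
}
```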

A crude approach is to close the channel and recover from the resulting panic, but this clearly violates the principle above and may cause data races. Another approach is to guard the close with sync.Once or sync.Mutex, but that does not guarantee that concurrent close and send operations on the channel are free of data races. Both approaches have problems, so they are not covered in detail here. The following sections show how to close a channel gracefully.

Scenario 1: M receivers and one sender

This is the easiest case to handle. When the sender has finished sending, it simply closes the channel. The two programming examples above are such cases.

Scenario 2: One Receiver and N Senders

According to the principle above, only the sole sender of a channel may close it, so in this scenario no one can close the data channel directly. Instead, the receiver can close an additional signal channel to tell the senders to stop sending data.

    package main

    import (
        "log"
        "sync"
        "sync/atomic"
    )

    func main() {
        const N = 5
        const Max = 60000
        var count int64 // shared by all senders, so it is updated atomically
        dataCh := make(chan int)
        stopCh := make(chan bool)
        var wt sync.WaitGroup
        wt.Add(1)
        // Senders
        for i := 0; i < N; i++ {
            go func() {
                for {
                    value := int(atomic.AddInt64(&count, 1))
                    // Either the value is delivered or the stop signal
                    // arrives; the sender never blocks forever.
                    select {
                    case <-stopCh:
                        return
                    case dataCh <- value:
                    }
                }
            }()
        }
        // Receiver
        go func() {
            defer wt.Done()
            for value := range dataCh {
                if value == Max {
                    // The only receiver is also the only sender on stopCh.
                    // It cannot safely close the data channel dataCh, but
                    // it can safely close stopCh.
                    close(stopCh)
                    return
                }
                log.Println(value)
            }
        }()
        wt.Wait()
    }

This method adds an extra signal channel, stopCh, which the receiver closes to tell the senders that it will not receive any more data. Note that dataCh is never closed. Once a channel is no longer referenced by any coroutine, it is eventually garbage collected, whether or not it has been closed.

The elegance of this approach is that closing one channel stops all use of the other, so the data channel never needs to be closed at all.

Scenario 3: M receivers and N senders

Here neither a receiver nor a sender may close the data channel, and no single one of the multiple receivers may close the additional signal channel either. Both would violate the channel closing principle.

However, we can introduce a mediator role that closes the additional signal channel to notify all receivers and senders that the work is finished.

Code example:

    package main

    import (
        "log"
        "math/rand"
        "strconv"
        "sync"
    )

    func main() {
        const Max = 100000
        const NumReceivers = 10
        const NumSenders = 1000
        var wt sync.WaitGroup
        wt.Add(NumReceivers)
        dataCh := make(chan int)
        stopCh := make(chan struct{})
        // stopCh is an additional signal channel. Its sender is the
        // mediator. Its receivers are all the senders and receivers
        // of the data channel dataCh.
        toStop := make(chan string, 1)
        // toStop is a second signal channel, used to tell the mediator
        // to close stopCh. Its senders are all the senders and receivers
        // of dataCh, and its receiver is the mediator.
        // It must be a buffered channel.
        var stoppedBy string
        // Mediator
        go func() {
            stoppedBy = <-toStop
            close(stopCh)
        }()
        // Senders
        for i := 0; i < NumSenders; i++ {
            go func(id string) {
                for {
                    value := rand.Intn(Max)
                    if value == 0 {
                        // A try-send is used to signal the mediator
                        // so that this goroutine never blocks here.
                        select {
                        case toStop <- "sender#" + id:
                        default:
                        }
                        return
                    }
                    select {
                    case <-stopCh:
                        return
                    case dataCh <- value:
                    }
                }
            }(strconv.Itoa(i))
        }
        // Receivers
        for i := 0; i < NumReceivers; i++ {
            go func(id string) {
                defer wt.Done()
                for {
                    select {
                    case <-stopCh:
                        return
                    case value := <-dataCh:
                        // rand.Intn(Max) returns values in [0, Max),
                        // so the largest possible value is Max-1.
                        if value == Max-1 {
                            // A try-send is used to signal the mediator
                            // so that this goroutine never blocks here.
                            select {
                            case toStop <- "receiver#" + id:
                            default:
                            }
                            return
                        }
                        log.Println(value)
                    }
                }
            }(strconv.Itoa(i))
        }
        wt.Wait()
        log.Println("stopped by " + stoppedBy)
    }