Berbagi teknologi

Masalah aplikasi coroutine dan saluran Go yang komprehensif

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Pahami secara singkat apa itu coroutine dan saluran

Apa itu coroutine

Coroutine adalah thread ringan tingkat pengguna yang memiliki ruang tumpukan independen dan berbagi ruang tumpukan program.

Ini adalah mikro-thread yang diimplementasikan melalui algoritma berdasarkan single thread, dibandingkan dengan pemrograman multi-thread, ia memiliki keuntungan sebagai berikut:

  • Peralihan konteks coroutine ditentukan oleh pengguna, tanpa memerlukan peralihan konteks kernel sistem, sehingga mengurangi overhead.
  • Secara default, coroutine dilindungi sepenuhnya untuk mencegah interupsi.Tidak diperlukan kunci operasi atom
  • Satu thread juga dapat mencapai konkurensi tinggi, dan bahkan CPU inti tunggal dapat mendukung puluhan ribu coroutine.

apa itu saluran

Saluran adalah struktur data yang digunakan untuk komunikasi antar coroutine. Mirip dengan antrian, salah satu ujungnya adalah pengirim dan ujung lainnya adalah penerima. Menggunakan saluran dapat memastikan sinkronisasi dan urutan data.

Saluran dibagi menjadi saluran buffered dan saluran unbuffered, yang dinyatakan sebagai berikut:

  • Ada saluran penyangga
intChan := make(chan int,<缓冲容量>)
  • saluran tanpa buffer
intChan := make(chan int)

Perbedaan antara saluran yang di-buffer dan saluran yang tidak di-buffer:

  • Pemblokiran: Pengirim saluran yang tidak di-buffer akan memblokir hingga data diterima; pengirim saluran yang di-buffer akan memblokir hingga buffer penuh, dan penerima akan memblokir hingga buffer tidak kosong.
  • Sinkronisasi dan urutan data: Saluran tanpa buffer menjamin sinkronisasi dan urutan data; pipa dengan buffer tidak menjamin sinkronisasi dan urutan data.
  • Skenario aplikasi: Saluran tanpa buffer memerlukan sinkronisasi dan urutan yang ketat; saluran dengan buffer dapat berkomunikasi secara asinkron dan meningkatkan throughput.

Yang perlu diperhatikan dalam penerapan saluran unbuffered adalah harus ada pengirim dan penerima di kedua ujung saluran, jika tidak maka akan terjadi kebuntuan.

2. Kasus pemrograman bersamaan saluran coroutine

(1) Cetak huruf dan angka secara bergantian

Arti pertanyaan: Gunakan saluran coroutine untuk mencetak angka 1-10 dan huruf AJ secara bergantian.

Kode:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. /*
  7. 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
  8. 解决办法:
  9. 避免死锁的发生:
  10. 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
  11. */
  12. func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  13. defer wg.Done()
  14. for i := 1; i <= 10; i++ {
  15. <-alpCh // 等待字母goroutine发信号
  16. fmt.Print(i, " ")
  17. //避免死锁发生
  18. if i < 10 {
  19. numCh <- struct{}{} // 发信号给字母goroutine
  20. }
  21. if i == 10 {
  22. close(numCh)
  23. }
  24. }
  25. }
  26. func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
  27. defer wg.Done()
  28. for i := 'A'; i <= 'J'; i++ {
  29. <-numCh // 等待数字goroutine发信号
  30. fmt.Printf("%c", i)
  31. alpCh <- struct{}{} // 发信号给数字goroutine
  32. }
  33. close(alpCh)
  34. }
  35. func main() {
  36. numCh := make(chan struct{}) // 用于数字goroutine的信号通道
  37. alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
  38. var wg sync.WaitGroup
  39. wg.Add(2)
  40. go printAlp(&wg, numCh, alpCh)
  41. go printNum(&wg, numCh, alpCh)
  42. // 启动时先给数字goroutine发送一个信号
  43. numCh <- struct{}{}
  44. wg.Wait()
  45. }

Analisis topik:

Pertanyaannya mengharuskan kita mencetak huruf dan angka secara bergantian, jadi kita perlu memastikan urutan kedua coroutine yang ketat, yang konsisten dengan skenario penerapan saluran tanpa buffer. Siapkan dua saluran untuk menyimpan angka dan huruf masing-masing. Dua coroutine yang mencetak angka dan huruf masing-masing berfungsi sebagai pengirim dan penerima kedua saluran. Cetak sekali dalam satu putaran dan kirim sinyal sekali untuk mengingatkan coroutine lain untuk mencetak.

Perlu dicatat bahwa ketika karakter terakhir '10' dicetak, coroutine untuk mencetak huruf telah berakhir, dan saluran numCh tidak memiliki penerima. Saat ini, kondisi implementasi saluran tanpa buffer tidak lagi terpenuhi - harus ada pengirim dan penerima. Jika tidak, pengiriman sinyal lagi akan menyebabkan kebuntuan pemblokiran. Jadi tidak perlu mengirimkan sinyal lagi untuk yang ke 10 kalinya.

(2) Rancang penjadwal tugas

Judul: Merancang penjadwal tugas yang menggunakan model pemrograman multi-coroutine + saluran untuk mengimplementasikan skenario bisnis pemrosesan multi-tugas secara bersamaan, dan memerlukan urutan penjadwalan sesuai urutan penambahan tugas.

Kode:

  1. type scheduler struct {
  2. taskChan chan func()
  3. wt sync.WaitGroup
  4. }
  5. func (td *scheduler) AddTask(task func()) {
  6. td.taskChan <- task
  7. }
  8. func (td *scheduler) Executer() {
  9. defer td.wt.Done()
  10. for {
  11. task, ok := <-td.taskChan
  12. task()
  13. if ok && len(td.taskChan) == 0 {
  14. break
  15. }
  16. }
  17. }
  18. func (td *scheduler) Start() {
  19. td.wt.Add(4)
  20. //假设四个消费者
  21. for i := 0; i < 4; i++ {
  22. go td.Executer()
  23. }
  24. td.wt.Wait()
  25. }
  26. func main() {
  27. sd := scheduler{
  28. taskChan: make(chan func(), 5),
  29. }
  30. go func() {
  31. sd.AddTask(func() {
  32. fmt.Println("任务1")
  33. })
  34. sd.AddTask(func() {
  35. fmt.Println("任务2")
  36. })
  37. sd.AddTask(func() {
  38. fmt.Println("任务3")
  39. })
  40. sd.AddTask(func() {
  41. fmt.Println("任务4")
  42. })
  43. sd.AddTask(func() {
  44. fmt.Println("任务5")
  45. })
  46. sd.AddTask(func() {
  47. fmt.Println("任务6")
  48. })
  49. close(sd.taskChan)
  50. }()
  51. sd.Start()
  52. }

analisa masalah:

Karena tugas yang ditambahkan bersifat multitugas, maka ada lebih dari satu tugas, dan pemrosesan asinkron diperlukan untuk menjalankan tugas ini. Mematuhi saluran buffered memerlukan peningkatan throughput dan pemrosesan asinkron.

Kemudian, kita perlu memasukkan tugas ke dalam saluran, dan beberapa penerima dapat mengambil tugas dari saluran tersebut secara berurutan dan menjalankannya.

Masalah yang perlu diperhatikan adalah jika jumlah tugas yang ditambahkan lebih besar dari buffer saluran maka akan menyebabkan pemblokiran saat penambahan tugas. Agar tidak mempengaruhi startup normal konsumen, ia perlu membuka coroutine terpisah untuk menambahkan tugas.

Dengan cara ini, ketika konsumen melakukan konsumsi, produsen yang menghalangi akan dibangunkan untuk terus menambah tugas.

3. Ringkasan

Setelah mempelajari model pemrograman coroutine + channel, selain yang baru saja disebutkan pada judul, kita juga harus memperhatikan hal-hal berikut:

1. Mengapa saluran harus ditutup setelah habis?

  • Untuk menghindari kebuntuan. Menutup saluran juga memberi tahu penerima bahwa tidak ada lagi data yang dikirim dari pengirim dan tidak perlu terus menunggu data. Setelah menerima informasi penutupan saluran, penerima berhenti menerima data; jika saluran tidak ditutup, penerima akan tetap diblokir dan ada risiko kebuntuan.
  • Lepaskan sumber daya dan hindari kebocoran sumber daya. Setelah menutup saluran, sistem akan melepaskan sumber daya yang sesuai. Menutup saluran tepat waktu dapat menghindari pemborosan dan kebocoran sumber daya.

2. Bagaimana cara menutup saluran dengan baik?

Pertama-tama, prinsip paling dasar dalam menutup saluran adalah jangan menutup saluran yang sudah ditutup. Kedua, ada prinsip lain dalam menggunakan saluran Go:Jangan menutup saluran pada penerima data atau ketika ada banyak pengirim.dengan kata lainKita hanya boleh membiarkan satu-satunya pengirim saluran menutup saluran ini.

Cara yang kasar adalah dengan menutup saluran melalui pemulihan pengecualian, tetapi ini jelas melanggar prinsip di atas dan dapat menyebabkan persaingan data. Cara lain adalah menutup saluran dengan sync.Once atau sync.Mutex, yang tidak dijamin akan terjadi secara bersamaan operasi dan operasi pengiriman pada saluran tidak menyebabkan perlombaan data. Kedua metode tersebut memiliki masalah tertentu, jadi saya tidak akan memperkenalkannya secara detail. Berikut adalah metode cara menutup saluran dengan baik.

Skenario 1: M penerima dan satu pengirim

Salah satu situasi termudah untuk dihadapi. Ketika pengirim harus menyelesaikan pengiriman, biarkan saja ia menutup salurannya. Hal ini terjadi pada dua contoh pemrograman di atas.

Skenario 2: Satu penerima dan N pengirim

Menurut prinsip dasar saluran Go, kita hanya dapat menutup saluran pada satu-satunya pengirim saluran tersebut. Jadi, dalam hal ini, kita tidak bisa langsung menutup saluran tersebut di suatu tempat.Namun kita dapat membiarkan penerima menutup saluran sinyal tambahan untuk memberitahu pengirim agar tidak mengirim data lagi.

  1. package main
  2. import (
  3. "log"
  4. "sync"
  5. )
  6. func main() {
  7. cosnt N := 5
  8. cosnt Max := 60000
  9. count := 0
  10. dataCh := make(chan int)
  11. stopCh := make(chan bool)
  12. var wt sync.WaitGroup
  13. wt.Add(1)
  14. //发送者
  15. for i := 0; i < N; i++ {
  16. go func() {
  17. for {
  18. select {
  19. case <-stopCh:
  20. return
  21. default:
  22. count += 1
  23. dataCh <- count
  24. }
  25. }
  26. }()
  27. }
  28. //接收者
  29. go func() {
  30. defer wt.Done()
  31. for value := range dataCh {
  32. if value == Max {
  33. // 此唯一的接收者同时也是stopCh通道的
  34. // 唯一发送者。尽管它不能安全地关闭dataCh数
  35. // 据通道,但它可以安全地关闭stopCh通道。
  36. close(stopCh)
  37. return
  38. }
  39. log.Println(value)
  40. }
  41. }()
  42. wt.Wait()
  43. }

Dalam metode ini, kami menambahkan saluran sinyal tambahan stopCh, yang digunakan penerima untuk memberi tahu pengirim bahwa ia tidak perlu lagi menerima data. Selain itu, metode ini tidak menutup dataCh. Ketika saluran tidak lagi digunakan oleh coroutine apa pun, saluran tersebut akan dikumpulkan secara bertahap, terlepas dari apakah saluran tersebut telah ditutup.

Keanggunan metode ini adalah dengan menutup satu saluran, Anda berhenti menggunakan saluran lain, sehingga secara tidak langsung menutup saluran lainnya.

Skenario 3: M penerima dan N pengirim

Kita tidak bisa meminta penerima atau pengirim menutup saluran yang digunakan untuk mengirimkan data, kita juga tidak bisa meminta salah satu dari beberapa penerima menutup saluran sinyal tambahan. Kedua praktik ini melanggar prinsip penutupan saluran.

Namun, kami bisa memperkenalkannyaPeran mediator perantara dan menutup saluran sinyal tambahan untuk memberi tahu semua penerima dan pengirim tentang akhir pekerjaan

Contoh kode:

  1. package main
  2. import (
  3. "log"
  4. "math/rand"
  5. "strconv"
  6. "sync"
  7. )
  8. func main() {
  9. const Max = 100000
  10. const NumReceivers = 10
  11. const NumSenders = 1000
  12. var wt sync.WaitGroup
  13. wt.Add(NumReceivers)
  14. dataCh := make(chan int)
  15. stopCh := make(chan struct{})
  16. // stopCh是一个额外的信号通道。它的发送
  17. // 者为中间调解者。它的接收者为dataCh
  18. // 数据通道的所有的发送者和接收者。
  19. toStop := make(chan string, 1)
  20. // toStop是一个用来通知中间调解者让其
  21. // 关闭信号通道stopCh的第二个信号通道。
  22. // 此第二个信号通道的发送者为dataCh数据
  23. // 通道的所有的发送者和接收者,它的接收者
  24. // 为中间调解者。它必须为一个缓冲通道。
  25. var stoppedBy string
  26. // 中间调解者
  27. go func() {
  28. stoppedBy = <-toStop
  29. close(stopCh)
  30. }()
  31. // 发送者
  32. for i := 0; i < NumSenders; i++ {
  33. go func(id string) {
  34. for {
  35. value := rand.Intn(Max)
  36. if value == 0 {
  37. // 为了防止阻塞,这里使用了一个尝试
  38. // 发送操作来向中间调解者发送信号。
  39. select {
  40. case toStop <- "发送者#" + id:
  41. default:
  42. }
  43. return
  44. }
  45. select {
  46. case <-stopCh:
  47. return
  48. case dataCh <- value:
  49. }
  50. }
  51. }(strconv.Itoa(i))
  52. }
  53. // 接收者
  54. for i := 0; i < NumReceivers; i++ {
  55. go func(id string) {
  56. defer wt.Done()
  57. for {
  58. select {
  59. case <-stopCh:
  60. return
  61. case value := <-dataCh:
  62. if value == Max {
  63. // 为了防止阻塞,这里使用了一个尝试
  64. // 发送操作来向中间调解者发送信号。
  65. select {
  66. case toStop <- "接收者:" + id:
  67. default:
  68. }
  69. return
  70. }
  71. log.Println(value)
  72. }
  73. }
  74. }(strconv.Itoa(i))
  75. }
  76. wt.Wait()
  77. log.Println("被" + stoppedBy + "终止了")
  78. }