моя контактная информация
Почтамезофия@protonmail.com
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
Сопрограмма — это легкий поток пользовательского уровня, который имеет независимое пространство стека и разделяет пространство кучи программы.
Это микропоток, реализованный с помощью алгоритмов на основе однопоточного программирования. По сравнению с многопоточным программированием он имеет следующие преимущества:
Канал — это структура данных, используемая для связи между сопрограммами. Подобно очереди, один конец — отправитель, а другой — получатель. Использование каналов может обеспечить синхронизацию и порядок данных.
Каналы делятся на буферизованные и небуферизованные каналы, которые объявляются следующим образом:
intChan := make(chan int,<缓冲容量>)
intChan := make(chan int)
Разница между буферизованными и небуферизованными каналами:
Что необходимо отметить при реализации небуферизованных каналов, так это то, что на обоих концах канала должны быть отправитель и получатель, иначе произойдет взаимоблокировка.
Значение вопроса: используйте канал сопрограммы для поочередной печати цифр 1-10 и букв AJ.
Код:
- package main
-
- import (
- "fmt"
- "sync"
- )
-
- /*
- 无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
- 解决办法:
- 避免死锁的发生:
- 当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
- */
-
- func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
- defer wg.Done()
-
- for i := 1; i <= 10; i++ {
- <-alpCh // 等待字母goroutine发信号
- fmt.Print(i, " ")
- //避免死锁发生
- if i < 10 {
- numCh <- struct{}{} // 发信号给字母goroutine
- }
- if i == 10 {
- close(numCh)
- }
- }
-
- }
-
- func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
- defer wg.Done()
-
- for i := 'A'; i <= 'J'; i++ {
- <-numCh // 等待数字goroutine发信号
- fmt.Printf("%c", i)
- alpCh <- struct{}{} // 发信号给数字goroutine
- }
- close(alpCh)
- }
-
- func main() {
- numCh := make(chan struct{}) // 用于数字goroutine的信号通道
- alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
- var wg sync.WaitGroup
-
- wg.Add(2)
-
- go printAlp(&wg, numCh, alpCh)
- go printNum(&wg, numCh, alpCh)
-
- // 启动时先给数字goroutine发送一个信号
- numCh <- struct{}{}
-
- wg.Wait()
-
- }
Анализ темы:
Вопрос требует, чтобы мы поочередно выводили буквы и цифры, поэтому нам необходимо обеспечить строгий порядок двух сопрограмм, который соответствует сценарию применения небуферизованных каналов. Настройте два канала для хранения цифр и букв соответственно. Две сопрограммы, которые печатают цифры и буквы, служат отправителем и получателем двух каналов соответственно. Напечатайте один раз в цикле и один раз отправьте сигнал, чтобы напомнить другой сопрограмме о необходимости печати.
Следует отметить, что при печати последнего символа «10» сопрограмма печати букв завершилась, а у канала numCh нет приемника. В это время условия реализации небуферизованного канала уже не выполняются — они должны быть. отправитель и получатель. В противном случае повторная отправка сигнала приведет к взаимоблокировке блокировки. Таким образом, нет необходимости отправлять сигнал еще раз в 10-й раз.
Название: Разработка планировщика задач, который использует модель многосопрограммного + канального программирования для реализации бизнес-сценариев для одновременной обработки нескольких задач, при этом порядок планирования должен соответствовать порядку добавления задач.
Код:
- type scheduler struct {
- taskChan chan func()
- wt sync.WaitGroup
- }
-
- func (td *scheduler) AddTask(task func()) {
- td.taskChan <- task
- }
-
- func (td *scheduler) Executer() {
- defer td.wt.Done()
- for {
- task, ok := <-td.taskChan
- task()
- if ok && len(td.taskChan) == 0 {
- break
- }
- }
- }
-
- func (td *scheduler) Start() {
- td.wt.Add(4)
- //假设四个消费者
- for i := 0; i < 4; i++ {
- go td.Executer()
- }
-
- td.wt.Wait()
- }
-
- func main() {
- sd := scheduler{
- taskChan: make(chan func(), 5),
- }
-
- go func() {
- sd.AddTask(func() {
- fmt.Println("任务1")
- })
- sd.AddTask(func() {
- fmt.Println("任务2")
- })
- sd.AddTask(func() {
- fmt.Println("任务3")
- })
- sd.AddTask(func() {
- fmt.Println("任务4")
- })
- sd.AddTask(func() {
- fmt.Println("任务5")
- })
- sd.AddTask(func() {
- fmt.Println("任务6")
- })
- close(sd.taskChan)
- }()
-
- sd.Start()
-
- }
анализ проблемы:
Поскольку добавленные задачи являются многозадачными, их несколько, и для выполнения этих задач требуется асинхронная обработка. Соответствие буферизованным каналам требует повышения пропускной способности и асинхронной обработки.
Затем нам нужно поместить задачу в канал, и несколько получателей смогут по порядку брать задачи из канала и выполнять их.
Следует отметить, что если количество добавленных задач превышает буфер канала, это приведет к блокировке при добавлении задач. Чтобы не влиять на нормальный запуск потребителя, ему нужно открыть отдельную сопрограмму для добавления задач.
Таким образом, когда потребитель потребляет, производитель блокировки будет пробужден для продолжения добавления задач.
После изучения модели программирования сопрограмма + канал, помимо того, что было только что упомянуто в заголовке, нам следует также обратить внимание на следующие вопросы:
Прежде всего, самый основной принцип закрытия каналов – не закрывать каналы, которые были закрыты. Во-вторых, есть еще один принцип использования каналов Go:Не закрывайте канал на приемнике данных или при наличии нескольких отправителей.другими словами,Нам следует позволить единственному отправителю канала закрыть этот канал.
Грубый способ — закрыть канал через восстановление исключений, но это явно нарушает вышеуказанные принципы и может вызвать гонку данных; другой способ — закрыть канал с помощью sync.Once или sync.Mutex, что не гарантирует, что произойдет одновременное закрытие. операции и операции отправки на канале не создают гонок за данными. Оба метода имеют определенные проблемы, поэтому я не буду описывать их подробно. Вот способ корректного закрытия канала.
Одна из самых простых ситуаций. Когда отправителю необходимо завершить отправку, просто позвольте ему закрыть канал. Именно так обстоит дело в двух приведенных выше примерах программирования.
Согласно основным принципам каналов Go, мы можем закрыть канал только у единственного отправителя канала. Значит, в данном случае мы не можем напрямую где-то закрыть канал.Но мы можем позволить получателю закрыть дополнительный канал сигнала, чтобы сообщить отправителю больше не отправлять данные.。
- package main
-
- import (
- "log"
- "sync"
- )
-
- func main() {
-
- cosnt N := 5
- cosnt Max := 60000
- count := 0
-
- dataCh := make(chan int)
- stopCh := make(chan bool)
-
- var wt sync.WaitGroup
- wt.Add(1)
-
- //发送者
- for i := 0; i < N; i++ {
- go func() {
- for {
- select {
- case <-stopCh:
- return
- default:
- count += 1
- dataCh <- count
- }
- }
- }()
- }
-
- //接收者
- go func() {
- defer wt.Done()
- for value := range dataCh {
- if value == Max {
- // 此唯一的接收者同时也是stopCh通道的
- // 唯一发送者。尽管它不能安全地关闭dataCh数
- // 据通道,但它可以安全地关闭stopCh通道。
- close(stopCh)
- return
- }
- log.Println(value)
- }
- }()
-
- wt.Wait()
- }
В этом методе мы добавляем дополнительный канал сигнала stopCh, который получатель использует, чтобы сообщить отправителю, что ему больше не нужно получать данные. Более того, этот метод не закрывает dataCh. Когда канал больше не используется какой-либо сопрограммой, он постепенно будет очищен от мусора, независимо от того, был ли он закрыт.
Элегантность этого метода в том, что закрывая один канал, вы перестаете использовать другой канал, тем самым косвенно закрывая другой канал.
Мы не можем заставить получателя или отправителя закрыть канал, используемый для передачи данных, а также мы не можем заставить один из нескольких получателей закрыть дополнительный сигнальный канал. Обе эти практики нарушают принцип закрытия канала.
Однако мы можем представитьРоль промежуточного посредника и закрытие им дополнительных сигнальных каналов для оповещения всех получателей и отправителей об окончании работы.。
Пример кода:
- package main
-
- import (
- "log"
- "math/rand"
- "strconv"
- "sync"
- )
-
- func main() {
-
- const Max = 100000
- const NumReceivers = 10
- const NumSenders = 1000
-
- var wt sync.WaitGroup
- wt.Add(NumReceivers)
-
- dataCh := make(chan int)
- stopCh := make(chan struct{})
- // stopCh是一个额外的信号通道。它的发送
- // 者为中间调解者。它的接收者为dataCh
- // 数据通道的所有的发送者和接收者。
- toStop := make(chan string, 1)
- // toStop是一个用来通知中间调解者让其
- // 关闭信号通道stopCh的第二个信号通道。
- // 此第二个信号通道的发送者为dataCh数据
- // 通道的所有的发送者和接收者,它的接收者
- // 为中间调解者。它必须为一个缓冲通道。
-
- var stoppedBy string
-
- // 中间调解者
- go func() {
- stoppedBy = <-toStop
- close(stopCh)
- }()
-
- // 发送者
- for i := 0; i < NumSenders; i++ {
- go func(id string) {
- for {
- value := rand.Intn(Max)
- if value == 0 {
- // 为了防止阻塞,这里使用了一个尝试
- // 发送操作来向中间调解者发送信号。
- select {
- case toStop <- "发送者#" + id:
- default:
- }
- return
- }
-
- select {
- case <-stopCh:
- return
- case dataCh <- value:
- }
- }
- }(strconv.Itoa(i))
- }
-
- // 接收者
- for i := 0; i < NumReceivers; i++ {
- go func(id string) {
- defer wt.Done()
-
- for {
- select {
- case <-stopCh:
- return
- case value := <-dataCh:
- if value == Max {
- // 为了防止阻塞,这里使用了一个尝试
- // 发送操作来向中间调解者发送信号。
- select {
- case toStop <- "接收者:" + id:
- default:
- }
- return
- }
-
- log.Println(value)
- }
- }
- }(strconv.Itoa(i))
- }
-
- wt.Wait()
- log.Println("被" + stoppedBy + "终止了")
-
- }