как слушать N каналов? (оператор динамического выбора)

чтобы запустить бесконечный цикл выполнения двух goroutines, я могу использовать следующий код:

после получения msg он запустит новый goroutine и продолжится навсегда.

c1 := make(chan string) c2 := make(chan string) go DoShit(c1, 5) go DoShit(c2, 2) for ; true; { select { case msg1 := <-c1: fmt.Println("received ", msg1) go DoShit(c1, 1) case msg2 := <-c2: fmt.Println("received ", msg2) go DoShit(c2, 9) } } 

Теперь я хотел бы иметь такое же поведение для N goroutines, но как будет выглядеть оператор select в этом случае?

Это бит кода, с которого я начал работать, но я смущен, как закодировать оператор select

 numChans := 2 //I keep the channels in this slice, and want to "loop" over them in the select statemnt var chans = [] chan string{} for i:=0;i<numChans;i++{ tmp := make(chan string); chans = append(chans, tmp); go DoShit(tmp, i + 1) //How shall the select statment be coded for this case? for ; true; { select { case msg1 := <-c1: fmt.Println("received ", msg1) go DoShit(c1, 1) case msg2 := <-c2: fmt.Println("received ", msg2) go DoShit(c2, 9) } } 

Вы можете сделать это, используя функцию Select из пакета отражения :

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select выполняет операцию выбора, описанную в списке случаев. Подобно оператору выбора Go, он блокируется, пока, по крайней мере, один из случаев не может продолжить, делает равномерный псевдослучайный выбор, а затем выполняет этот случай. Он возвращает индекс выбранного случая и, если в этом случае была операция приема, полученное значение и логическое значение указывают, соответствует ли значение отправке по каналу (в отличие от нулевого значения, полученного, поскольку канал закрыт).

Вы передаете массив структур SelectCase которые определяют канал для выбора, направление операции и значение для отправки в случае операции отправки.

Поэтому вы можете сделать что-то вроде этого:

 cases := make([]reflect.SelectCase, len(chans)) for i, ch := range chans { cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} } chosen, value, ok := reflect.Select(cases) # ok will be true if the channel has not been closed. ch := chans[chosen] msg := value.String() 

Вы можете поэкспериментировать с более сложным примером: http://play.golang.org/p/8zwvSk4kjx

Вы можете выполнить это, обернув каждый канал в goroutine, который «пересылает» сообщения на общий «совокупный» канал. Например:

 agg := make(chan string) for _, ch := range chans { go func(c chan string) { for msg := range c { agg <- msg } }(ch) } select { case msg <- agg: fmt.Println("received ", msg) } 

Если вам нужно знать, с какого канала отправлено сообщение, вы можете перенести его в структуру с любой дополнительной информацией, прежде чем перенаправлять ее на общий канал.

В моем (ограниченном) тестировании этот метод значительно отличается от использования пакета отражения:

 $ go test dynamic_select_test.go -test.bench=. ... BenchmarkReflectSelect 1 5265109013 ns/op BenchmarkGoSelect 20 81911344 ns/op ok command-line-arguments 9.463s 

Здесь приведен код бенчмарка

Чтобы расширить некоторые комментарии к предыдущим ответам и дать более четкое сравнение, здесь приведен пример обоих подходов, представленных до сих пор, с учетом того же ввода, fragmentа каналов для чтения и функции для вызова каждого значения, которое также должно знать, какие Измените значение.

Существуют три основных различия между подходами:

  • Сложность. Хотя это может быть частично предпочтительным для читателя, я считаю, что канальный подход более идиоматичен, прямолинейен и читабель.

  • Представление. В моей системе Xeon amd64 каналы goroutines + channels выполняют решение отражения примерно на два порядка (в общем reflection в Go часто медленнее и должно использоваться только тогда, когда это абсолютно необходимо). Конечно, если есть какая-либо значительная задержка либо в функции обработки результатов, либо при записи значений на входные каналы, это различие в производительности может стать незначительным.

  • Семантика блокировки / буферизации. Важность этого зависит от варианта использования. Чаще всего это либо не имеет значения, либо небольшая дополнительная буферизация в решении слияния goroutine может быть полезна для пропускной способности. Однако, если желательно иметь семантику, что только один писатель разблокирован, и это значение полностью обрабатывается до того, как любой другой писатель будет разблокирован, это может быть достигнуто только с помощью решения отражения.

Обратите внимание, что оба подхода могут быть упрощены, если не требуется «идентификатор» отправляющего канала или если исходные каналы никогда не будут закрыты.

Горутинский канал слияния:

 // Process1 calls `fn` for each value received from any of the `chans` // channels. The arguments to `fn` are the index of the channel the // value came from and the string value. Process1 returns once all the // channels are closed. func Process1(chans []<-chan string, fn func(int, string)) { // Setup type item struct { int // index of which channel this came from string // the actual string item } merged := make(chan item) var wg sync.WaitGroup wg.Add(len(chans)) for i, c := range chans { go func(i int, c <-chan string) { // Reads and buffers a single item from `c` before // we even know if we can write to `merged`. // // Go doesn't provide a way to do something like: // merged <- (<-c) // atomically, where we delay the read from `c` // until we can write to `merged`. The read from // `c` will always happen first (blocking as // required) and then we block on `merged` (with // either the above or the below syntax making // no difference). for s := range c { merged <- item{i, s} } // If/when this input channel is closed we just stop // writing to the merged channel and via the WaitGroup // let it be known there is one fewer channel active. wg.Done() }(i, c) } // One extra goroutine to watch for all the merging goroutines to // be finished and then close the merged channel. go func() { wg.Wait() close(merged) }() // "select-like" loop for i := range merged { // Process each value fn(i.int, i.string) } } 

Выбор отражения:

 // Process2 is identical to Process1 except that it uses the reflect // package to select and read from the input channels which guarantees // there is only one value "in-flight" (ie when `fn` is called only // a single send on a single channel will have succeeded, the rest will // be blocked). It is approximately two orders of magnitude slower than // Process1 (which is still insignificant if their is a significant // delay between incoming values or if `fn` runs for a significant // time). func Process2(chans []<-chan string, fn func(int, string)) { // Setup cases := make([]reflect.SelectCase, len(chans)) // `ids` maps the index within cases to the original `chans` index. ids := make([]int, len(chans)) for i, c := range chans { cases[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c), } ids[i] = i } // Select loop for len(cases) > 0 { // A difference here from the merging goroutines is // that `v` is the only value "in-flight" that any of // the workers have sent. All other workers are blocked // trying to send the single value they have calculated // where-as the goroutine version reads/buffers a single // extra value from each worker. i, v, ok := reflect.Select(cases) if !ok { // Channel cases[i] has been closed, remove it // from our slice of cases and update our ids // mapping as well. cases = append(cases[:i], cases[i+1:]...) ids = append(ids[:i], ids[i+1:]...) continue } // Process each value fn(ids[i], v.String()) } } 

[Полный код на игровой площадке Go ].

Почему этот подход не будет работать, если кто-то отправляет события?

 func main() { numChans := 2 var chans = []chan string{} for i := 0; i < numChans; i++ { tmp := make(chan string) chans = append(chans, tmp) } for true { for i, c := range chans { select { case x = <-c: fmt.Printf("received %d \n", i) go DoShit(x, i) default: continue } } } } 
  • Преобразование структуры Go в JSON
  • Объявить постоянный массив
  • невозможно загрузить, $ GOPATH не установлен
  • Доступ к структуре на карте (без копирования)
  • Указатели против значений в параметрах и возвращаемых значениях
  • Вызовите функции Go из C
  • В чем смысл интерфейса {}?
  • Как разобрать / дезертизировать динамический JSON в Голанге
  • как устанавливать и получать поля в структурах Golang?
  • Как работает компилятор Go1?
  • Что именно делает runtime.Gosched делать?
  • Давайте будем гением компьютера.