Является ли это идиоматическим пулом рабочих streamов в Go?

Я пытаюсь написать простой рабочий пул с goroutines.

  • Является ли код, который я написал, идиоматическим? Если нет, то что должно измениться?
  • Я хочу иметь возможность установить максимальное число рабочих streamов на 5 и блокировать до тех пор, пока работник не станет доступен, если все 5 заняты. Как я могу продлить это, чтобы иметь всего 5 рабочих? Я создаю статические 5 goroutines и даю каждому work_channel ?

код:

 package main import ( "fmt" "math/rand" "sync" "time" ) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() sleepMs := rand.Intn(1000) fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) o <- work + fmt.Sprintf("-%dms", sleepMs) } func main() { var work_channel = make(chan string) var results_channel = make(chan string) // create goroutine per item in work_channel go func() { var c = 0 var wg sync.WaitGroup for work := range work_channel { wg.Add(1) go worker(fmt.Sprintf("%d", c), work, results_channel, &wg) c++ } wg.Wait() fmt.Println("closing results channel") close(results_channel) }() // add work to the work_channel go func() { for c := 'a'; c < 'z'; c++ { work_channel <- fmt.Sprintf("%c", c) } close(work_channel) fmt.Println("sent work to work_channel") }() for x := range results_channel { fmt.Printf("result: %s\n", x) } } 

Ваше решение не является пулом рабочих goroutine в любом смысле: ваш код не ограничивает одновременные goroutines, и он не «повторно использует» goroutines (он всегда начинает новый при получении нового задания).

Модель производителя-потребителя

Как я писал в Bruteforce MD5 Password Crack , вы можете использовать шаблон производителя-потребителя . У вас может быть назначенный производитель goroutine, который будет генерировать задания (вещи, которые нужно делать / вычислять), и отправлять их на канал заданий . У вас может быть фиксированный пул потребительских goroutines (например, 5 из них), который будет проходить по каналу, на котором выполняются задания, и каждый из них будет выполнять / заполнять полученные задания.

Производитель goroutine может просто закрыть канал jobs когда все рабочие места будут созданы и отправлены, что будет правильно сигнализировать потребителям о том, что больше не будет рабочих мест. Конструкция for ... range на канале обрабатывает событие «закрыть» и правильно завершается. Обратите внимание, что все задания, отправленные до закрытия канала, будут доставлены.

Это приведет к чистому дизайну, приведет к фиксированному (но произвольному) числу goroutines, и он всегда будет использовать 100% CPU (если # из goroutines больше, чем # ядра ЦП). Это также имеет то преимущество, что его можно «дросселировать» с правильным выбором пропускной способности канала (буферизованный канал) и количеством потребительских goroutines.

Обратите внимание, что эта модель с назначенным поставщиком goroutine не является обязательной. У вас может быть несколько goroutines для создания заданий, но тогда вы также должны синхронизировать их, чтобы только закрыть канал jobs когда все производители goroutines выполняли задания, – иначе попытка отправить другое задание на канал jobs когда оно уже было закрыто, приводит к временная паника. Обычно производство рабочих мест дешево и может производиться гораздо быстрее, чем они могут быть выполнены, поэтому эта модель для их производства в 1 goroutine, в то время как многие из них потребляют / выполняют их, на практике хороши.

Обработка результатов:

Если задания имеют результаты, вы можете выбрать назначенный канал результата, по которому могут быть доставлены результаты («отправлено назад»), или вы можете решить обработать результаты у потребителя, когда задание будет завершено / закончено. Это последнее может быть реализовано с помощью функции «обратного вызова», которая обрабатывает результаты. Важно то, могут ли результаты обрабатываться независимо или их необходимо объединить (например, каркас сокращения карт) или агрегировать.

Если вы идете с каналом results , вам также нужен goroutine, который получает от него значения, не позволяя потребителям блокироваться (произойдет, если буфер results будет заполнен).

С каналом results

Вместо того, чтобы отправлять простые string значения в качестве заданий и результатов, я бы создал тип оболочки, который может содержать любую дополнительную информацию, и поэтому он намного более гибкий:

 type Job struct { Id int Work string Result string } 

Обратите внимание, что структура Job также обертывает результат, поэтому, когда мы отправляем результат, он также содержит исходный Job как контекст – часто очень полезный . Также обратите внимание, что выгодно просто отправлять указатели ( *Job ) на каналах вместо значений Job поэтому не нужно делать «бесчисленные» копии Job s, а также размер значения структуры Job становится неуместным.

Вот как выглядит этот производитель-потребитель:

Я бы использовал 2 значения sync.WaitGroup , их роль будет следовать:

 var wg, wg2 sync.WaitGroup 

Производитель отвечает за создание заданий, которые должны выполняться:

 func produce(jobs chan<- *Job) { // Generate jobs: id := 0 for c := 'a'; c <= 'z'; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) } 

Когда это будет сделано (больше нет заданий), канал jobs закрыт, что сигнализирует потребителям о том, что больше рабочих мест не поступит.

Обратите внимание, что produce() видит канал jobs только как отправку , потому что это то, что производитель должен делать только с этим: отправлять задания на него (помимо закрытия , но это также разрешено только для канала отправки ). Случайным получением в производителе будет ошибка времени компиляции (обнаруженная в начале, во время компиляции).

Ответственность потребителя заключается в том, чтобы получать рабочие места, пока рабочие места могут быть получены, и выполнять их:

 func consume(id int, jobs <-chan *Job, results chan<- *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs) results <- job } } 

Обратите внимание, что consume() видит канал jobs только для приема ; потребителю нужно только получать от него. Точно так же канал results отправляется только для потребителя.

Также обратите внимание, что канал results не может быть закрыт здесь, так как есть несколько потребительских goroutines, и только первая попытка закрыть его будет успешной, а последующие будут приводить к панике! канал results может (должен) быть закрыт после завершения всех потребительских горутингов, потому что тогда мы можем быть уверены, что дальнейшие значения (результаты) не будут отправлены по каналу results .

У нас есть результаты, которые необходимо проанализировать:

 func analyze(results <-chan *Job) { defer wg2.Done() for job := range results { fmt.Printf("result: %s\n", job.Result) } } 

Как вы можете видеть, это также получает результаты, пока они могут появиться (пока канал results будет закрыт). Канал results анализатора принимается только .

Обратите внимание на использование типов каналов: всякий раз, когда этого достаточно, используйте только однонаправленный тип канала для обнаружения и предотвращения ошибок раньше, во время компиляции. Используйте только тип двунаправленного канала, если вам нужны оба направления.

И вот как все это склеено:

 func main() { jobs := make(chan *Job, 100) // Buffered channel results := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs, results) } // Start producing go produce(jobs) // Start analyzing: wg2.Add(1) go analyze(results) wg.Wait() // Wait all consumers to finish processing jobs // All jobs are processed, no more values will be sent on results: close(results) wg2.Wait() // Wait analyzer to analyze all results } 

Пример вывода:

Вот пример вывода:

Как видите, результаты приходят и анализируются до того, как все задания будут выставлены в очередь:

 worker #4 received: 'e', sleep 81ms worker #0 received: 'a', sleep 887ms worker #1 received: 'b', sleep 847ms worker #2 received: 'c', sleep 59ms worker #3 received: 'd', sleep 81ms worker #2 received: 'f', sleep 318ms result: c-59ms worker #4 received: 'g', sleep 425ms result: e-81ms worker #3 received: 'h', sleep 540ms result: d-81ms worker #2 received: 'i', sleep 456ms result: f-318ms worker #4 received: 'j', sleep 300ms result: g-425ms worker #3 received: 'k', sleep 694ms result: h-540ms worker #4 received: 'l', sleep 511ms result: j-300ms worker #2 received: 'm', sleep 162ms result: i-456ms worker #1 received: 'n', sleep 89ms result: b-847ms worker #0 received: 'o', sleep 728ms result: a-887ms worker #1 received: 'p', sleep 274ms result: n-89ms worker #2 received: 'q', sleep 211ms result: m-162ms worker #2 received: 'r', sleep 445ms result: q-211ms worker #1 received: 's', sleep 237ms result: p-274ms worker #3 received: 't', sleep 106ms result: k-694ms worker #4 received: 'u', sleep 495ms result: l-511ms worker #3 received: 'v', sleep 466ms result: t-106ms worker #1 received: 'w', sleep 528ms result: s-237ms worker #0 received: 'x', sleep 258ms result: o-728ms worker #2 received: 'y', sleep 47ms result: r-445ms worker #2 received: 'z', sleep 947ms result: y-47ms result: u-495ms result: x-258ms result: v-466ms result: w-528ms result: z-947ms 

Попробуйте полное приложение на игровой площадке Go .

Без канала results

Код значительно упрощается, если мы не используем канал results но пользователи goroutines обрабатывают результат сразу (напечатайте его в нашем случае). В этом случае нам не нужны 2 значения sync.WaitGroup (второй нужно было только дождаться завершения анализатора).

Без канала results полное решение выглядит так:

 var wg sync.WaitGroup type Job struct { Id int Work string } func produce(jobs chan<- *Job) { // Generate jobs: id := 0 for c := 'a'; c <= 'z'; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) } func consume(id int, jobs <-chan *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs)) } } func main() { jobs := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs) } // Start producing go produce(jobs) wg.Wait() // Wait all consumers to finish processing jobs } 

Результат «нравится» с каналом results (но, конечно, порядок выполнения / завершения является случайным).

Попробуйте этот вариант на игровой площадке Go .

Вы можете реализовать счетный семафор для ограничения параллелизма goroutine.

 var tokens = make(chan struct{}, 20) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() tokens <- struct{}{} // acquire a token before performing work sleepMs := rand.Intn(1000) fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) <-tokens // release the token o <- work + fmt.Sprintf("-%dms", sleepMs) } 

Это общий дизайн, используемый для ограничения числа рабочих. Вы можете, конечно, изменить местоположение выпуска / получения токенов в соответствии с вашим кодом.

  • .NET Асинхронный stream чтения / записи
  • Неправильная публикация ссылки на объект Java
  • Принуждение нескольких streamов к использованию нескольких процессоров, когда они доступны
  • Инициализация двух streamов одним и тем же экземпляром исполняемого файла
  • Невозможно создать кэшированный пул streamов с ограничением размера?
  • Является ли stl-вектор одновременным чтением streamобезопасным?
  • Как я могу атомизировать приращение переменной в Swift?
  • Как дождаться завершения всех streamов, используя ExecutorService?
  • Является x86 CMPXCHG атомом?
  • В чем разница между ConcurrentHashMap и Collections.synchronizedMap (Карта)?
  • Нужно ли защищать доступ для чтения к контейнеру STL в многопоточной среде?
  • Давайте будем гением компьютера.