x/sync/semaphoreでgoroutineの数を制御する
golangGoの特長の一つにgoroutineによる並列実行の容易さがあるが、
無尽蔵にgoroutineを生成するとパフォーマンスが悪化してしまうため、
都度go func()
するのではなく、一定数走らせておいたgoroutineに処理を渡すなどして、その数を制御することがある。
Goroutineの数をworkerで抑制する - sambaiz-net
重み付きのセマフォを提供する準標準パッケージgolang.org/x/sync/semaphoreを用いると 容易にこれを行うことができる。
次のようにAcquire(context, weight)
して、使い終わったらRelease(weight)
する。
セマフォのWeightを使い切るとAcquire()
でブロッキングされるので、1ずつAcquire()
していった場合、goroutineの数は最大でもWeightの値で抑えられる。
x/sync/errgroupはGo()
で非同期処理を実行し、エラーが返るか全ての処理が終わるまで Wait()
する。errgroupを用いない場合、最後に最大WeightでAcquire()
することで全ての処理が終わったことを確認する。
package main
import (
"context"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
func countWithSem(ctx context.Context, maxWorkers int64) {
sem := semaphore.NewWeighted(maxWorkers)
eg, ctx := errgroup.WithContext(ctx)
for i := 0; i < 5; i++ {
if err := sem.Acquire(ctx, 1); err != nil {
fmt.Printf("failed to acquire semaphore: %v\n", err)
break
}
func(i int) {
eg.Go(func() error {
defer sem.Release(1)
if i%3 == 0 {
return errors.New("fizz")
}
fmt.Println(i)
return nil
})
}(i)
}
if err := eg.Wait(); err != nil {
fmt.Println(err)
}
}
func main() {
countWithSem(context.TODO(), 1)
}
$ go run main.go
1
failed to acquire semaphore: context canceled
fizz
内部では、Acquire()した際に
重さを持つwaiterがリストの最後尾に追加され、重さが確保されチャネルがcloseされるのをselect
で待ち構えている。
type waiter struct {
n int64
ready chan<- struct{} // Closed when semaphore acquired.
}