x/sync/semaphoreでgoroutineの数を制御する

golang

Goの特長の一つにgoroutineによる並列実行の容易さがあるが、 無尽蔵にgoroutineを生成するとパフォーマンスが悪化してしまうため、 都度go func()するのではなく、一定数走らせておいたgoroutineに処理を渡すなどして、その数を制御することがある。

Goroutineの数をworkerで抑制する - sambaiz-net

重み付きのセマフォを提供する準標準パッケージgolang.org/x/sync/semaphoreを用いると 容易にこれを行うことができる。

次のようにAcquire(context, weight)して、使い終わったらRelease(weight)する。 セマフォのWeightを使い切るとAcquire()でブロッキングされるので、1ずつAcquire()していった場合、goroutineの数は最大でもWeightの値で抑えられる。 x/sync/errgroupGo()で非同期処理を実行し、エラーが返るか全ての処理が終わるまで Wait() する。errgroupを用いない場合、最後に最大WeightでAcquire()することで全ての処理が終わったことを確認する。

package main
import (
	"context"
    "errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func countWithSem(ctx context.Context, maxWorkers int64) {
	sem := semaphore.NewWeighted(maxWorkers)
	eg, ctx := errgroup.WithContext(ctx)
	for i := 0; i < 5; i++ {
		if err := sem.Acquire(ctx, 1); err != nil {
			fmt.Printf("failed to acquire semaphore: %v\n", err)
			break
		}
		func(i int) {
			eg.Go(func() error {
				defer sem.Release(1)
				if i%3 == 0 {
					return errors.New("fizz")
				}
				fmt.Println(i)
				return nil
			})
		}(i)
	}
	if err := eg.Wait(); err != nil {
		fmt.Println(err)
	}
}

func main() {
    countWithSem(context.TODO(), 1)
}
$ go run main.go
1
failed to acquire semaphore: context canceled
fizz

内部では、Acquire()した際に 重さを持つwaiterがリストの最後尾に追加され、重さが確保されチャネルがcloseされるのをselectで待ち構えている。

type waiter struct {
	n     int64
	ready chan<- struct{} // Closed when semaphore acquired.
}