Goroutineの数をworkerで抑制する

golang

Goのnet/httpとKeep-Alive - sambaiz.netでやったように、 あるエンドポイントに連続してGoroutineでリクエストを投げると、リクエスト数を増やしたときにタイムアウトが頻発するようになった。

まず、2000リクエストを投げてみた結果。

[RESULT] request: 2000, ok: 2000, ng: 0, time(ms) 138

一応全部捌けてはいるが、おおよそ同時にリクエストを送っているのにタイムアウト(100ms)時間を超えてしまっている。これをさらに3000に増やしてみる。

[RESULT] request: 3000, ok: 13, ng: 2987, time(ms) 372

ほぼ全滅してしまった…。タイムアウト時間を大きく過ぎておりGoroutineの処理に遅延が発生しているようだ。 そこで、都度Goroutineを生成してリクエストを投げるのではなく、 一定数のWorkerに処理させることで、同時に作られるGoroutineの数を抑制する。

type Req struct {
	Okch chan int
	Ngch chan int
}

func startWorker(ctx context.Context, num int) (requestch chan *Req) {

	requestch = make(chan *Req)

	for i := 0; i < num; i++ {
		go func() {
			for {
				select {
				case req := <-requestch:
					request(req.Okch, req.Ngch)
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	return
}

func main(){
    ...
    ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	requestch := startWorker(ctx, 1000)

    requestch <- req
    ...
}

結果、すべてのリクエストをタイムアウトせずに送れるようになった。

[RESULT] request: 3000, ok: 3000, ng: 0, time(ms) 157
[RESULT] request: 5000, ok: 5000, ng: 0, time(ms) 239

上のコードではWorkerを作るにあたって単純にWorkerの数分goroutineを生成して共通のチャネルに入ってきたものを読んで処理させているが、 以下の記事のようにDispatcherを用意してWorkerPool(chan chan Job)からWorkerのjobChannel(chan Job)を取り出して送る方法も紹介されていたので これとも比較してみた。今回は入力するチャネルだけ分けて終了方法はStartで渡したcontextに一任しているので上の方法とさほど変わらず、むしろ冗長に見えるが、 実際はWorkerそれぞれがquitするチャネルなどを持っていて、独立して終了させることができるため、Workerの数を動的にコントロールしやすいのが特長だと考えている。

Handling 1 Million Requests per Minute with Go · marcio.io

type Dispatcher struct{
	Requestch chan *Req
	workerPool chan chan *Req
	workerNum int
}

func NewDispatcher(workerNum int) *Dispatcher{
	return &Dispatcher{
		Requestch: make(chan *Req),
		workerPool: make(chan chan *Req, workerNum),
		workerNum: workerNum,
	}
}


func (d *Dispatcher) Start(ctx context.Context) error{
	poolLength := len(d.workerPool)
	if poolLength != 0{
		return errors.New("already started")
	}
	for i := 0; i < d.workerNum; i++{
		startWorker(ctx, 1, d.workerPool)
	}

	go d.dispatch(ctx)

	return nil
}

func (d *Dispatcher) dispatch(ctx context.Context){
	for{
		select{
		case req := <- d.Requestch:
            // workerPoolからchanを取り出しreqを入れる
			worker := <- d.workerPool
			worker <- req
		case <-ctx.Done():
			return 
		}
	}
}

func startWorker(ctx context.Context, num int, workerPool chan chan *Req) {

	requestch := make(chan *Req)

	for i := 0; i < num; i++ {
		go func() {
			for {
                // workerPoolにchanを入れる(終わったらまだ戻る)
				workerPool <- requestch
				select {
				case req := <-requestch:
					request(req.Okch, req.Ngch)
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	return
}

func main(){
    ...
    dispatcher := NewDispatcher(1000)
	if err := dispatcher.Start(ctx); err != nil{
		panic(err)
	}

    dispatcher.requestch <- req
}

ほとんど変わらず。

[RESULT] request: 3000, ok: 3000, ng: 0, time(ms) 169
[RESULT] request: 5000, ok: 5000, ng: 0, time(ms) 246

x/sync/semaphoreでgoroutineの数を制御する - sambaiz-net