Goroutineの数をworkerで抑制する
golangGoのnet/httpとKeep-Alive - sambaiz.netでやったように、 あるエンドポイントに連続してGoroutineでリクエストを投げると、リクエスト数を増やしたときにタイムアウトが頻発するようになった。
まず、2000リクエストを投げてみた結果。
[RESULT] request: 2000, ok: 2000, ng: 0, time(ms) 138
一応全部捌けてはいるが、おおよそ同時にリクエストを送っているのにタイムアウト(100ms)時間を超えてしまっている。これをさらに3000に増やしてみる。
[RESULT] request: 3000, ok: 13, ng: 2987, time(ms) 372
ほぼ全滅してしまった…。タイムアウト時間を大きく過ぎておりGoroutineの処理に遅延が発生しているようだ。 そこで、都度Goroutineを生成してリクエストを投げるのではなく、 一定数のWorkerに処理させることで、同時に作られるGoroutineの数を抑制する。
type Req struct {
Okch chan int
Ngch chan int
}
func startWorker(ctx context.Context, num int) (requestch chan *Req) {
requestch = make(chan *Req)
for i := 0; i < num; i++ {
go func() {
for {
select {
case req := <-requestch:
request(req.Okch, req.Ngch)
case <-ctx.Done():
return
}
}
}()
}
return
}
func main(){
...
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
requestch := startWorker(ctx, 1000)
requestch <- req
...
}
結果、すべてのリクエストをタイムアウトせずに送れるようになった。
[RESULT] request: 3000, ok: 3000, ng: 0, time(ms) 157
[RESULT] request: 5000, ok: 5000, ng: 0, time(ms) 239
上のコードではWorkerを作るにあたって単純にWorkerの数分goroutineを生成して共通のチャネルに入ってきたものを読んで処理させているが、 以下の記事のようにDispatcherを用意してWorkerPool(chan chan Job)からWorkerのjobChannel(chan Job)を取り出して送る方法も紹介されていたので これとも比較してみた。今回は入力するチャネルだけ分けて終了方法はStartで渡したcontextに一任しているので上の方法とさほど変わらず、むしろ冗長に見えるが、 実際はWorkerそれぞれがquitするチャネルなどを持っていて、独立して終了させることができるため、Workerの数を動的にコントロールしやすいのが特長だと考えている。
Handling 1 Million Requests per Minute with Go · marcio.io
type Dispatcher struct{
Requestch chan *Req
workerPool chan chan *Req
workerNum int
}
func NewDispatcher(workerNum int) *Dispatcher{
return &Dispatcher{
Requestch: make(chan *Req),
workerPool: make(chan chan *Req, workerNum),
workerNum: workerNum,
}
}
func (d *Dispatcher) Start(ctx context.Context) error{
poolLength := len(d.workerPool)
if poolLength != 0{
return errors.New("already started")
}
for i := 0; i < d.workerNum; i++{
startWorker(ctx, 1, d.workerPool)
}
go d.dispatch(ctx)
return nil
}
func (d *Dispatcher) dispatch(ctx context.Context){
for{
select{
case req := <- d.Requestch:
// workerPoolからchanを取り出しreqを入れる
worker := <- d.workerPool
worker <- req
case <-ctx.Done():
return
}
}
}
func startWorker(ctx context.Context, num int, workerPool chan chan *Req) {
requestch := make(chan *Req)
for i := 0; i < num; i++ {
go func() {
for {
// workerPoolにchanを入れる(終わったらまだ戻る)
workerPool <- requestch
select {
case req := <-requestch:
request(req.Okch, req.Ngch)
case <-ctx.Done():
return
}
}
}()
}
return
}
func main(){
...
dispatcher := NewDispatcher(1000)
if err := dispatcher.Start(ctx); err != nil{
panic(err)
}
dispatcher.requestch <- req
}
ほとんど変わらず。
[RESULT] request: 3000, ok: 3000, ng: 0, time(ms) 169
[RESULT] request: 5000, ok: 5000, ng: 0, time(ms) 246