GAEのTaskQueue

gcpgolang

GCPのマネージドなQueueサービスとしてGAEのTaskQueueがあることを教えてもらったので動かしてみる。 PushQueueとPullQueueがあって、それぞれおおよそAWSのSNSとSQSに相当する。PushQueueの場合はHTTPのリクエストとしてGAEのサービスに投げられる。PullQueueはCloud Tasks APIを使えばGAE外からも使えるらしいがまだalpha。

設定ファイルqueue.yamlはこんな感じ。bucket_sizeは最大同時実行数で空いていたらrateで埋められていく。

queue:
- name: default
  rate: 10/m
  bucket_size: 5
  retry_parameters:
    min_backoff_seconds: 10
    max_backoff_seconds: 300

bucket_sizeの最大は500なのでこれ以上の性能が必要な場合は複数のQueueに分けるか Cloud Pub/Subを使うことになる。ただし、At-Least-Onceなのでレコードが重複しても問題ないように作る必要がある。SQSも同じ

GCPのCloud Pub/Sub - sambaiz-net

アプリケーション

/にアクセスすると2つのTaskをdefaultのTaskQueueにDelay25秒でPOSTする。 Taskによるリクエストは/workerで受け、30%の確率で500エラーを返すようにしている。

package main

import (
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"net/url"
	"strconv"
	"time"

	"google.golang.org/appengine"
	"google.golang.org/appengine/log"
	"google.golang.org/appengine/taskqueue"
)

func main() {
	http.HandleFunc("/", handler)
	http.HandleFunc("/worker", handlerQueue)
	appengine.Main()
}

func handler(w http.ResponseWriter, r *http.Request) {
	ctx := appengine.NewContext(r)
	// POST body: name=a%26&value=20
	t := taskqueue.NewPOSTTask("/worker", map[string][]string{"name": {"a&"}, "time": {strconv.FormatInt(time.Now().UnixNano(), 10)}})
	t.Delay = time.Second * 25
	// POST body: name=a&name=b
	t2 := taskqueue.NewPOSTTask("/worker", map[string][]string{"name": {"a", "b"}})
	if _, err := taskqueue.AddMulti(ctx, []*taskqueue.Task{t, t2}, ""); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintln(w, "ok")
}

func handlerQueue(w http.ResponseWriter, r *http.Request) {
	ctx := appengine.NewContext(r)

	bodyb, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	body := string(bodyb)
	log.Infof(ctx, "%s\n", body)

	values, err := url.ParseQuery(body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	log.Infof(ctx, "%#+v\n", values)
	if rand.Float64() < 0.3 {
		http.Error(w, "random fail", http.StatusInternalServerError)
		return
	}
	fmt.Fprintln(w, "ok")
}

デプロイして何度かアクセスする。

$ gcloud app deploy queue.yaml
$ gcloud app deploy
$ gcloud app browse

TaskがQueueに入り、エラーの場合はリトライしていることが確認できる。

PushQueue

通常Delay経ってETA(Estimate Time of Arrival?)を迎えたものから処理されていくが、bucketやrateが小さい場合リトライが重なって渋滞すると過ぎても処理されず溜まってしまうことがある。リトライ時のDelayはmax_backoff_secondsまでの範囲内でExponential Backoffし、task_retry_limitを指定しないと永遠にリトライし続ける。

参考

スケーラブル GCP アーキテクチャ