GCPのマネージドなQueueサービスとしてGAEのTaskQueueがあることを教えてもらったので動かしてみる。 PushQueueとPullQueueがあって、それぞれおおよそAWSのSNSとSQSに相当する。PushQueueの場合はHTTPのリクエストとしてGAEのサービスに投げられる。PullQueueはCloud Tasks APIを使えばGAE外からも使えるらしいがまだalpha。
設定ファイルqueue.yamlはこんな感じ。bucket_sizeは最大同時実行数で空いていたらrateで埋められていく。
queue:
- name: default
rate: 10/m
bucket_size: 5
retry_parameters:
min_backoff_seconds: 10
max_backoff_seconds: 300
bucket_sizeの最大は500なのでこれ以上の性能が必要な場合は複数のQueueに分けるか Cloud Pub/Subを使うことになる。ただし、At-Least-Onceなのでレコードが重複しても問題ないように作る必要がある。SQSも同じ。
GCPのCloud Pub/Sub - sambaiz-net
アプリケーション
/にアクセスすると2つのTaskをdefaultのTaskQueueにDelay25秒でPOSTする。 Taskによるリクエストは/workerで受け、30%の確率で500エラーを返すようにしている。
package main
import (
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"strconv"
"time"
"google.golang.org/appengine"
"google.golang.org/appengine/log"
"google.golang.org/appengine/taskqueue"
)
func main() {
http.HandleFunc("/", handler)
http.HandleFunc("/worker", handlerQueue)
appengine.Main()
}
func handler(w http.ResponseWriter, r *http.Request) {
ctx := appengine.NewContext(r)
// POST body: name=a%26&value=20
t := taskqueue.NewPOSTTask("/worker", map[string][]string{"name": {"a&"}, "time": {strconv.FormatInt(time.Now().UnixNano(), 10)}})
t.Delay = time.Second * 25
// POST body: name=a&name=b
t2 := taskqueue.NewPOSTTask("/worker", map[string][]string{"name": {"a", "b"}})
if _, err := taskqueue.AddMulti(ctx, []*taskqueue.Task{t, t2}, ""); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Fprintln(w, "ok")
}
func handlerQueue(w http.ResponseWriter, r *http.Request) {
ctx := appengine.NewContext(r)
bodyb, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
body := string(bodyb)
log.Infof(ctx, "%s\n", body)
values, err := url.ParseQuery(body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Infof(ctx, "%#+v\n", values)
if rand.Float64() < 0.3 {
http.Error(w, "random fail", http.StatusInternalServerError)
return
}
fmt.Fprintln(w, "ok")
}
デプロイして何度かアクセスする。
$ gcloud app deploy queue.yaml
$ gcloud app deploy
$ gcloud app browse
TaskがQueueに入り、エラーの場合はリトライしていることが確認できる。
通常Delay経ってETA(Estimate Time of Arrival?)を迎えたものから処理されていくが、bucketやrateが小さい場合リトライが重なって渋滞すると過ぎても処理されず溜まってしまうことがある。リトライ時のDelayはmax_backoff_secondsまでの範囲内でExponential Backoffし、task_retry_limitを指定しないと永遠にリトライし続ける。