DAX (DynamoDB Accelerator)の特性と挙動確認

(2020-02-26)

DAXとは

DAXはDynamoDBの前段に置かれるマネージドなインメモリキャッシュで、 Read速度の向上(数ms->数百μs)とテーブルのRead Capacityの節約に効果がある。

DynamoDBとSDKのAPIの互換性があるため置き換えるだけで使えるようになっている。 クライアントの実装としてはHTTPではない独自のプロトコルで通信している点が異なる。

クラスタ作成時に指定するのはノード数とインスタンスタイプで、 ノード数はスループットに、インスタンスタイプはスループットとメモリ量(キャッシュヒット率)に影響する。 複数のノードがある場合、一つがWriteするプライマリーノードになり、他はリードレプリカになる。 なのでノード数を増やしてもWriteのスループットは上がらない。プライマリーノードに問題が発生したら自動でフェイルオーバーする。 ノードは最大10個まで増減できるが、インスタンスタイプは変更できない。最大10個というのは足りるのかと思ったが、数百万RPS捌けるようなので十分そうだ。

インスタンスに対して時間課金が発生し、可用性のために3ノード以上にすることが推奨されている。 そのため、リクエストがそれほどなかったり、キャッシュミスばかりだとインスタンス代の方が高くつくこともあるが、 そこそこReadするなら目に見えてコスト削減されるはずだ。ただしどれくらい次の整合性を許容できるかによる。

キャッシュの整合性

DAXはDynamoDBとは異なり、結果整合性のある読み込み(Eventually Consistent Reads)のみをサポートしているので、プライマリノードにキャッシュされ、全てのノードにレプリケーションが完了するまでの間は異なる結果を返す可能性がある。また、DAXからDynamoDBへのリクエストも結果整合性のある読み込みで行われる。

リクエストの結果は、Itemがなかった場合のネガティブキャッシュも含めて、 Item Cache(GetItem,BatchGetItem)とQuery Cache(Query,Scan)にキャッシュされ、 それぞれパラメータグループで設定されたTTLが過ぎるか、LRUアルゴリズムによって破棄される。 TTLのデフォルトは5分。

書き込みリクエストが来るとまずDynamoDBに書き込んで成功したことを確認してからキャッシュしてレスポンスを返す。 この際Item Cacheは更新されるが、Query Cacheは更新されずTTL/LRUによって破棄されるまで同じ値を返し続けてしまう。 もし問題がある場合は、TTLを短くするかDynamoDBを直接見に行くことになるが、そうするとDAXの効果が薄れてしまう。 また、大量のデータを書き込むと、その分レイテンシが増加したり、それらがすべてキャッシュに乗ることで既存のものがLRUで追い出されてキャッシュヒット率が悪くなることがあり、それらを回避するため直接書き込むという選択肢もあるが、Item Cacheが更新されないことを許容する必要がある。

取れるメトリクス

CPU使用率や、キャッシュヒット数、各リクエスト数や接続数が取れる。

メトリクス

ノードごとのCPU使用率も取れる。Writeやキャッシュミスによるプライマリーノードの負荷の高まりに注意。 レイテンシが大きくなり接続数が増える悪循環に陥る。

プライマリーノードのCPU使用率が80%程度でテーブルのキャパシティに余裕があってもリクエストがスロットリングされることがあり、一つ上のインスタンスタイプでクラスタを作り直したところ解消した。 この際、いきなり全リクエストが新クラスタに送られることで膨大なキャッシュミスが発生し、 テーブルのキャパシティを超過したりプライマリーノードに負荷が集中しないように、 Route53で割合で解決されるようにしたが、これはうまく流れてくれた。

挙動確認

DAX/DynamoへリクエストするだけだったらLambdaでも良いが、負荷をかけるのでECS上にアプリケーションをデプロイすることにした。 以前作ったBoilerplateベースで、コードはここ

ECSでアプリケーションを動かすBoilerplateを作った - sambaiz-net

TableとDAXのClusterを作成。

createDynamoTable() {
  return new dynamodb.Table(this, 'DynamoTable', {
    tableName: "dax-test-table",
    partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
    readCapacity: 50,
    writeCapacity: 50,
  });
}

createDaxCluster(subnetIds: string[]) {
  const subnetGroup = new dax.CfnSubnetGroup(this, 'DaxSubnetGroup', {
    subnetGroupName: "dax-test-subntgroup",
    description: "for dax test",
    subnetIds: subnetIds,
  })
  const daxRole = new iam.Role(this, 'DaxRole', {
    assumedBy: new iam.ServicePrincipal('dax.amazonaws.com'),
    managedPolicies: [
      iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonDynamoDBFullAccess')
    ]
  })
  return new dax.CfnCluster(this, 'DaxCluster', {
    clusterName: "dax-test",
    nodeType: "dax.r4.large",
    replicationFactor: 1,
    iamRoleArn: daxRole.roleArn,
    subnetGroupName: subnetGroup.subnetGroupName,
  })
}

DynamoDBとinterfaceが同じなのでguregu/dynamoもそのまま使えた。

func newDynamoClient(tableName string, throughDax bool) (*dynamo.DB, error) {
	var (
		client dynamodbiface.DynamoDBAPI
		err    error
	)
	if throughDax {
		cfg := dax.DefaultConfig()
		cfg.HostPorts = []string{os.Getenv("DAX_CLUSTER_URL")}
		cfg.Region = os.Getenv("AWS_REGION")
		client, err = dax.New(cfg)
		if err != nil {
			return nil, err
		}
	} else {
		client = dynamodb.New(session.New())
	}
	return dynamo.NewFromIface(client), nil
}

DAXを通したときと通さなかった時の差を確認するため、クエリパラメータで振り分けるようにした。

func handleItem(db *dynamo.DB, dax *dynamo.DB) func(w http.ResponseWriter, r *http.Request) {
	return auth(func(w http.ResponseWriter, r *http.Request) {
		client := db
		if r.URL.Query().Get("dax") == "1" {
			client = dax
		}
		switch r.Method {
		case http.MethodGet:
			var item item
			before := time.Now()
			if err := client.Table(tableName).Get("id", r.URL.Query().Get("id")).One(&item); err != nil {
				if err != dynamo.ErrNotFound {
					log.Printf("failed to get an item: %v", err)
					http.Error(w, "Internal Server Error", http.StatusInternalServerError)
				}
				return
			}
			fmt.Fprintln(w, response{
				Message:      r.URL.Query().Get("message"),
				TimeMicrosec: time.Now().Sub(before).Nanoseconds() / 1000,
			})
		...
		}
	})
}

初期データを挿入する。

$ BASE_URL=http://***
$ TOKEN=DAXTEST
$ curl -X POST -H "Authorization: token ${TOKEN}" "${BASE_URL}/initialize"
ok

Read速度の向上

DynamoDBとDAXにそれぞれGetItemすると、DynamoDBは5ms前後かかるところ、DAXはキャッシュされてからは500-1000μs前後で返ってきた。

$ curl -H "Authorization: token ${TOKEN}" "${BASE_URL}/item?id=1"
{ 4768}

$ curl -H "Authorization: token ${TOKEN}" "${BASE_URL}/item?id=1&dax=1"
{ 30628}

$ curl -H "Authorization: token ${TOKEN}" "${BASE_URL}/item?id=1&dax=1"
{ 593}

一方書き込みは前述したように遅くなる。思いの外差があった。

$ curl -X POST -H "Authorization: token ${TOKEN}" "${BASE_URL}/item" --data '{"id": "AAA", "title": "aaa"}'
{ 6135}

$ curl -X POST -H "Authorization: token ${TOKEN}" "${BASE_URL}/item?dax=1" --data '{"id": "BBB", "title": "bbb"}'
{ 39011}

Read capacityの節約

全てキャッシュヒットするパターンで負荷をかけてみる。

$ ulimit -n 2000
$ ab -n 10000 -c 1000 -r -H "Authorization: token ${TOKEN}" "${BASE_URL}/item?id=1&dax=1"

裏側のTableのキャパシティが全く使われないことを確認できた。

ノードが落ちたときの挙動

ノードを1台にして再起動するとnetwork errorになった。

failed to get an item: InternalServerError: network error
caused by: dial tcp 172.31.34.26:8111: connect: connection refused

ノードを2台に増やし、 負荷をかけている途中にプライマリーノードを落としてみる。 フェイルオーバー中にいくらかエラーが返ることを想定していたが、そんなこともなく全て200で返った。

$ ab -n 20000 -c 10 -r -H "Authorization: token ${TOKEN}" "${BASE_URL}/item?id=1&dax=1"