https://aws.amazon.com/jp/kinesis/
リアルタイムのストリーミングデータを扱うサービス群。 いまのところTokyoリージョンではKinesis Streamsしか使えない。
Kinesis Firehose
AWSのデータストアに送るストリーム。自分でデータを読む処理を書かなくてよく、スケーリングも勝手にやってくれるので簡単に使える。
https://aws.amazon.com/jp/kinesis/firehose/faqs/
Q: 送信先とは何ですか?
送信先はデータが配信されるデータストアです。Amazon Kinesis Firehose では、
現在送信先として Amazon S3、Amazon Redshift、Amazon Elasticsearch Service がサポートされています。
料金は取り込まれたデータ量による。 一見そんなに高くならないように見えるが、5KB単位で切り上げられるのでレコードのサイズが小さくて数が多い場合に注意が必要。
今回はS3に送ってみる。
圧縮方法を設定したり、Lambdaを噛ませたりすることができる。
StatusがActiveになったらKinesis Agentで送ってみる。 CloudWatchとFirehoseにPutする権限が必要。Firehoseはkinesis:ではなくfirehose:なので注意。
$ sudo yum install –y aws-kinesis-agent
/etc/aws-kinesis/agent.json
を編集する。リージョンごとのエンドポイントは
ここ
にある。
{
"awsAccessKeyId": "",
"awsSecretAccessKey": "",
"firehose.endpoint": "https://firehose.us-east-1.amazonaws.com",
"flows": [
{
"filePattern": "/tmp/hoge.log",
"deliveryStream": "hogefugastream"
}
]
}
$ sudo service aws-kinesis-agent start
$ sudo chkconfig aws-kinesis-agent on
$ echo "aaa" >> /tmp/hoge.log
$ tail /var/log/aws-kinesis-agent/aws-kinesis-agent.log
com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 2 records parsed (168 bytes),
and 2 records sent successfully to destinations. Uptime: 300044ms
S3に保存されているのを確認。
Kinesis Streams
用途を制限しないストリーム。データは保持期間の間、何度でも読むことができるので、 とりあえず必要なだけシャードを増やしてデータを入れておけばどうにかなる。 データを扱う側はそれぞれ独立に必要なタイミングで必要なだけpullするため、スケールするにあたってその先は別に考えることができ、 高負荷なシステムのlog aggregatorとして使われる。
料金は
- 時間単位のシャード速度: 1シャードは最大1000件/秒の1MB/秒の入力と2MB/秒の出力能力がある。
- PUTペイロードユニット: 追加する25KBのチャンクの数。5KBでも1チャンク。
- データ保持期間: デフォルトで24時間。7日まで延長可能。シャード時間ごとに課金。
による。
ストリーム作成時はシャード数を入れる。
Application Auto Scalingのcustom-resourceによるKinesis Data Streamsのオートスケール設定 - sambaiz-net
Firehoseと同じくKinesis Agentで送ってみる。 エンドポイントはここ。
{
"awsAccessKeyId": "",
"awsSecretAccessKey": "",
"kinesis.endpoint": "https://kinesis.us-east-1.amazonaws.com",
"flows": [
{
"filePattern": "/tmp/hoge.log",
"kinesisStream": "fugafugastream"
}
]
}
aws-cliでデータを取得する。
まず、シャードイテレーターを取得する。有効時間は300秒。 TRIM_HORIZON で最も古い方からデータを取得していく。SequenceNumberを指定して途中から読むこともできる。
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name fugafugastream
{
"ShardIterator": "AAAAAAAAAAFjKI0neNqY2N5HzGljYFCzoFqpQsdncdC6xE+ylnqvZpmusNfyViY3hBSS8WQXa67gvtkF0f2eKzxQ/Fd7SXZG8Inkb8l1UDF5t+jHgErA28gVSWyT4uYxTzzbnhm9AhcbztyQrjqehYcjEfpWIz5XmhY9K3Kjp0Crygy+OYNSS5PoQFcB1PZ7xMFE8zLTxJXLv1ANRu0Q+1m/JFxKQ3WS"
}
このシャードイテレータを使ってget-recordsする。データはBase64で入っているのでデコードして確認する。
$ aws kinesis get-records --shard-iterator AAAAAAAAAAFjKI0neNqY2N5HzGljYFCzoFqpQsdncdC6xE+ylnqvZpmusNfyViY3hBSS8WQXa67gvtkF0f2eKzxQ/Fd7SXZG8Inkb8l1UDF5t+jHgErA28gVSWyT4uYxTzzbnhm9AhcbztyQrjqehYcjEfpWIz5XmhY9K3Kjp0Crygy+OYNSS5PoQFcB1PZ7xMFE8zLTxJXLv1ANRu0Q+1m/JFxKQ3WS
{
"Records": [
{
"Data": "YWFhCg==",
"PartitionKey": "999679.8130737302",
"ApproximateArrivalTimestamp": 1487082145.518,
"SequenceNumber": "49570460043263608661463102123405561406360875697772167170"
},
...
],
"NextShardIterator": "AAAAAAAAAAE08GRdLF1d76L1wCyLIiuAgpSEkKZSkUEO0VdUt3EOfdm1oOSXA1Xc4+tJPkSmB8g5NaQqDPRS/67u5IXermTUiAj6g2lgvDCGCqWFcYMAxIwIKZjKluCPQjL9kRaUqfVAaElRoKjp4Gv7JmuBDjKpxsbF2yk4uJJDAcevqH/VVkala8UbdhTweGyFgf9VhP/ljzXlrqkZ8wbD0eFwtZ3x",
"MillisBehindLatest": 0
}
$ echo "YWFhCg==" | base64 -d
aaa
Kinesis Analytics
SourceとなるKinesis Streamsか、Firehoseを指定し、SQLを実行できる。そして必要なら次のストリームに入れることができる。
今回はSourceとしてjsonで株価のデータが入っているDemo streamを使う。 いくつかSQLテンプレートが用意されていて、その中のContinuous Filterを選択。 Streamに入ってきたものをTECHで絞って出力する。
-- ** Continuous Filter **
-- Performs a continuous filter based on a WHERE condition.
-- .----------. .----------. .----------.
-- | SOURCE | | INSERT | | DESTIN. |
-- Source-->| STREAM |-->| & SELECT |-->| STREAM |-->Destination
-- | | | (PUMP) | | |
-- '----------' '----------' '----------'
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL);
-- Create pump to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE sector SIMILAR TO '%TECH%';
Kinesis Data AnalyticsのSQL, Lambdaへの出力とCDKによるリソースの作成 - sambaiz-net