スケーラビリティに優れるメッセージングミドルウェア。 データはPullするだけではなくhttpsのエンドポイントにPushすることもでき、Cloud Dataflowを通してBigQueryやCloud MLに繋げることもできる。GAEのTaskQueueのように遅延させる機能は今のところない。
料金はPublish/Pull/Pushしたデータ容量による。1TB送ると$60くらい。
Goのクライアントライブラリで動かしてみる。 まずTopicを作成して50件Publishした後、Subsriptionを作成して、再び50件Publishする。 Publishできるデータは10MB未満。
topic, err := client.CreateTopic(ctx, topicName)
if err != nil {
panic(err)
}
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
if _, err := topic.Publish(ctx, &pubsub.Message{Data: []byte(strconv.Itoa(i))}).Get(ctx); err != nil {
log.Fatalf("Publish error: %s", err.Error())
} else {
log.Printf("Publish successful: %d", i)
}
wg.Done()
}()
wg.Wait()
}
log.Printf("Create Subscription")
sub := createSubscription(ctx, client, topic, subscriptionName)
for i := 50; i < 100; i++ {
wg.Add(1)
go func() {
if _, err := topic.Publish(ctx, &pubsub.Message{Data: []byte(strconv.Itoa(i))}).Get(ctx); err != nil {
log.Fatalf("Publish error: %s", err.Error())
} else {
log.Printf("Publish successful: %d", i)
}
wg.Done()
}()
wg.Wait()
}
2018/07/26 21:42:51 Publish successful: 47
2018/07/26 21:42:51 Publish successful: 48
2018/07/26 21:42:51 Publish successful: 49
2018/07/26 21:42:51 Create Subscription
2018/07/26 21:42:58 Publish successful: 50
2018/07/26 21:42:58 Publish successful: 51
2018/07/26 21:42:58 Publish successful: 52
Pull時のHandlerとしてAckを呼ぶackHandlerと、Nackを呼ぶnackHandler、何もしないnothingHandlerを用意した。 AckするとPub/Subからメッセージが消え、Nackまたはタイムアウトするとリトライされる。
type GotData struct {
sync.Mutex
gotData []string
}
var g = GotData{}
func ackHandler(done func()) func(ctx context.Context, m *pubsub.Message) {
return func(ctx context.Context, m *pubsub.Message) {
g.Lock()
g.gotData = append(g.gotData, string(m.Data))
sort.Slice(g.gotData, func(i, j int) bool {
return g.gotData[i] < g.gotData[j]
})
log.Printf("ackHandler got message: %s %#v", m.Data, g.gotData)
if len(g.gotData) == 50 {
done()
}
g.Unlock()
m.Ack()
}
}
func nackHandler(ctx context.Context, m *pubsub.Message) {
log.Printf("nackHandler got message: %s", m.Data)
m.Nack()
}
func nothingHandler(ctx context.Context, m *pubsub.Message) {
log.Printf("nothingHandler got message: %s", m.Data)
}
一つのSubscriptionを複数のSubscriberで使うことでメッセージを分散処理することができる。 ただし同じstructを使うとエラーになるのでそれぞれ生成している。
nothingHandlerではSubscriptionのMaxExtensionをデフォルトの10分から10秒にすることでリトライを早めている。AckDeadlineというのもあるがPullの場合、結局MaxExtensionまで延長してしまうので意味がないようだ。
ReceiveSettingsにはほかにMaxOutstandingMessagesというのもあり、一度に処理されるメッセージの量を制限することができる。
go func() {
sub := client.Subscription(subscriptionName)
if err := sub.Receive(ctx, ackHandler(cancel)); err != nil {
panic(err)
}
}()
go func() {
sub := client.Subscription(subscriptionName)
if err := sub.Receive(ctx, nackHandler); err != nil {
panic(err)
}
}()
go func() {
sub := client.Subscription(subscriptionName)
sub.ReceiveSettings.MaxExtension = time.Second * 10
if err := sub.Receive(ctx, nothingHandler); err != nil {
panic(err)
}
}()
nack/nothingHandlerによっていくつかリトライされた後、ackHandlerにSubscription作成後のメッセージが全て届いた。 データは7日間保持される。 メッセージの順序は保証されず、今回は1件ずつになっているが重複することがある。
2018/07/26 21:43:49 nothingHandler got message: 91
2018/07/26 21:44:28 nackHandler got message: 91
2018/07/26 21:44:30 ackHandler got message: 91 []string{"50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "80", "81", "82", "83", "84", "85", "86", "87", "88", "89", "90", "91", "92", "93", "94", "95", "96", "97", "98", "99"}
2018/07/26 21:44:30 Finished!
スケールし、複数コンシューマーがそれぞれのタイミングでPullできるログの一時的な貯め先などとしてAWSのKinesis Data Streamsと同じようなユースケースで使われるが、Kinesisでは順序が保証されていたり、シャードの指定やデータの取り出し方といった扱いなど異なる点もある。