GCPのCloud Pub/Sub

gcpgolang

スケーラビリティに優れるメッセージングミドルウェア。 データはPullするだけではなくhttpsのエンドポイントにPushすることもでき、Cloud Dataflowを通してBigQueryやCloud MLに繋げることもできる。GAEのTaskQueueのように遅延させる機能は今のところない。

GAEのTaskQueue - sambaiz-net

料金はPublish/Pull/Pushしたデータ容量による。1TB送ると$60くらい。

Goのクライアントライブラリで動かしてみる。 まずTopicを作成して50件Publishした後、Subsriptionを作成して、再び50件Publishする。 Publishできるデータは10MB未満

topic, err := client.CreateTopic(ctx, topicName)
if err != nil {
  panic(err)
}

var wg sync.WaitGroup
for i := 0; i < 50; i++ {
  wg.Add(1)
  go func() {
    if _, err := topic.Publish(ctx, &pubsub.Message{Data: []byte(strconv.Itoa(i))}).Get(ctx); err != nil {
      log.Fatalf("Publish error: %s", err.Error())
    } else {
      log.Printf("Publish successful: %d", i)
    }
    wg.Done()
  }()
  wg.Wait()
}

log.Printf("Create Subscription")
sub := createSubscription(ctx, client, topic, subscriptionName)

for i := 50; i < 100; i++ {
  wg.Add(1)
  go func() {
    if _, err := topic.Publish(ctx, &pubsub.Message{Data: []byte(strconv.Itoa(i))}).Get(ctx); err != nil {
      log.Fatalf("Publish error: %s", err.Error())
    } else {
      log.Printf("Publish successful: %d", i)
    }
    wg.Done()
  }()
  wg.Wait()
}
2018/07/26 21:42:51 Publish successful: 47
2018/07/26 21:42:51 Publish successful: 48
2018/07/26 21:42:51 Publish successful: 49
2018/07/26 21:42:51 Create Subscription
2018/07/26 21:42:58 Publish successful: 50
2018/07/26 21:42:58 Publish successful: 51
2018/07/26 21:42:58 Publish successful: 52

Pull時のHandlerとしてAckを呼ぶackHandlerと、Nackを呼ぶnackHandler、何もしないnothingHandlerを用意した。 AckするとPub/Subからメッセージが消え、Nackまたはタイムアウトするとリトライされる。

type GotData struct {
	sync.Mutex
	gotData []string
}

var g = GotData{}

func ackHandler(done func()) func(ctx context.Context, m *pubsub.Message) {
	return func(ctx context.Context, m *pubsub.Message) {
		g.Lock()
		g.gotData = append(g.gotData, string(m.Data))
		sort.Slice(g.gotData, func(i, j int) bool {
			return g.gotData[i] < g.gotData[j]
		})
		log.Printf("ackHandler got message: %s %#v", m.Data, g.gotData)
		if len(g.gotData) == 50 {
			done()
		}
		g.Unlock()
		m.Ack()
	}
}

func nackHandler(ctx context.Context, m *pubsub.Message) {
	log.Printf("nackHandler got message: %s", m.Data)
	m.Nack()
}

func nothingHandler(ctx context.Context, m *pubsub.Message) {
	log.Printf("nothingHandler got message: %s", m.Data)
}

一つのSubscriptionを複数のSubscriberで使うことでメッセージを分散処理することができる。 ただし同じstructを使うとエラーになるのでそれぞれ生成している。

nothingHandlerではSubscriptionのMaxExtensionデフォルトの10分から10秒にすることでリトライを早めている。AckDeadlineというのもあるがPullの場合、結局MaxExtensionまで延長してしまうので意味がないようだ。

ReceiveSettingsにはほかにMaxOutstandingMessagesというのもあり、一度に処理されるメッセージの量を制限することができる。

go func() {
  sub := client.Subscription(subscriptionName)
  if err := sub.Receive(ctx, ackHandler(cancel)); err != nil {
    panic(err)
  }
}()
go func() {
  sub := client.Subscription(subscriptionName)
  if err := sub.Receive(ctx, nackHandler); err != nil {
    panic(err)
  }
}()
go func() {
  sub := client.Subscription(subscriptionName)
  sub.ReceiveSettings.MaxExtension = time.Second * 10
  if err := sub.Receive(ctx, nothingHandler); err != nil {
		panic(err)
  }
}()

nack/nothingHandlerによっていくつかリトライされた後、ackHandlerにSubscription作成後のメッセージが全て届いた。 データは7日間保持される。 メッセージの順序は保証されず、今回は1件ずつになっているが重複することがある。

2018/07/26 21:43:49 nothingHandler got message: 91
2018/07/26 21:44:28 nackHandler got message: 91
2018/07/26 21:44:30 ackHandler got message: 91 []string{"50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "80", "81", "82", "83", "84", "85", "86", "87", "88", "89", "90", "91", "92", "93", "94", "95", "96", "97", "98", "99"}
2018/07/26 21:44:30 Finished!

スケールし、複数コンシューマーがそれぞれのタイミングでPullできるログの一時的な貯め先などとしてAWSのKinesis Data Streamsと同じようなユースケースで使われるが、Kinesisでは順序が保証されていたり、シャードの指定やデータの取り出し方といった扱いなど異なる点もある。

Kinesis Streams/Firehose/Analyticsを試す - sambaiz-net