NorikraでログをJOINする

fluentdnorikraetl

NorikraとFluentdで流れてきたログをリアルタイムに集計する - sambaiz-net

適当なログを出すコードを書いた。

sambaiz/lottery-log

これは以下のようなlottery.logを出力し続け、数秒後に一定確率で同じuidのreceived.logを出力するもの。 広告的に言えば、lotteryがimpで、receivedがclickといった感じかな。

// lottery.log
{"ts":1497453504.6818597,"uid":"b18c0d98-19b2-4e37-8fc4-6b00a4b728c3","prize":855,"isWin":true}
// received.log
{"ts":1497453515.932101,"uid":"bc4f578f-4a5f-47f1-a4e0-1ef0b43c316e"}

クエリはこんな感じ。一つはlotteryログとreceivedログをuidでJOINするもので、 received_rateの計算にはサブクエリも使っている。 received_rateの計算で分母に小さな値を足しているのはNaNやInfinityにならないようにするため。 receivedログは最大30秒遅れて出力されるため、40秒前までのreceivedログをwin:timeで見ている。 これをtime_batchにしてしまうと期待通りの結果にならないので注意。

もう一つはJavaの関数でboolを0/1にして平均をとることでisWinがtrueである割合を出している。

$ docker exec norikra norikra-client query add lottery_agg '
SELECT COUNT(*) as received_count, (COUNT(*) / (SELECT COUNT(*) + 0.00001 FROM lottery.win:time_batch(1 sec, 0).std:unique(uid))) as received_rate, AVG(prize) as prize_avg, SUM(prize) as prize_sum FROM lottery.win:time(40 sec).std:unique(uid) as a, received.win:time_batch(1 sec, 0).std:unique(uid) as b WHERE a.uid = b.uid'

$ docker exec norikra norikra-client query add lottery_win_rate 'SELECT avg(Boolean.compare(isWin, false)) as win_rate FROM lottery.win:time_batch(1 sec)'

で、このクエリの結果をElasticsearchに送って可視化してみたのがこれ。

<source>
  @type tail
  path /var/log/lottery.log
  pos_file /etc/td-agent/log.pos
  tag event.lottery
  format json
</source>

<source>
  @type tail
  path /var/log/received.log
  pos_file /etc/td-agent/log.pos
  tag event.received
  format json
</source>

<match event.*>
  @type   norikra
  norikra localhost:26571
  
  remove_tag_prefix event # event.*の部分が
  target_map_tag    yes   # targetになる

  <default>
    auto_field false 
  </default>
</match>

<source>
  @type   norikra
  norikra localhost:26571
  <fetch>
    method   event
    target   lottery_agg
    tag      string data.lottery_agg
    interval 1m
  </fetch>
  <fetch>
    method   event
    target   lottery_win_rate
    tag      string data.lottery_win_rate
    interval 1m
  </fetch>
</source>

<match data.lottery_agg>
  @type elasticsearch
  host 172.31.5.20
  port 9200
  logstash_prefix lottery
  type_name lottery_agg
  logstash_format true
</match>

<match data.lottery_win_rate>
  @type elasticsearch
  host 172.31.5.20
  port 9200
  logstash_prefix lottery
  type_name lottery_win_rate
  logstash_format true
</match>

received_rateを計算するときのウィンドウが1secと小さく、タイミングによってはreceivedの数がlotteryの数を上回ることがあるため1以下で絞っている。