Cloudera Docker ImageでHiveの実行環境を立ち上げてJSONのログにクエリを実行する

(2017-08-24)

Hiveとは

Hadoop上で動くデータウェアハウスソフトウェア。 SQLを拡張したHiveQLを書くとデータを処理するMapReduceやSpark、Tezのジョブが生成される。 クエリの実行に時間がかかり、耐障害性があるのでDailyやHourlyのバッチで使われる。

ちなみにAthenaにも使われているPresto はタスクを並列に実行し、中間データをメモリ上に持つことで数分以内に結果が得られるので ダッシュボードなどの用途でアドホックに使える。中間データが大きいと時間がかかったり失敗する

Impalaはさらに速いけどメモリの消費が激しいらしい。

Cloudera Docker Imageを起動する

Cloudera Docker Imageには

  • CDH: Clouderaのディストリビューション。Hadoop、Hive、SparkなどのOSSで構成されている。
  • Cloudera Manager: CDHクラスタを管理する。無料のExpressと有料のEnterpriseで使える機能に差がある

が含まれていて、これを起動すると諸々立ち上がる。CDHクラスタを組むのはサポートされていないようなのでテスト用らしい。

$ docker pull cloudera/quickstart:latest
$ docker run --hostname=quickstart.cloudera --privileged=true -itd -p 8888 -p 7180 -p 80 cloudera/quickstart /usr/bin/docker-quickstart

80がチュートリアルで、8888がHadoopのWeb UIのHue、7180がCloudera Manager。Dockerに割り当てるメモリが2GBだとFailed to contact an active Resource Managerになってしまったので4GBにした。

Hiveのテーブルを作成して実行する

チュートリアルではSqoopを使ってDBから取り込んでいるんだけど、 今回はjsonのログのテーブルを作成する。

$ sqoop import-all-tables \
    -m 1 \
    --connect jdbc:mysql://localhost:3306/retail_db \
    --username=retail_dba \
    --password=cloudera \
    --compression-codec=snappy \
    --as-parquetfile \
    --warehouse-dir=/user/hive/warehouse \
    --hive-import

JSONを扱うにはStringからLATERAL VIEW json_tuple(json_str, "field1", "field2") j AS field1, field2のように実行時にパースする方法と、JSON SerDeで最初から別カラムにいれる方法があるが、今回はSerDeでやる。


json_tupleを使う場合、配列はStringになってしまうのでこれをArrayにするには便利なUDF(User-Defined Functions)をまとめたbrickhouseのjson_splitが使える。例えば、SELECT col1 FROM table LATERAL VIEW explode(json_split('["a","b","c" ]')) a as jaのようにするとArrayになったStringがexplodeしてtableの各列に3種のカラムjaが並ぶ。

col1 ja
x a
x b
x c
y a
y b
y c

Arrayが空の場合にexplodeすると消滅してしまうので、LEFT JOINのように残すにはarray(null)に加工してやるとよい。

CASE WHEN size(json_split(arr)) > 0 THEN json_split(arr) ELSE array(null) END AS arr

JSON SerDeのjarを持ってくる。

$ curl http://www.congiu.net/hive-json-serde/1.3.8/cdh5/json-serde-1.3.8-jar-with-dependencies.jar > /usr/lib/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar

クエリはHueから実行できて、初期ユーザー名とパスワードはどちらもcloudera。

CREATE TABLEはこんな感じ。スキーマ情報はmetastoreに入る。

ADD JAR /usr/lib/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar;

CREATE EXTERNAL TABLE jinrou (
        participant ARRAY<STRUCT<user_id:INT,role:STRING,team:STRING>>,
        win_team    STRING,
        ts          STRING
      )
      ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
      WITH SERDEPROPERTIES ( "mapping.ts" = "timestamp" )
      LOCATION '/user/cloudera/jinrou';
      
ADD JAR /usr/lib/hive/lib/hive-contrib.jar;

指定したLOCATIONにログをアップロードする。これもHueからできる。

{"participant":[{"user_id":1,"role":"villager","team":"villager"},{"user_id":2,"role":"wolf","team":"wolf"},{"user_id":3,"role":"villager","team":"villager"},{"user_id":4,"role":"medium","team":"villager"},{"user_id":5,"role":"villager","team":"villager"},{"user_id":6,"role":"fortune-teller","team":"villager"}],"win_team":"wolf","timestamp":"2017-08-21T01:23:45.678+0900"}
{"participant":[{"user_id":3,"role":"villager","team":"villager"},{"user_id":4,"role":"wolf","team":"wolf"},{"user_id":1,"role":"villager","team":"villager"},{"user_id":2,"role":"medium","team":"villager"},{"user_id":6,"role":"villager","team":"villager"},{"user_id":5,"role":"fortune-teller","team":"villager"}],"win_team":"villager","timestamp":"2017-08-21T02:34:56.789+0900"}

SELECT文を実行すると

SELECT user_id, role, SUM(is_win)/COUNT(1) AS wp FROM (
  SELECT 
    par.user_id,
    par.role, 
    CASE WHEN par.role = win_team THEN 1 ELSE 0 END AS is_win
    FROM jinrou
  LATERAL VIEW explode(participant) p AS par
) j GROUP BY user_id, role;

参照できている。

user_id,role,wp
1,villager,0.5
2,medium,0.0
2,wolf,1.0
3,villager,0.5
4,medium,0.0
4,wolf,0.0
5,fortune-teller,0.0
5,villager,0.0
6,fortune-teller,0.0
6,villager,1.0

テーブルの定義よりjsonのフィールドが多いと無視されて、ないものはNULLになる。

参考

『Prestoとは何か,Prestoで何ができるか』 - トレジャーデータ(Treasure Data)公式ブログ

スケールアウト可能なSQLエンジンのベンチマークテスト:Presto vs Spark SQL vs Hive on Tez | GMOインターネット 次世代システム研究室