Cloudera Docker ImageでHiveの実行環境を立ち上げてJSONのログにクエリを実行する

hadoopprestosparketl

Hiveとは

SQLを拡張したHiveQLでHDFSなどのデータソースにアクセスできるHadoopのデータウェアハウスソフトウェア。 クエリを送るとMapReduceやSpark、Tezのジョブが実行される。 耐障害性があり主にバッチ処理で用いられる。

HDFS(Hadoop Distributed File System)とは - sambaiz-net

同じくSQLでデータソースにアクセスできるPrestoは タスクを並列に実行し中間データをメモリ上に持つことでHiveよりも高速にクエリを実行することができアドホックな用途にも使うことができるが、中間データが大きいと時間がかかったり失敗することがあるHive ConnectorでHive metastoreを参照できるので、Hiveとスキーマを共有してクエリを実行できる。

Impalaはさらに速いがメモリの消費が激しいらしい。

Cloudera Docker Imageを起動する

Cloudera Docker ImageはHadoop、Hive、SparkなどのOSSで構成されているClouderaのディストリビューション、CDHCloudera Managerを入れたもので、無料のExpressと有料のEnterpriseで使える機能に差があるらしいが、今回はExpressで動かす。

$ docker pull cloudera/quickstart:latest
$ docker run --hostname=quickstart.cloudera --privileged=true -itd -p 8888 -p 7180 -p 80 cloudera/quickstart /usr/bin/docker-quickstart

80番ポートがチュートリアル用で、8888がHadoopのWeb UIであるHue、7180がCloudera Managerのものとなっている。Dockerに割り当てるメモリが2GBだと Failed to contact an active Resource Manager になってしまったので4GBにした。

Hiveのテーブルを作成して実行する

チュートリアルでは次のようにSqoopでMySQLからデータを取り込んでいるが今回はJSONのログを取り込む。

$ sqoop import-all-tables \
    -m 1 \
    --connect jdbc:mysql://localhost:3306/retail_db \
    --username=retail_dba \
    --password=cloudera \
    --compression-codec=snappy \
    --as-parquetfile \
    --warehouse-dir=/user/hive/warehouse \
    --hive-import

Stringとして取り込み実行時に LATERAL VIEW json_tuple(json_str, “field1”, “field2”) j AS field1, field2 のようにパースすることもできるが、今回はJSON SerDeでデシリアライズする。


なお、json_tuple()では配列はStringになってしまうが、便利なUDF(User-Defined Functions)をまとめたbrickhouseのjson_split()を用いるとArrayにでき、 SELECT col1 FROM table LATERAL VIEW explode(json_split(’[“a”,“b”,“c” ]’)) a as ja のようなクエリでexplodeして各要素を扱うことができる。

col1 ja
x a
x b
x c
y a
y b
y c

Arrayが空の場合にexplodeすると消滅してしまうので、LEFT JOINのように残すにはarray(null)に加工してやるとよい。

CASE WHEN size(json_split(arr)) > 0 THEN json_split(arr) ELSE array(null) END AS arr

JSON SerDeのjarを持ってくる。

$ curl http://www.congiu.net/hive-json-serde/1.3.8/cdh5/json-serde-1.3.8-jar-with-dependencies.jar > /usr/lib/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar

クエリはHueから実行できて、初期ユーザー名とパスワードはどちらもcloudera。

CREATE TABLEはこんな感じ。スキーマ情報はmetastoreに入る。

ADD JAR /usr/lib/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar;

CREATE EXTERNAL TABLE jinrou (
        participant ARRAY<STRUCT<user_id:INT,role:STRING,team:STRING>>,
        win_team    STRING,
        ts          STRING
      )
      ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
      WITH SERDEPROPERTIES ( "mapping.ts" = "timestamp" )
      LOCATION '/user/cloudera/jinrou';
      
ADD JAR /usr/lib/hive/lib/hive-contrib.jar;

指定したLOCATIONにログをアップロードする。これもHueからできる。

{"participant":[{"user_id":1,"role":"villager","team":"villager"},{"user_id":2,"role":"wolf","team":"wolf"},{"user_id":3,"role":"villager","team":"villager"},{"user_id":4,"role":"medium","team":"villager"},{"user_id":5,"role":"villager","team":"villager"},{"user_id":6,"role":"fortune-teller","team":"villager"}],"win_team":"wolf","timestamp":"2017-08-21T01:23:45.678+0900"}
{"participant":[{"user_id":3,"role":"villager","team":"villager"},{"user_id":4,"role":"wolf","team":"wolf"},{"user_id":1,"role":"villager","team":"villager"},{"user_id":2,"role":"medium","team":"villager"},{"user_id":6,"role":"villager","team":"villager"},{"user_id":5,"role":"fortune-teller","team":"villager"}],"win_team":"villager","timestamp":"2017-08-21T02:34:56.789+0900"}

SELECT文を実行すると

SELECT user_id, role, SUM(is_win)/COUNT(1) AS wp FROM (
  SELECT 
    par.user_id,
    par.role, 
    CASE WHEN par.role = win_team THEN 1 ELSE 0 END AS is_win
    FROM jinrou
  LATERAL VIEW explode(participant) p AS par
) j GROUP BY user_id, role;

参照できている。

user_id,role,wp
1,villager,0.5
2,medium,0.0
2,wolf,1.0
3,villager,0.5
4,medium,0.0
4,wolf,0.0
5,fortune-teller,0.0
5,villager,0.0
6,fortune-teller,0.0
6,villager,1.0

テーブル定義にないフィールドは無視され、データの方にないものはNULLになる。

参考

『Prestoとは何か,Prestoで何ができるか』 - トレジャーデータ(Treasure Data)公式ブログ

スケールアウト可能なSQLエンジンのベンチマークテスト:Presto vs Spark SQL vs Hive on Tez | GMOインターネット 次世代システム研究室