Launch Hive execution environment with Cloudera Docker Image and execute query to JSON log

hadoopprestosparketl

What is Hive

Hive is a data warehouse software built on Hadoop, which can access data sources such as HDFS with HiveSQL, an extended SQL. Sending a query, the job runs on MapReduce, Spark or Tez. It has fault tolerance and is mainly used in batch processing.

What is HDFS(Hadoop Distributed File System) - sambaiz-net

Presto, which access data sources with SQL likewise, can execute the query faster than Hive by parallelizing tasks and having intermediate data on memory, so it is suitable for an ad-hoc purpose. On the other hand, if the intermediate data is large, it takes a long time or can be failed. Presto can refer to Hive metastore with Hive Connector, so it can share the schema with Hive and query to it.

Impala is even faster, but seems to consume more memory.

Launch Cloudera Docker Image

Cloudera Docker Image is an image installed Cloudera Manager to CDH, which is a Cloudera distribution consisting of OSS such as Hadoop, Hive, Spark, etc. It seems that there is a difference in available features with free Express and Enterprise. I use Express this time.

$ docker pull cloudera/quickstart:latest
$ docker run --hostname=quickstart.cloudera --privileged=true -itd -p 8888 -p 7180 -p 80 cloudera/quickstart /usr/bin/docker-quickstart

Port 80 is for the tutorial, 8888 is for Hadoop’s Web UI Hue, and 7180 is for Cloudera Manager. When the memory allocated to Docker is 2GB, “Failed to contact an active Resource Manager” error occurs, so I set it to 4GB.

Create a table on Hive and run a query

In the tutorial, data is imported from MySQL with Sqoop as follows, but this time, use JSON log.

$ sqoop import-all-tables \
    -m 1 \
    --connect jdbc:mysql://localhost:3306/retail_db \
    --username=retail_dba \
    --password=cloudera \
    --compression-codec=snappy \
    --as-parquetfile \
    --warehouse-dir=/user/hive/warehouse \
    --hive-import

The data can also be imported as String and parse it like LATERAL VIEW json_tuple(json_str, “field1”, “field2”) j AS field1, field2, but this time, it is deserialized with JSON SerDe.


Parsing with json_tuple(), Array is treated as String, but if used json_split() of brickhouse, a collection of convenient UDF (User-Defined Functions), it can be Array. Accordingly, each element can be processed as a row with a query like “SELECT col1 FROM table LATERAL VIEW explode(json_split(’[“a”,“b”,“c” ]’)) a as ja”.

col1 ja
x a
x b
x c
y a
y b
y c

If an empty array is exploded, it disappears, so you should make it into an array(null) to leave it like LEFT JOIN.

CASE WHEN size(json_split(arr)) > 0 THEN json_split(arr) ELSE array(null) END AS arr

Download a jar file of JSON SerDe.

$ curl http://www.congiu.net/hive-json-serde/1.3.8/cdh5/json-serde-1.3.8-jar-with-dependencies.jar > /usr/lib/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar

Queries can be executed on Hue, and the initial username and password are cloudera.

CREATE TABLE is as follows. The schema information is saved to metastore.

ADD JAR /usr/lib/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar;

CREATE EXTERNAL TABLE jinrou (
        participant ARRAY<STRUCT<user_id:INT,role:STRING,team:STRING>>,
        win_team    STRING,
        ts          STRING
      )
      ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
      WITH SERDEPROPERTIES ( "mapping.ts" = "timestamp" )
      LOCATION '/user/cloudera/jinrou';
      
ADD JAR /usr/lib/hive/lib/hive-contrib.jar;

Upload logs to the specipied LOCATION. This also can be done on Hue.

{"participant":[{"user_id":1,"role":"villager","team":"villager"},{"user_id":2,"role":"wolf","team":"wolf"},{"user_id":3,"role":"villager","team":"villager"},{"user_id":4,"role":"medium","team":"villager"},{"user_id":5,"role":"villager","team":"villager"},{"user_id":6,"role":"fortune-teller","team":"villager"}],"win_team":"wolf","timestamp":"2017-08-21T01:23:45.678+0900"}
{"participant":[{"user_id":3,"role":"villager","team":"villager"},{"user_id":4,"role":"wolf","team":"wolf"},{"user_id":1,"role":"villager","team":"villager"},{"user_id":2,"role":"medium","team":"villager"},{"user_id":6,"role":"villager","team":"villager"},{"user_id":5,"role":"fortune-teller","team":"villager"}],"win_team":"villager","timestamp":"2017-08-21T02:34:56.789+0900"}

Running select query,

SELECT user_id, role, SUM(is_win)/COUNT(1) AS wp FROM (
  SELECT 
    par.user_id,
    par.role, 
    CASE WHEN par.role = win_team THEN 1 ELSE 0 END AS is_win
    FROM jinrou
  LATERAL VIEW explode(participant) p AS par
) j GROUP BY user_id, role;

the data is returned.

user_id,role,wp
1,villager,0.5
2,medium,0.0
2,wolf,1.0
3,villager,0.5
4,medium,0.0
4,wolf,0.0
5,fortune-teller,0.0
5,villager,0.0
6,fortune-teller,0.0
6,villager,1.0

Fields that are not in the table definition are ignored, and those that are not in the data are null.

References

『Prestoとは何か,Prestoで何ができるか』 - トレジャーデータ(Treasure Data)公式ブログ

スケールアウト可能なSQLエンジンのベンチマークテスト:Presto vs Spark SQL vs Hive on Tez | GMOインターネット 次世代システム研究室