Running Spark jobs by calling Livy's REST API

spark, python

Let's try running a Spark job by calling Livy's REST API directly.

Installing Livy on EMR on EKS and running Spark jobs from a local Jupyter Notebook with Sparkmagic - sambaiz-net

First, create a Session and wait until it reaches the idle state.

import requests
import json
import sys
import time

livy_url = "http://<livy_url>:8998"

# Session config for EMR on EKS
config = {
  "kind": "spark",
  "driverMemory": "1000M",
  "driverCores": 2,
  "conf": {
    "spark.kubernetes.namespace": "emr",
    "spark.kubernetes.container.image": "public.ecr.aws/emr-on-eks/spark/emr-7.1.0:latest",
    "spark.kubernetes.authenticate.driver.serviceAccountName": "emr-containers-sa-spark-driver-*****",
    "spark.kubernetes.file.upload.path": "s3://<bucket>"
  }
}

def create_session():
  # POST /sessions responds with 201 Created and the new session's id
  response = requests.post(
    f"{livy_url}/sessions",
    data=json.dumps(config),
    headers={'Content-Type': 'application/json'}
  )
  if response.status_code != 201:
    print("Failed to create session:", response.content)
    sys.exit(1)
  return response.json()['id']

def wait_for_session_idle(session_id):
  # Poll GET /sessions/{id} until the session can accept statements
  while True:
    response = requests.get(f"{livy_url}/sessions/{session_id}")
    response.raise_for_status()
    state = response.json()['state']
    print(f"Session state: {state}")
    # error, dead and killed are all terminal failure states
    if state in ('error', 'dead', 'killed'):
      print("Session failed:", session_id, state)
      sys.exit(1)
    if state == 'idle':
      break
    time.sleep(5)

session_id = create_session()
wait_for_session_idle(session_id)
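The loop above waits forever if a session gets stuck in starting (for example, when the driver pod cannot be scheduled). Here is a minimal sketch of a polling helper with a deadline. It assumes Livy's documented session states, where idle means ready and error, dead and killed are failures. `poll_until` and its parameters are hypothetical names for illustration, not part of Livy or requests.

```python
import time
from typing import Callable, Iterable


def poll_until(fetch_state: Callable[[], str],
               done_states: Iterable[str],
               failed_states: Iterable[str],
               interval: float = 5.0,
               timeout: float = 600.0) -> str:
    """Call fetch_state() until it returns one of done_states.

    Raises RuntimeError on a failure state, and TimeoutError if the
    deadline passes before a done state is reached.
    """
    done, failed = set(done_states), set(failed_states)
    deadline = time.monotonic() + timeout
    while True:
        state = fetch_state()
        if state in done:
            return state
        if state in failed:
            raise RuntimeError(f"reached failure state: {state}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {state} after {timeout} seconds")
        time.sleep(interval)


# Hypothetical usage against the session endpoint used above:
# poll_until(
#     lambda: requests.get(f"{livy_url}/sessions/{session_id}").json()["state"],
#     done_states={"idle"},
#     failed_states={"error", "dead", "killed"},
# )
```

Because the helper takes the state fetcher as a callable, it can be reused unchanged for statements. Pass `{"available"}` as the done states and `{"error", "cancelled"}` as the failures.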

Next, submit code to the session as a Statement, wait for it to complete, and fetch the result.

import sys
import textwrap

def create_statement(session_id, codes):
  # POST /sessions/{id}/statements responds with 201 Created and the statement id
  response = requests.post(
    f"{livy_url}/sessions/{session_id}/statements",
    data=json.dumps({"code": codes}),
    headers={'Content-Type': 'application/json'}
  )
  if response.status_code != 201:
    print("Failed to create statement:", response.content)
    sys.exit(1)
  return response.json()['id']

def wait_for_statement_result(session_id, statement_id):
  # Poll until the statement finishes; 'available' means the output is ready
  while True:
    response = requests.get(f"{livy_url}/sessions/{session_id}/statements/{statement_id}")
    response.raise_for_status()
    result = response.json()
    state = result['state']
    print(f"Statement state: {state}")
    if state in ('available', 'error', 'cancelled'):
      break
    time.sleep(5)
  return result

# Scala code, because the session kind is "spark"; %json is a Livy magic
codes = textwrap.dedent("""\
  val data = spark.sql("SELECT 1 as a, 2 as b, 3 as c").collect()
  %json data
  """)
statement_id = create_statement(session_id, codes)
result = wait_for_statement_result(session_id, statement_id)
print(result['output'])
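A statement that reaches available can still have failed inside Spark. In that case `output.status` is not ok, and Livy reports the exception in `ename`, `evalue` and `traceback` instead of `data`. The following is a sketch of a hypothetical `get_output_data` helper. It returns the `data` dict on success and raises otherwise. It assumes `traceback` is a list of strings, as in Livy's REST API documentation.

```python
from typing import Any, Dict


def get_output_data(statement: Dict[str, Any]) -> Dict[str, Any]:
    """Return output['data'] of a finished Livy statement, or raise on failure."""
    output = statement.get("output")
    if output is None:
        # No output at all, e.g. the statement was cancelled
        raise RuntimeError(f"statement has no output (state: {statement.get('state')})")
    if output.get("status") != "ok":
        ename = output.get("ename", "UnknownError")
        evalue = output.get("evalue", "")
        traceback = "".join(output.get("traceback", []))
        raise RuntimeError(f"{ename}: {evalue}\n{traceback}")
    return output["data"]


# Hypothetical usage with the result fetched above:
# data = get_output_data(result)
```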

By default the result is returned as text/plain, but the %json and %table magics return it as JSON or as a table instead.

{'status': 'ok', 'execution_count': 0, 'data': {'application/json': [{'schema': [{'name': 'a', 'dataType': {}, 'nullable': False, 'metadata': {'map': {}}}, {'name': 'b', 'dataType': {}, 'nullable': False, 'metadata': {'map': {}}}, {'name': 'c', 'dataType': {}, 'nullable': False, 'metadata': {'map': {}}}], 'values': [1, 2, 3]}]}}
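In the %json output above, each collected Row is serialized as a `schema` plus `values`, and the `dataType` fields come back as empty objects. Only the column names are therefore usable. Below is a sketch that turns this into plain dicts. The %table branch is an assumption: it expects the payload under the MIME type `application/vnd.livy.table.v1+json`, with `headers` and `data` keys, which is what Livy appears to emit and Sparkmagic consumes. Check this against your Livy version. Both function names are hypothetical.

```python
from typing import Any, Dict, List

JSON_MIME = "application/json"
# Assumed MIME type of %table magic output; verify with your Livy version
TABLE_MIME = "application/vnd.livy.table.v1+json"


def rows_from_json_magic(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert %json output of collected Rows into a list of dicts."""
    rows = []
    for row in data[JSON_MIME]:
        # Each Row looks like {'schema': [{'name': ...}, ...], 'values': [...]}
        names = [field["name"] for field in row["schema"]]
        rows.append(dict(zip(names, row["values"])))
    return rows


def rows_from_table_magic(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert %table output (assumed headers/data shape) into a list of dicts."""
    table = data[TABLE_MIME]
    names = [header["name"] for header in table["headers"]]
    return [dict(zip(names, values)) for values in table["data"]]
```

Applied to `result['output']['data']` from the output above, `rows_from_json_magic` returns `[{'a': 1, 'b': 2, 'c': 3}]`.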

Finally, delete the session you created.

def delete_session(session_id):
  response = requests.delete(f"{livy_url}/sessions/{session_id}")
  return response.status_code

delete_status = delete_session(session_id)
print("Deleted session:", delete_status)
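If a statement fails or the script exits early, the session is never deleted. It keeps holding driver resources until someone deletes it or an idle-session timeout, if configured on the Livy server, reclaims it. Below is a sketch using `contextlib.contextmanager` to guarantee cleanup. It takes the functions defined above as arguments, and `livy_session` is a hypothetical name. Because `exit(1)` raises `SystemExit`, the `finally` block still runs when the wait function bails out.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def livy_session(create: Callable[[], int],
                 wait_idle: Callable[[int], None],
                 delete: Callable[[int], int]) -> Iterator[int]:
    """Create a session, wait until idle, and always delete it on exit."""
    session_id = create()
    try:
        wait_idle(session_id)
        yield session_id
    finally:
        # Runs on normal exit, on exceptions, and on SystemExit
        delete(session_id)


# Hypothetical usage with the functions from this post:
# with livy_session(create_session, wait_for_session_idle, delete_session) as sid:
#     statement_id = create_statement(sid, codes)
#     print(wait_for_statement_result(sid, statement_id)['output'])
```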

References

apache spark - Livy Server: return a dataframe as JSON? - Stack Overflow