Call Livy's REST API to run a Spark job

sparkpython

Call Livy’s REST API to run a Spark job.

Install Livy on EMR on EKS and run Spark jobs from local Jupyter notebooks with Sparkmagic - sambaiz-net

First, create a Session and wait until the status becomes idle.

import requests
import json
import time

# Livy server endpoint (8998 is Livy's default port).
livy_url = "http://<livy_url>:8998"

# config for EMR on EKS
# Session payload for POST /sessions. "kind": "spark" starts a Scala
# interpreter; the spark.kubernetes.* entries target an EMR on EKS cluster.
config = {
  "kind": "spark",
  "driverMemory": "1000M",
  "driverCores": 2,
  "conf": {
    "spark.kubernetes.namespace": "emr",
    "spark.kubernetes.container.image": "public.ecr.aws/emr-on-eks/spark/emr-7.1.0:latest",
    # NOTE(review): service-account name is redacted — replace the ***** before use.
    "spark.kubernetes.authenticate.driver.serviceAccountName": "emr-containers-sa-spark-driver-*****", 
    # Staging location for files uploaded to the cluster.
    "spark.kubernetes.file.upload.path": "s3://<bucket>"
  }
}

def create_session():
  """Create a Livy session from the module-level `config` and return its id.

  Livy answers 201 Created on success; any other status code prints the
  response body and terminates the script.
  """
  resp = requests.post(
    f"{livy_url}/sessions",
    headers={'Content-Type': 'application/json'},
    data=json.dumps(config),
  )
  if resp.status_code == 201:
    return resp.json()['id']
  print("Failed to create session:", resp.content)
  exit(1)

def wait_for_session_idle(session_id):
  """Poll the session every 5 seconds until its state becomes 'idle'.

  Exits the script if the session transitions to 'dead'.
  """
  state = None
  while state != 'idle':
    state = requests.get(f"{livy_url}/sessions/{session_id}").json()['state']
    print(f"Session state: {state}")
    if state == 'dead':
      print("Session is dead:", session_id)
      exit(1)
    if state != 'idle':
      time.sleep(5)

# Create the session and block until it is ready to accept statements.
session_id = create_session()
wait_for_session_idle(session_id)

Pass codes to the session, create a Statement, wait for completion, and get the result.

import textwrap

def create_statement(session_id, codes):
  """Submit `codes` to the session and return the new statement's id.

  Livy answers 201 Created on success; any other status code prints the
  response body and terminates the script.
  """
  payload = json.dumps({"code": codes})
  resp = requests.post(
    f"{livy_url}/sessions/{session_id}/statements",
    data=payload,
    headers={'Content-Type': 'application/json'},
  )
  if resp.status_code != 201:
    print("Failed to create statement:", resp.content)
    exit(1)
  return resp.json()['id']

def wait_for_statement_result(session_id, statement_id):
  """Poll the statement every 5 seconds until it terminates; return its JSON.

  Terminal states are 'available' (finished), 'error' (failed) and
  'cancelled' (aborted). The original check omitted 'cancelled', so a
  statement cancelled on the server side would make this loop poll forever.
  """
  while True:
    response = requests.get(f"{livy_url}/sessions/{session_id}/statements/{statement_id}")
    result = response.json()
    state = result['state']
    print(f"Statement state: {state}")
    # Include 'cancelled' so the loop can't spin forever on an aborted statement.
    if state in ['available', 'error', 'cancelled']:
      return result
    time.sleep(5)

# Scala code for the session; the trailing %json magic asks Livy to return
# the collected rows as JSON rather than plain text.
codes = textwrap.dedent("""\
  val data = spark.sql("SELECT 1 as a, 2 as b, 3 as c").collect()
  %json data
  """)
statement_id = create_statement(session_id, codes)
result = wait_for_statement_result(session_id, statement_id)
print(result['output'])

The output is returned as text/plain by default, but the %json and %table magics can return it in those formats instead.

{'status': 'ok', 'execution_count': 0, 'data': {'application/json': [{'schema': [{'name': 'a', 'dataType': {}, 'nullable': False, 'metadata': {'map': {}}}, {'name': 'b', 'dataType': {}, 'nullable': False, 'metadata': {'map': {}}}, {'name': 'c', 'dataType': {}, 'nullable': False, 'metadata': {'map': {}}}], 'values': [1, 2, 3]}]}}

Delete the session.

def delete_session(session_id):
  """Delete the session on the Livy server and return the HTTP status code."""
  return requests.delete(f"{livy_url}/sessions/{session_id}").status_code

# Clean up: tear down the Spark session on the Livy server.
delete_status = delete_session(session_id)
print("Deleted session:", delete_status)

References

apache spark - Livy Server: return a dataframe as JSON? - Stack Overflow