Call Livy’s REST API to run a Spark job.
First, create a Session and wait until the status becomes idle.
import requests
import json
import time
# Base URL of the Livy server; replace the placeholder host before running.
livy_url = "http://<livy_url>:8998"

# Request body used when creating the Livy session (EMR on EKS settings).
config = {
    "kind": "spark",
    "driverMemory": "1000M",
    "driverCores": 2,
    # Spark-on-Kubernetes properties forwarded to the session.
    "conf": {
        "spark.kubernetes.namespace": "emr",
        "spark.kubernetes.container.image": "public.ecr.aws/emr-on-eks/spark/emr-7.1.0:latest",
        "spark.kubernetes.authenticate.driver.serviceAccountName": "emr-containers-sa-spark-driver-*****",
        "spark.kubernetes.file.upload.path": "s3://<bucket>",
    },
}
def create_session(timeout=30):
    """Create a Livy session and return its id.

    POSTs the module-level ``config`` as JSON to ``{livy_url}/sessions``.

    Args:
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` waits indefinitely, so an unreachable server
            would hang the script.

    Returns:
        The ``id`` field of the JSON response body.

    Raises:
        SystemExit: With exit code 1 when the response status is not 201.
    """
    response = requests.post(
        f"{livy_url}/sessions",
        data=json.dumps(config),
        headers={'Content-Type': 'application/json'},
        timeout=timeout,
    )
    if response.status_code != 201:
        print("Failed to create session:", response.content)
        # Raise SystemExit directly: the ``exit`` builtin is installed by the
        # ``site`` module and is not available in every interpreter setup.
        raise SystemExit(1)
    return response.json()['id']
def wait_for_session_idle(session_id, poll_interval=5, request_timeout=30):
    """Poll a Livy session until its state becomes ``idle``.

    Args:
        session_id: Id of the session to poll.
        poll_interval: Seconds to sleep between polls.
        request_timeout: Seconds to wait for each HTTP request.

    Raises:
        SystemExit: With exit code 1 when the session reaches a state
            listed in ``end_states`` instead of ``idle``.
        requests.HTTPError: When the server answers with an error status
            (for example the session no longer exists).
    """
    # Session states documented by the Livy REST API from which the session
    # is not expected to become idle; polling further would loop forever.
    end_states = {'shutting_down', 'error', 'dead', 'killed', 'success'}
    while True:
        response = requests.get(
            f"{livy_url}/sessions/{session_id}",
            timeout=request_timeout,
        )
        # Surface HTTP errors explicitly rather than as a KeyError on 'state'.
        response.raise_for_status()
        state = response.json()['state']
        print(f"Session state: {state}")
        if state == 'idle':
            return
        if state in end_states:
            print(f"Session is {state}:", session_id)
            raise SystemExit(1)
        time.sleep(poll_interval)
# Create the session, then block until it is idle; session_id is reused by
# the statement and delete steps further down.
session_id = create_session()
wait_for_session_idle(session_id)
Pass the code to the session by creating a Statement, wait for it to complete, and get the result.
import textwrap
def create_statement(session_id, codes, timeout=30):
    """Submit code to a Livy session as a new statement and return its id.

    Args:
        session_id: Id of the session that should run the code.
        codes: Source code string sent as the ``code`` field of the request.
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` waits indefinitely.

    Returns:
        The ``id`` field of the JSON response body.

    Raises:
        SystemExit: With exit code 1 when the response status is not 201.
    """
    response = requests.post(
        f"{livy_url}/sessions/{session_id}/statements",
        data=json.dumps({"code": codes}),
        headers={'Content-Type': 'application/json'},
        timeout=timeout,
    )
    if response.status_code != 201:
        print("Failed to create statement:", response.content)
        # Raise SystemExit directly: the ``exit`` builtin is installed by the
        # ``site`` module and is not available in every interpreter setup.
        raise SystemExit(1)
    return response.json()['id']
def wait_for_statement_result(session_id, statement_id, poll_interval=5, request_timeout=30):
    """Poll a Livy statement until it finishes and return its JSON body.

    Args:
        session_id: Id of the session that owns the statement.
        statement_id: Id of the statement to poll.
        poll_interval: Seconds to sleep between polls.
        request_timeout: Seconds to wait for each HTTP request.

    Returns:
        The decoded JSON of the last poll; callers should inspect its
        ``state`` and ``output`` fields.

    Raises:
        requests.HTTPError: When the server answers with an error status.
    """
    # 'cancelled' is also a final statement state in the Livy REST API;
    # without it a cancelled statement would be polled forever.
    finished_states = ('available', 'error', 'cancelled')
    while True:
        response = requests.get(
            f"{livy_url}/sessions/{session_id}/statements/{statement_id}",
            timeout=request_timeout,
        )
        # Surface HTTP errors explicitly rather than as a KeyError on 'state'.
        response.raise_for_status()
        result = response.json()
        state = result['state']
        print(f"Statement state: {state}")
        if state in finished_states:
            return result
        time.sleep(poll_interval)
# Code to run in the session: collect a one-row query result and pass it to
# the %json magic. The trailing backslash after the opening quotes keeps the
# string from starting with an empty line.
codes = textwrap.dedent("""\
val data = spark.sql("SELECT 1 as a, 2 as b, 3 as c").collect()
%json data
""")
# Submit the code, block until the statement finishes, then show its output.
statement_id = create_statement(session_id, codes)
result = wait_for_statement_result(session_id, statement_id)
print(result['output'])
The output is returned as text/plain by default, but the %json and %table magics can return it as JSON or as a table, respectively.
{'status': 'ok', 'execution_count': 0, 'data': {'application/json': [{'schema': [{'name': 'a', 'dataType': {}, 'nullable': False, 'metadata': {'map': {}}}, {'name': 'b', 'dataType': {}, 'nullable': False, 'metadata': {'map': {}}}, {'name': 'c', 'dataType': {}, 'nullable': False, 'metadata': {'map': {}}}], 'values': [1, 2, 3]}]}}
Delete the session.
def delete_session(session_id, timeout=30):
    """Delete a Livy session and return the HTTP status code.

    Args:
        session_id: Id of the session to delete.
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` waits indefinitely.

    Returns:
        The integer status code of the DELETE response; no status check is
        performed here, so callers decide what counts as success.
    """
    response = requests.delete(
        f"{livy_url}/sessions/{session_id}",
        timeout=timeout,
    )
    return response.status_code
# Delete the session created above and report the HTTP status of the DELETE.
delete_status = delete_session(session_id)
print("Deleted session:", delete_status)
References
apache spark - Livy Server: return a dataframe as JSON? - Stack Overflow