Spark, EMR, Lambda, AWS

I know the title of this post looks like a collection of buzzwords, but there is code behind it.

AWS Lambda is a service that lets you run a function (in this example, one that adds an EMR step) in response to all kinds of events. Such events can be schedule expressions (cron syntax or rates like once an hour, once a day, etc.), changes to files in S3, changes to a DynamoDB table, and more.
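
For example, to trigger the function once an hour you can attach it to a CloudWatch Events schedule rule. Here is a minimal sketch of wiring this up with boto3; the function name and ARNs are placeholders, and it assumes the Lambda function is already deployed:

import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

# placeholder ARN of the deployed Lambda function
function_arn = "arn:aws:lambda:us-east-1:123456789012:function:add-emr-step"

# a rule that fires once an hour
rule = events.put_rule(Name="add-emr-step-hourly",
                       ScheduleExpression="rate(1 hour)",
                       State="ENABLED")

# allow CloudWatch Events to invoke the function
lambda_client.add_permission(FunctionName="add-emr-step",
                             StatementId="allow-hourly-rule",
                             Action="lambda:InvokeFunction",
                             Principal="events.amazonaws.com",
                             SourceArn=rule["RuleArn"])

# point the rule at the function
events.put_targets(Rule="add-emr-step-hourly",
                   Targets=[{"Id": "1", "Arn": function_arn}])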

The goal of the code is to add an EMR step to an existing EMR cluster. The step can actually be anything: a MapReduce job, a Spark job, a JAR step, etc. This example shows how to add a Spark job, but it is easy to adjust it to your needs.
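
For instance, a custom JAR step uses the same HadoopJarStep structure, just with your application JAR in the Jar field. A sketch (the bucket, JAR name and arguments are placeholders):

jar_step = {"Name": "your-jar-step",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                # placeholder: your application JAR on S3
                "Jar": "s3://your-bucket/jars/your-app.jar",
                "Args": ["your-arg-1", "your-arg-2"]
            }}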

The parameters for the Spark job can depend on the specific event. For example, an S3 put event includes data about the file that was uploaded.
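
When the function is triggered by an S3 put event, the bucket and key of the uploaded file arrive inside the event, and you can forward them to the Spark job as arguments. A sketch, assuming the standard S3 event structure (the --input flag is a placeholder for whatever your script expects):

def s3_event_to_args(event):
    # an S3 put event carries one record per uploaded object
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = record["object"]["key"]
    # hand the uploaded file's location to the Spark job
    return ["--input", "s3://%s/%s" % (bucket, key)]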

In this example there are only placeholders for the script parameters and the Spark configuration parameters.

Note: in order to run this Spark job, your code needs to already be on the EMR master node. This could possibly be done by the function as well (see the sketch below), but here we assume the code is already in CODE_DIR.
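
If you do want the function to handle the copy as well, one option is to submit a preliminary step that pulls the code from S3 before the Spark step runs. A sketch, assuming an EMR release that ships command-runner.jar; the S3 location is a placeholder:

copy_step = {"Name": "copy-code",
             "ActionOnFailure": "CONTINUE",
             "HadoopJarStep": {
                 "Jar": "command-runner.jar",
                 # placeholder: where your code lives on S3
                 "Args": ["aws", "s3", "cp",
                          "s3://your-bucket/code/", "/home/hadoop/code/",
                          "--recursive"]
             }}
# submit it before the Spark step: Steps=[copy_step, step]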

import sys
import time
import boto3


def lambda_handler(event, context):
    conn = boto3.client("emr")
    # choose the first cluster that is RUNNING or WAITING;
    # you could also choose by name or already have the cluster id
    clusters = conn.list_clusters()
    clusters = [c["Id"] for c in clusters["Clusters"]
                if c["Status"]["State"] in ["RUNNING", "WAITING"]]
    if not clusters:
        sys.stderr.write("No valid clusters\n")
        sys.exit(1)
    # take the first relevant cluster
    cluster_id = clusters[0]
    # code location on your EMR master node
    CODE_DIR = "/home/hadoop/code/"
    # Spark configuration example; replace the placeholders with
    # your own configuration, script name and parameters
    step_args = ["/usr/bin/spark-submit", "--conf", "your-configuration",
                 CODE_DIR + "your_file.py", "--your-parameters", "parameters"]
    step = {"Name": "what_you_do-" + time.strftime("%Y%m%d-%H:%M"),
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "s3n://elasticmapreduce/libs/script-runner/script-runner.jar",
                "Args": step_args
            }}
    action = conn.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return "Added step: %s" % action

