How to run PySpark code using the Airflow SSHOperator

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (35/100)

To submit a PySpark job using the SSHOperator in Airflow, we need three things:

  • an existing SSH connection to the Spark cluster (a way to define it is sketched right after this list)
  • the location of the PySpark script (for example, an S3 location if we use EMR)
  • parameters used by PySpark and the script
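
The SSH connection is a regular Airflow connection of the ssh type. We can create it in the Airflow web UI (Admin -> Connections) or provide it through an AIRFLOW_CONN_* environment variable. Below is a minimal sketch of the environment-variable approach; the host name, user, and key file path are placeholders, and in practice the variable belongs in the environment of the Airflow scheduler and workers rather than in the DAG file:

import os

# Placeholder connection URI: the host, user, and key_file path are examples only.
# Airflow maps the AIRFLOW_CONN_SSH_CONNECTION variable to the 'ssh_connection' conn_id,
# and the key_file query parameter ends up in the connection extras used by the SSHHook.
os.environ['AIRFLOW_CONN_SSH_CONNECTION'] = (
    'ssh://hadoop@emr-master-node:22?key_file=%2Fhome%2Fairflow%2F.ssh%2Fid_rsa'
)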

The usage of the operator looks like this:

from airflow.contrib.operators.ssh_operator import SSHOperator
# in Airflow 2, the same operator is available as:
# from airflow.providers.ssh.operators.ssh import SSHOperator

script = 's3://some_bucket/script.py'
spark_parameters = '--executor-memory 100G'
# here we can use Airflow templates to define the parameters used in the script
parameters = '--db {{ params.database_instance }} --output_path {{ params.output_path }}'

submit_pyspark_job = SSHOperator(
    task_id='pyspark_submit',
    ssh_conn_id='ssh_connection',
    command='set -a; PYSPARK_PYTHON=python3; /usr/bin/spark-submit --deploy-mode cluster %s %s %s' % (spark_parameters, script, parameters),
    dag=dag
)
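
The params.database_instance and params.output_path templates resolve to values from the params dictionary, which we can attach to the DAG (or to the individual task). Below is a minimal sketch of the surrounding DAG definition; the DAG id, schedule, and parameter values are placeholders:

from datetime import datetime

from airflow import DAG

# Placeholder values - adjust the dag_id, schedule, and params to your setup.
dag = DAG(
    dag_id='pyspark_via_ssh',
    start_date=datetime(2020, 11, 1),
    schedule_interval='@daily',
    params={
        'database_instance': 'analytics-db',
        'output_path': 's3://some_bucket/output/',
    },
)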




