How to run PySpark code using the Airflow SSHOperator
This article is a part of my "100 data engineering tutorials in 100 days" challenge. (35/100)
To submit a PySpark job using SSHOperator in Airflow, we need three things:
- an existing SSH connection to the Spark cluster (one way to define it is sketched right after this list)
- the location of the PySpark script (for example, an S3 location if we use EMR)
- parameters used by PySpark and the script
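If the SSH connection has not been defined yet in the Airflow connection store, here is a minimal sketch of configuring it in code with an SSHHook, assuming the Airflow 1.x contrib modules used below and placeholder values for the EMR master hostname, user, and key file:

from airflow.contrib.hooks.ssh_hook import SSHHook

# The remote_host, username, and key_file values below are placeholders;
# in practice they point at the EMR master node and its SSH key.
emr_ssh_hook = SSHHook(
    remote_host='emr-master.example.com',
    username='hadoop',
    key_file='/path/to/emr_key.pem',
)

Such a hook can be passed to the SSHOperator via the ssh_hook argument instead of ssh_conn_id. Alternatively, the connection can be created in the Airflow UI (Admin -> Connections) and referenced by its connection id, which is what the example below does.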
The usage of the operator looks like this:
from airflow.contrib.operators.ssh_operator import SSHOperator

script = 's3://some_bucket/script.py'
spark_parameters = '--executor-memory 100G'
# here we can use Airflow templates to define the parameters used in the script;
# command is a templated field of SSHOperator, so the placeholders are rendered at runtime
parameters = '--db {{ params.database_instance }} --output_path {{ params.output_path }}'

submit_pyspark_job = SSHOperator(
    task_id='pyspark_submit',
    ssh_conn_id='ssh_connection',
    # set -a exports the PYSPARK_PYTHON assignment, so spark-submit runs the script with Python 3
    command='set -a; PYSPARK_PYTHON=python3; /usr/bin/spark-submit --deploy-mode cluster %s %s %s' % (spark_parameters, script, parameters),
    dag=dag
)
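The {{ params.database_instance }} and {{ params.output_path }} placeholders come from the params dictionary, which can be defined on the DAG (or on the operator itself). Here is a minimal sketch of the surrounding DAG, with a hypothetical DAG id, schedule, and parameter values:

from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id='pyspark_on_emr',            # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval='@daily',
    params={
        # placeholder values rendered into the spark-submit command at runtime
        'database_instance': 'analytics-db',
        'output_path': 's3://some_bucket/output/',
    },
)

With these values, the rendered command becomes: set -a; PYSPARK_PYTHON=python3; /usr/bin/spark-submit --deploy-mode cluster --executor-memory 100G s3://some_bucket/script.py --db analytics-db --output_path s3://some_bucket/output/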