Run a command on a remote server using SSH in Airflow
In this article, I show how to use the `SSHHook` in a `PythonOperator` to connect to a remote server from Airflow using SSH and execute a command.
First, I have to define the SSH connection in Airflow because I will pass the connection parameters using the Airflow connection id instead of defining the host, port, username, and password in the Python code.
When that part is done, I can create the `SSHHook` that will connect to the server:
```python
from airflow.contrib.hooks.ssh_hook import SSHHook

ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
```
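The `AIRFLOW_CONNECTION_ID` constant is not part of the snippet above; it simply holds the id under which I saved the SSH connection in Airflow. A hypothetical definition (the id is a placeholder, not a value from this article):

```python
# a placeholder id: use whatever id you gave the SSH connection
# in Airflow (Admin -> Connections)
AIRFLOW_CONNECTION_ID = 'my_ssh_connection'
```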
In the next step, I open a new connection and execute the command (in this example, I use `touch` to create a new file). Opening a connection, however, is not enough. We have to wrap it in a `try...finally` block because the `SSHHook` does not close the connection automatically when we no longer need it:
```python
ssh_client = None
try:
    ssh_client = ssh.get_conn()
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
finally:
    if ssh_client:
        ssh_client.close()
```
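As a side note, `get_conn` returns a Paramiko `SSHClient`, which also supports the context-manager protocol. Assuming that behavior, the `try...finally` block could be replaced with a `with` statement; this is just a sketch, not the approach the rest of this article uses:

```python
# a sketch: paramiko.SSHClient closes the connection automatically
# when it is used as a context manager
with ssh.get_conn() as ssh_client:
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
```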
In the `try` block, I called the `load_system_host_keys` function to import the OpenSSH `known_hosts` file, which contains the remote hosts' identifiers and serves as a rudimentary method of preventing man-in-the-middle attacks.
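When you use Paramiko directly (outside the hook), connecting to a host that is missing from `known_hosts` fails unless you configure a missing-host-key policy first. Here is a hedged sketch of plain Paramiko usage with auto-accepted host keys; the host and username are placeholders:

```python
import paramiko

# a sketch of plain Paramiko usage (not the SSHHook): auto-accept unknown
# host keys; this removes the protection that known_hosts provides
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('remote-host', username='user')
```

The `SSHHook` has its own host-key settings, so this is mainly relevant when you work with Paramiko outside Airflow.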
After loading the host keys, I called the `exec_command` function to create a new file. In this example, I don't care about the server's response, so I do not assign the values returned by `exec_command` to any variable.
If I wanted to get a response, I would have to assign the returned values to three variables:
```python
stdin, stdout, stderr = ssh_client.exec_command('touch file_name')
```
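Since `get_conn` returns a Paramiko client, the three values are Paramiko file-like objects. Assuming that API, reading the command's output could look like this:

```python
stdin, stdout, stderr = ssh_client.exec_command('touch file_name')

# block until the remote command finishes and grab its exit code
exit_status = stdout.channel.recv_exit_status()

# read whatever the command printed to stdout and stderr
output = stdout.read().decode('utf-8')
errors = stderr.read().decode('utf-8')
```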
SSHHook in PythonOperator
In this article, I want to show how to use the `SSHHook` with the `PythonOperator`. Therefore, I have to put the whole code in a function:
```python
def create_the_file():
    ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')
    finally:
        if ssh_client:
            ssh_client.close()
```
and define a `PythonOperator` that calls my function:
```python
call_ssh_task = PythonOperator(
    task_id='call_ssh_task',
    python_callable=create_the_file,
    dag=dag
)
```
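For completeness, the `dag` object passed to the operator comes from the surrounding DAG definition, which is not shown above. A minimal sketch, assuming Airflow 1.x (to match the `airflow.contrib` import) and placeholder values:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# a sketch: the dag_id, start_date, and schedule are placeholders,
# not values from the article
dag = DAG(
    dag_id='ssh_example',
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
)
```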