Run a command on a remote server using SSH in Airflow

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (54/100)

In this article, I show how to use the SSHHook in a PythonOperator to connect to a remote server from Airflow using SSH and execute a command.

First, I have to define the SSH connection in Airflow because I will pass the connection parameters using the Airflow connection id instead of defining the host, port, username, and password in the Python code.
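Besides the web UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the connection id in upper case, with the connection stored as a URI. The sketch below only builds those two strings. The connection id, host, login, and password are hypothetical, and in practice the variable has to be set in the environment of the Airflow scheduler and workers, not inside the DAG file.

```python
from urllib.parse import quote


def connection_env_var(conn_id):
    # Airflow looks for AIRFLOW_CONN_ followed by the connection id in upper case
    return 'AIRFLOW_CONN_' + conn_id.upper()


def ssh_connection_uri(host, login, password, port=22):
    # Airflow unquotes the URI parts, so characters such as '@' or ':'
    # in the login or password have to be percent-encoded
    return 'ssh://{}:{}@{}:{}'.format(
        quote(login, safe=''), quote(password, safe=''), host, port)


# Hypothetical connection id and credentials, for illustration only
name = connection_env_var('my_ssh_server')
value = ssh_connection_uri('example.com', 'airflow', 'p@ss:word')
print('{}={}'.format(name, value))
# AIRFLOW_CONN_MY_SSH_SERVER=ssh://airflow:p%40ss%3Aword@example.com:22
```

With such a variable in place, AIRFLOW_CONNECTION_ID in the following snippets would be 'my_ssh_server'.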

When that part is done, I can create an SSHHook that uses this connection:

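# Airflow 1.10 import path; in Airflow 2 the hook is in the SSH provider package:
# from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.contrib.hooks.ssh_hook import SSHHook

# AIRFLOW_CONNECTION_ID is the id of the SSH connection defined in Airflow
ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)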

In the next step, I open a new connection and execute the command (in this example, I use touch to create a new file). Opening the connection is not enough, though. The SSHHook does not close it automatically when I no longer need it, so I have to close it myself in a try...finally block:

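ssh_client = None
try:
    # get_conn() returns a connected paramiko SSHClient
    ssh_client = ssh.get_conn()
    # read the user's OpenSSH known_hosts file
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
finally:
    # close the connection even if one of the calls above raised an exception
    if ssh_client:
        ssh_client.close()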
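The same cleanup can also be written with a with statement: contextlib.closing from the standard library calls close() when the block exits, whether or not an exception was raised. In this sketch, FakeSSHClient and touch_remote_file are hypothetical names; the fake stands in for the paramiko client returned by get_conn() so the example runs without a server.

```python
from contextlib import closing


class FakeSSHClient:
    """Hypothetical stand-in for the paramiko client returned by get_conn()."""

    def __init__(self):
        self.commands = []
        self.closed = False

    def load_system_host_keys(self):
        pass

    def exec_command(self, command):
        self.commands.append(command)
        return None, None, None

    def close(self):
        self.closed = True


def touch_remote_file(get_conn):
    # closing() calls ssh_client.close() when the block exits,
    # also when exec_command raises, just like the finally clause
    with closing(get_conn()) as ssh_client:
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')


client = FakeSSHClient()
touch_remote_file(lambda: client)
print(client.commands, client.closed)  # ['touch file_name'] True
```

In the DAG, the call would be touch_remote_file(ssh.get_conn), with ssh being the SSHHook created earlier. If get_conn() itself raises, there is nothing to close, which matches the `if ssh_client:` check in the finally block.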

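In the try block, I called the load_system_host_keys method to import the OpenSSH known_hosts file, which contains the host keys that identify remote servers and is used as a rudimentary method of man-in-the-middle attack prevention. Keep in mind that paramiko verifies the host key while it connects, and get_conn() returns a client that is already connected, so keys loaded at this point do not affect the check of the current connection.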
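To make the idea concrete, here is a simplified sketch of the check a known_hosts file enables: each line lists host names, a key type, and a base64-encoded public key, and a client refuses to connect when a server presents a key of that type that differs from the recorded one. The function names and the fake keys are hypothetical; the sketch ignores hashed host names, marker lines such as @cert-authority, and the [host]:port form, all of which paramiko's own implementation handles.

```python
def parse_known_hosts(text):
    """Return {host: {key_type: base64_key}} from known_hosts content (simplified)."""
    known = {}
    for line in text.splitlines():
        line = line.strip()
        # skip blank lines, comments, and marker lines such as @cert-authority
        if not line or line.startswith(('#', '@')):
            continue
        fields = line.split()
        if len(fields) < 3:
            continue
        hosts, key_type, key = fields[0], fields[1], fields[2]
        for host in hosts.split(','):
            # hashed entries (|1|salt|hash) would need HMAC-SHA1 matching; ignored here
            if not host.startswith('|'):
                known.setdefault(host, {})[key_type] = key
    return known


def check_host_key(known, host, key_type, key):
    """Return 'unknown', 'ok', or 'mismatch' for the key a server presents."""
    recorded = known.get(host, {}).get(key_type)
    if recorded is None:
        return 'unknown'   # a client would apply its missing-host-key policy
    if recorded == key:
        return 'ok'
    return 'mismatch'      # possible man-in-the-middle: refuse to connect


# Illustrative content with fake keys
sample = """\
# host names           key type    public key (fake)
example.com,192.0.2.10 ssh-ed25519 AAAAfakeKeyOne
"""
known = parse_known_hosts(sample)
print(check_host_key(known, 'example.com', 'ssh-ed25519', 'AAAAfakeKeyOne'))   # ok
print(check_host_key(known, '192.0.2.10', 'ssh-ed25519', 'AAAAfakeKeyTwo'))    # mismatch
print(check_host_key(known, 'other.example', 'ssh-ed25519', 'AAAAfakeKeyOne')) # unknown
```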

After that, I called exec_command to create a new file. In this example, I don’t care about the server response, so I do not assign the values returned by exec_command to any variables.

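If I wanted to get a response, I would have to assign the returned values to three variables, which are file-like objects connected to the command's standard input, standard output, and standard error: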

stdin, stdout, stderr = ssh_client.exec_command('touch file_name')
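exec_command does not wait for the remote command to finish, and it does not raise an exception when the command fails. A minimal sketch of a helper that reads both streams, waits for the exit status through the channel behind stdout, and raises on a non-zero status; raising inside a PythonOperator callable makes the Airflow task fail. The helper name run_command and the fake classes (which let the example run without a server) are hypothetical; stdout.read() and stdout.channel.recv_exit_status() are the paramiko calls it relies on.

```python
def run_command(ssh_client, command):
    """Run command on a connected paramiko-style client and return its stdout.

    Raises RuntimeError when the remote command exits with a non-zero status.
    """
    stdin, stdout, stderr = ssh_client.exec_command(command)
    # read the output first: waiting for the exit status while large output
    # remains unread can block
    out = stdout.read().decode()
    err = stderr.read().decode()
    exit_status = stdout.channel.recv_exit_status()
    if exit_status != 0:
        raise RuntimeError('{!r} exited with status {}: {}'.format(
            command, exit_status, err.strip()))
    return out


# Hypothetical stand-ins that mimic the objects paramiko returns
class FakeChannel:
    def __init__(self, status):
        self.status = status

    def recv_exit_status(self):
        return self.status


class FakeStream:
    def __init__(self, data, channel):
        self.data = data
        self.channel = channel

    def read(self):
        return self.data


class FakeClient:
    def __init__(self, out, err, status):
        self.out, self.err, self.status = out, err, status

    def exec_command(self, command):
        channel = FakeChannel(self.status)
        return None, FakeStream(self.out, channel), FakeStream(self.err, channel)


print(run_command(FakeClient(b'hello\n', b'', 0), 'echo hello'), end='')  # hello
try:
    run_command(FakeClient(b'', b'touch: cannot touch\n', 1), 'touch /x')
except RuntimeError as error:
    print(error)
```

With a real connection, the call inside the try block would be run_command(ssh_client, 'touch file_name').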


SSHHook in PythonOperator

In this article, I want to show how to use the SSHHook with the PythonOperator. Therefore, I have to put the whole code in a function:

def create_the_file():
    ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')
    finally:
        if ssh_client:
            ssh_client.close()

Then, I define a PythonOperator that calls the function:

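# Airflow 1.10 import path; in Airflow 2 use airflow.operators.python
from airflow.operators.python_operator import PythonOperator

call_ssh_task = PythonOperator(
    task_id='call_ssh_task',
    python_callable=create_the_file,
    dag=dag  # the DAG object this task belongs to
)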
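In this example, the command is a fixed string. If the file name came from a parameter instead (for example, through the op_kwargs argument of PythonOperator), it would be safer to quote it, because the SSH server runs the command string through the remote user's shell. A sketch using shlex.quote from the standard library; build_touch_command is a hypothetical helper, and the quoting assumes a POSIX-compatible shell on the server.

```python
import shlex


def build_touch_command(path):
    # shlex.quote wraps unsafe values in single quotes, so spaces or characters
    # like ';' reach touch literally instead of being interpreted by the shell
    return 'touch ' + shlex.quote(path)


print(build_touch_command('file_name'))    # touch file_name
print(build_touch_command('my file.txt'))  # touch 'my file.txt'
print(build_touch_command('x; rm -rf ~'))  # touch 'x; rm -rf ~'

# Hypothetical wiring (requires Airflow), shown as comments only:
# def create_the_file(path):
#     ...
#     ssh_client.exec_command(build_touch_command(path))
#
# call_ssh_task = PythonOperator(
#     task_id='call_ssh_task',
#     python_callable=create_the_file,
#     op_kwargs={'path': 'file_name'},
#     dag=dag
# )
```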

Bartosz Mikulski * data/machine learning engineer * conference speaker * co-founder of Software Craft Poznan & Poznan Scala User Group
