Run a command on a remote server using SSH in Airflow

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (54/100)

In this article, I show how to use the SSHHook in a PythonOperator to connect to a remote server from Airflow using SSH and execute a command.

First, I have to define the SSH connection in Airflow because I will pass the connection parameters using the Airflow connection id instead of defining the host, port, username, and password in the Python code.

When that part is done, I can define the function that connects to SSH:

1
2
3
from airflow.contrib.hooks.ssh_hook import SSHHook

ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)

In the next step, I open a new connection and execute the command (in this example, I will use touch to create a new file). Creating a new connection, however, is not enough. We have to do it in the try...finally block because the SSHHook does not close it automatically when we no longer need it:

1
2
3
4
5
6
7
8
ssh_client = None
try:
    ssh_client = ssh.get_conn()
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
finally:
    if ssh_client:
        ssh_client.close()

In the try block, I called the load_sytem_host_keys function to import the OpenSSH known_hosts file, which contains the remote hosts’ identifiers and is used as a rudiment method of man-in-the-middle attack prevention.

After that, I executed the exec_command function to create a new file. In this example, I don’t care about the server response, so I do not assign the values returned by exec_command to any variable.

If I wanted to get a response, I would have to assign the returned values to three variables:

1
stdin, stdout, stderr = ssh_client.exec_command('touch file_name')


SSHHook in PythonOperator

In this article, I want to show how to use the SSHHook with the PythonOperator. Therefore, I have to put the whole code in a function:

1
2
3
4
5
6
7
8
9
10
def create_the_file():
    ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')
    finally:
        if ssh_client:
            ssh_client.close()

and define a PythonOperator that calls my function:

1
2
3
4
5
call_ssh_task = PythonOperator(
    task_id='call_ssh_task',
    python_callable=create_the_file,
    dag=dag
)

Remember to share on social media!
If you like this text, please share it on Facebook/Twitter/LinkedIn/Reddit or other social media.

If you want to contact me, send me a message on LinkedIn or Twitter.

Would you like to have a call and talk? Please schedule a meeting using this link.


Bartosz Mikulski
Bartosz Mikulski * data/machine learning engineer * conference speaker * co-founder of Software Craft Poznan & Poznan Scala User Group


This website DOES NOT use cookies
but you may still see the cookies set earlier if you have already visited it.