Run a command on a remote server using SSH in Airflow
In this article, I show how to use the SSHHook in a PythonOperator to connect to a remote server from Airflow using SSH and execute a command.
First, I have to define the SSH connection in Airflow because I will pass the connection parameters using the Airflow connection id instead of defining the host, port, username, and password in the Python code.
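Connections are usually defined in the Airflow web UI (Admin -> Connections), but they can also be created programmatically. Below is a minimal sketch that stores a connection in the Airflow metadata database; the connection id my_ssh_server and all of the server parameters are hypothetical placeholders:

```python
from airflow import settings
from airflow.models import Connection

# Hypothetical connection; replace the id, host, login, and port
# with the parameters of your remote server.
conn = Connection(
    conn_id='my_ssh_server',
    conn_type='ssh',
    host='example.com',
    login='deploy_user',
    port=22,
)

# Persist the connection in the Airflow metadata database.
session = settings.Session()
session.add(conn)
session.commit()
```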
When that part is done, I can instantiate the SSHHook that will connect to the server:
```python
from airflow.contrib.hooks.ssh_hook import SSHHook

ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
```
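Here, AIRFLOW_CONNECTION_ID is just a string constant holding the connection id defined in the previous step, for example 'my_ssh_server' (a hypothetical name).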
In the next step, I open a new connection and execute the command (in this example, I will use touch to create a new file). Opening the connection, however, is not enough. We have to wrap the code in a try...finally block, because the SSHHook does not close the connection automatically when we no longer need it:
```python
ssh_client = None
try:
    ssh_client = ssh.get_conn()
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
finally:
    if ssh_client:
        ssh_client.close()
```
In the try block, I called the load_system_host_keys function to import the OpenSSH known_hosts file, which contains the remote hosts' identifiers and serves as a rudimentary method of man-in-the-middle attack prevention.
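Note that load_system_host_keys only loads host keys that are already in the known_hosts file, so paramiko (which the SSHHook uses under the hood) will reject a server it has never seen before. For testing environments only, here is a sketch of a workaround that auto-accepts unknown host keys (and therefore gives up the man-in-the-middle protection described above):

```python
import paramiko

# WARNING: AutoAddPolicy accepts any unknown host key on first contact,
# which disables the protection that known_hosts provides.
# Use only against trusted servers or in testing environments.
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
```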
After that, I executed the exec_command function to create a new file. In this example, I don't care about the server response, so I do not assign the values returned by exec_command to any variable.
If I wanted to get a response, I would have to assign the returned values to three variables:
```python
stdin, stdout, stderr = ssh_client.exec_command('touch file_name')
```
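Note that exec_command returns as soon as the command starts, not when it finishes. If I wanted to wait for the result and fail on errors, one way to do it (a sketch using paramiko's channel API) looks like this:

```python
stdin, stdout, stderr = ssh_client.exec_command('touch file_name')

# recv_exit_status() blocks until the remote command finishes
# and returns its exit code (0 means success).
exit_code = stdout.channel.recv_exit_status()
error_output = stderr.read().decode('utf-8')

if exit_code != 0:
    raise RuntimeError('Remote command failed: ' + error_output)
```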
SSHHook in PythonOperator
Because I want to use the SSHHook with the PythonOperator, I have to put the whole code in a function:
```python
def create_the_file():
    ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')
    finally:
        if ssh_client:
            ssh_client.close()
```
and define a PythonOperator that calls my function:
```python
call_ssh_task = PythonOperator(
    task_id='call_ssh_task',
    python_callable=create_the_file,
    dag=dag
)
```
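The dag variable passed to the operator is assumed to be defined elsewhere in the file. A minimal sketch of such a definition (the DAG id, start date, and schedule are placeholders):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Placeholder DAG definition; adjust the id, start_date,
# and schedule_interval to match your environment.
dag = DAG(
    dag_id='ssh_example',
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
)
```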