SFTP Sensor

Looks for either a specific file or files matching a specific pattern on a server, using the SFTP protocol. For more information about this sensor, see SFTPSensor.

tests/system/sftp/example_sftp_sensor.py

sftp_with_operator = SFTPSensor(task_id="sftp_operator", path=FULL_FILE_PATH, poke_interval=10)

You can also use the TaskFlow API. The decorator takes the same arguments as SFTPSensor, along with the following:

python_callable (optional)

A callable that is executed after files matching the sensor criteria are found. This allows you to process the found files with custom logic. The callable receives:

  • Positional arguments from op_args

  • Keyword arguments from op_kwargs. If op_kwargs is provided and not empty, files_found is added to it automatically and contains the list of files that matched the sensor criteria.

The return value of the callable is stored in XCom together with the files_found list, as a dictionary of the form {"files_found": [...], "decorator_return_value": <callable_return_value>}.

op_args (optional)

A list of positional arguments that are unpacked when your callable is invoked (templated). Only used when python_callable is provided.

op_kwargs (optional)

A dictionary of keyword arguments that are unpacked when your callable is invoked (templated). If provided and not empty, the files_found list is automatically added to this dictionary when the callable is invoked. Only used when python_callable is provided.
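The calling convention described above can be illustrated with a small plain-Python sketch. It does not import Airflow and is not the provider's implementation: the helper run_callable_after_match and the callable summarize are hypothetical names that only mimic the documented behaviour, namely how op_args and op_kwargs are unpacked, when files_found is injected, and the shape of the value stored in XCom.

```python
from typing import Any, Callable, Optional, Sequence


def run_callable_after_match(
    files_found: list,
    python_callable: Callable[..., Any],
    op_args: Optional[Sequence[Any]] = None,
    op_kwargs: Optional[dict] = None,
) -> dict:
    """Hypothetical helper that mimics the documented behaviour (not Airflow code)."""
    args = list(op_args or [])
    kwargs = dict(op_kwargs or {})
    # files_found is injected only when op_kwargs was provided and is not empty.
    if kwargs:
        kwargs["files_found"] = files_found
    result = python_callable(*args, **kwargs)
    # Shape of the value that the documentation says is stored in XCom.
    return {"files_found": files_found, "decorator_return_value": result}


def summarize(prefix, files_found=None, **extra):
    """Example callable: tolerates a missing files_found keyword."""
    count = len(files_found) if files_found is not None else 0
    return f"{prefix}: {count} file(s)"


# Non-empty op_kwargs: files_found is passed to the callable.
xcom_value = run_callable_after_match(
    ["/tmp/a.csv", "/tmp/b.csv"],
    summarize,
    op_args=["batch"],
    op_kwargs={"source": "sftp"},
)
print(xcom_value)

# No op_kwargs: the callable is invoked without files_found.
no_kwargs_value = run_callable_after_match(["/tmp/a.csv"], summarize, op_args=["batch"])
print(no_kwargs_value)
```

Because files_found is only injected when op_kwargs is not empty, a callable that needs the file list should either be given at least one keyword argument or declare files_found with a default, as summarize does here.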

tests/system/sftp/example_sftp_sensor.py

@task.sftp_sensor(
    task_id="sftp_sensor",
    path=FULL_FILE_PATH,
    poke_interval=10,
)
def sftp_sensor_decorator():
    print("Files were successfully found!")
    # add your logic
    return "done"

To check for the existence of a file on an SFTP server in deferrable mode, set deferrable=True:

tests/system/sftp/example_sftp_sensor.py

sftp_sensor_with_async = SFTPSensor(
    task_id="sftp_operator_async", path=FULL_FILE_PATH, poke_interval=10, deferrable=True
)
