Source code for astronomer.providers.sftp.sensors.sftp

from datetime import timedelta
from typing import Any, Dict, Optional

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.sftp.sensors.sftp import SFTPSensor

from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync
from astronomer.providers.sftp.triggers.sftp import SFTPTrigger
from astronomer.providers.utils.typing_compat import Context


[docs]class SFTPSensorAsync(SFTPSensor): """ Polls an SFTP server continuously until a file_pattern is matched at a defined path :param path: The path on the SFTP server to search for a file matching the file pattern. Authentication method used in the SFTP connection must have access to this path :param file_pattern: Pattern to be used for matching against the list of files at the path above. Uses the fnmatch module from std library to perform the matching. :param timeout: How long, in seconds, the sensor waits for successful before timing out :param newer_than: DateTime for which the file or file path should be newer than, comparison is inclusive """ def __init__( self, *, path: str, file_pattern: str = "", timeout: Optional[float] = None, **kwargs: Any, ) -> None: self.path = path self.file_pattern = file_pattern if timeout is None: timeout = conf.getfloat("sensors", "default_timeout") super().__init__(path=path, file_pattern=file_pattern, timeout=timeout, **kwargs) self.hook = SFTPHookAsync(sftp_conn_id=self.sftp_conn_id) # type: ignore[assignment]
[docs] def execute(self, context: Context) -> None: """ Logic that the sensor uses to correctly identify which trigger to execute, and defer execution as expected. """ self.defer( timeout=timedelta(seconds=self.timeout), trigger=SFTPTrigger( path=self.path, file_pattern=self.file_pattern, sftp_conn_id=self.sftp_conn_id, poke_interval=self.poke_interval, newer_than=self.newer_than, ), method_name="execute_complete", )
[docs] def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None: """ Callback for when the trigger fires - returns immediately. Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ if event is not None: if "status" in event and event["status"] == "error": raise AirflowException(event["message"]) if "status" in event and event["status"] == "success": self.log.info("%s completed successfully.", self.task_id) self.log.info(event["message"]) return None raise AirflowException("No event received in trigger callback")