Source code for astronomer.providers.core.sensors.filesystem
import logging
import os
from typing import Any, Dict, Optional
from airflow.hooks.filesystem import FSHook
from airflow.sensors.filesystem import FileSensor
from airflow.utils.context import Context
from astronomer.providers.core.triggers.filesystem import FileTrigger
log = logging.getLogger(__name__)
[docs]class FileSensorAsync(FileSensor):
"""
Waits for a file or folder to land in a filesystem using async.
If the path given is a directory then this sensor will only return true if
any files exist inside it (either directly, or within a subdirectory)
:param fs_conn_id: reference to the File (path)
:param filepath: File or folder name (relative to the base path set within the connection), can
be a glob.
:param recursive: when set to ``True``, enables recursive directory matching behavior of
``**`` in glob filepath parameter. Defaults to ``False``.
"""
[docs] def execute(self, context: Context) -> None:
"""Airflow runs this method on the worker and defers using the trigger."""
if not self.poke(context=context):
hook = FSHook(self.fs_conn_id)
basepath = hook.get_path()
full_path = os.path.join(basepath, self.filepath)
self.log.info("Poking for file %s", full_path)
self.defer(
timeout=self.execution_timeout,
trigger=FileTrigger(
filepath=full_path,
recursive=self.recursive,
poll_interval=self.poke_interval,
),
method_name="execute_complete",
)
[docs] def execute_complete(self, context: Dict[str, Any], event: Optional[Dict[str, Any]]) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
self.log.info("%s completed successfully.", self.task_id)