Source code for astronomer.providers.amazon.aws.sensors.s3
from __future__ import annotations
import warnings
from typing import Any, Callable
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor
from airflow.sensors.base import BaseSensorOperator
[docs]
class S3KeySensorAsync(S3KeySensor):
"""
This class is deprecated.
Please use :class: `~airflow.providers.amazon.aws.sensors.s3.S3KeySensor`
and set `deferrable` param to `True` instead.
"""
is_deprecated = True
post_deprecation_replacement = "from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor"
def __init__(self, **kwargs: Any) -> None:
warnings.warn(
(
"This module is deprecated."
"Please use `airflow.providers.amazon.aws.sensors.s3.S3KeySensor`"
"and set `deferrable` param to `True` instead."
),
DeprecationWarning,
stacklevel=2,
)
super().__init__(deferrable=True, **kwargs)
[docs]
class S3KeySizeSensorAsync(S3KeySensorAsync):
"""
This class is deprecated.
Please use :class: `~airflow.providers.amazon.aws.sensors.s3.S3KeySensor`.
"""
is_deprecated = True
post_deprecation_replacement = "from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor"
def __init__(
self,
*,
check_fn: Callable[..., bool] | None = None,
**kwargs: Any,
):
warnings.warn(
"""
S3KeySizeSensorAsync is deprecated.
Please use `airflow.providers.amazon.aws.sensors.s3.S3KeySensor`.
""",
DeprecationWarning,
stacklevel=2,
)
super().__init__(**kwargs)
self.check_fn_user = check_fn
[docs]
class S3KeysUnchangedSensorAsync(S3KeysUnchangedSensor):
"""
This class is deprecated.
Please use :class: `~airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor`.
"""
is_deprecated = True
post_deprecation_replacement = "from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor"
def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
warnings.warn(
(
"This module is deprecated. "
"Please use `airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor` "
"and set deferrable to True instead."
),
DeprecationWarning,
stacklevel=2,
)
return super().__init__(*args, deferrable=True, **kwargs)
[docs]
class S3PrefixSensorAsync(BaseSensorOperator):
"""
This class is deprecated.
Please use :class: `~airflow.providers.amazon.aws.sensors.s3.S3KeySensor`.
"""
is_deprecated = True
post_deprecation_replacement = "from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor"
def __init__(
self,
*,
bucket_name: str,
prefix: str | list[str],
delimiter: str = "/",
aws_conn_id: str = "aws_default",
verify: str | bool | None = None,
**kwargs: Any,
):
warnings.warn(
"""
S3PrefixSensor is deprecated.
Please use `airflow.providers.amazon.aws.sensors.s3.S3KeySensor`.
""",
DeprecationWarning,
stacklevel=2,
)
super().__init__(**kwargs)
self.bucket_name = bucket_name
self.prefix = [prefix] if isinstance(prefix, str) else prefix
self.delimiter = delimiter
self.aws_conn_id = aws_conn_id
self.verify = verify