S3 Key Sensor Async
Use S3KeySensorAsync to wait for one or multiple keys to be present in an S3 bucket.
For each key, it uses aiobotocore to call the head_object API (or the list_objects_v2 API if wildcard_match is True) to check whether the key is present.
Please keep in mind, especially when used to check a large volume of keys, that it makes one API call per key.
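To illustrate why a wildcard key can match several objects from a single listing, here is a minimal sketch of Unix-style pattern matching over listed keys. The helper name match_wildcard_key and the sample listing are hypothetical, no AWS call is made, and the sensor's actual matching logic may differ.

```python
import fnmatch
import re
from typing import Any, Dict, List


def match_wildcard_key(wildcard_key: str, listed_objects: List[Dict[str, Any]]) -> List[Dict[str, int]]:
    """Return ``[{"Size": int}]`` entries for listed objects that match the wildcard key."""
    # Everything before the first wildcard character is a plain prefix that
    # a listing call could use to narrow the results.
    prefix = re.split(r"[\[\*\?]", wildcard_key, maxsplit=1)[0]
    return [
        {"Size": obj["Size"]}
        for obj in listed_objects
        # fnmatchcase keeps the comparison case-sensitive on every platform.
        if obj["Key"].startswith(prefix) and fnmatch.fnmatchcase(obj["Key"], wildcard_key)
    ]


# Hypothetical listing, shaped like the "Contents" entries of a list_objects_v2 response.
listing = [
    {"Key": "data/2023/a.csv", "Size": 2048},
    {"Key": "data/2023/b.csv", "Size": 512},
    {"Key": "data/2023/readme.txt", "Size": 100},
]
matched = match_wildcard_key("data/2023/*.csv", listing)
```

In this sketch, one pattern yields a list of size entries, one per matching object.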
To check one file:
# Check if a file exists
sensor_one_key = S3KeySensorAsync(
    task_id="s3_sensor_one_key",
    bucket_name=S3_BUCKET_NAME,
    bucket_key=S3_BUCKET_KEY,
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py
To check multiple files:
# Check if both files exist
sensor_two_keys = S3KeySensorAsync(
    task_id="s3_sensor_two_keys",
    bucket_name=S3_BUCKET_NAME,
    bucket_key=[S3_BUCKET_KEY, S3_BUCKET_KEY_LIST],
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py
To add a custom check, define a function that receives a list of matched S3 object attributes and returns a boolean:
True: the criterion is met
False: the criterion isn't met
This function is called for each key passed in bucket_key.
Its parameter is a list because, when wildcard_match is True, multiple files can match one key. The list of matched S3 object attributes contains only the size, in this format:
[{"Size": int}]
from typing import Any, List


def check_fn(files: List[Any]) -> bool:
    """
    Example of custom check: check if all files are bigger than ``1kB``

    :param files: List of S3 object attributes.
    :return: true if the criteria is met
    :rtype: bool
    """
    return all(f.get("Size", 0) > 1024 for f in files)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py
# Check if a file exists and satisfies the criterion defined in check_fn
sensor_key_with_function = S3KeySensorAsync(
    task_id="s3_sensor_key_function",
    bucket_name=S3_BUCKET_NAME,
    bucket_key=S3_BUCKET_KEY,
    check_fn=check_fn,
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py
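Because a custom check is a plain function over a list of {"Size": int} entries, it can be tried locally on hand-built lists before being passed to the sensor. The sketch below is one such check. The function name check_total_size, the 1 MiB threshold and the sample lists are hypothetical and not part of the provider.

```python
from typing import Any, Dict, List


def check_total_size(files: List[Dict[str, Any]]) -> bool:
    """Hypothetical check: at least one match and a combined size above 1 MiB."""
    total = sum(f.get("Size", 0) for f in files)
    # An empty list means nothing matched, so the criterion is not met.
    return bool(files) and total > 1024 * 1024


# Hand-built inputs in the documented [{"Size": int}] format.
small = [{"Size": 10}, {"Size": 20}]
large = [{"Size": 700_000}, {"Size": 700_000}]

result_small = check_total_size(small)
result_large = check_total_size(large)
result_empty = check_total_size([])
```

Guarding against the empty list avoids treating "no matches" as success, which a bare all(...) over an empty list would do.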
Use S3KeysUnchangedSensorAsync to check for changes in the number of objects at a prefix in an AWS S3 bucket.
check_s3_key_unchanged_sensor = S3KeysUnchangedSensorAsync(
    task_id="check_s3_key_unchanged_sensor",
    bucket_name=S3_BUCKET_NAME,
    prefix=PREFIX,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    inactivity_period=INACTIVITY_PERIOD,
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py
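The parameters above suggest an inactivity timer: the sensor succeeds once the set of objects has stayed the same for inactivity_period and at least min_objects are present. The following is a simplified, self-contained model of that idea, not the provider's implementation. The class UnchangedTracker, its poke method and the explicit now argument are hypothetical, and the real sensor may handle edge cases differently.

```python
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass
class UnchangedTracker:
    """Hypothetical model of an 'objects unchanged for N seconds' check."""

    inactivity_period: float
    min_objects: int = 1
    allow_delete: bool = False
    previous_objects: Set[str] = field(default_factory=set)
    last_activity_time: Optional[float] = None

    def poke(self, current_objects: Set[str], now: float) -> bool:
        if current_objects > self.previous_objects:
            # New objects arrived: record them and restart the timer.
            self.previous_objects = set(current_objects)
            self.last_activity_time = now
            return False
        if self.previous_objects - current_objects:
            # Objects disappeared since the last poke.
            if not self.allow_delete:
                raise RuntimeError("objects were deleted and allow_delete is False")
            self.previous_objects = set(current_objects)
            self.last_activity_time = now
            return False
        if self.last_activity_time is None:
            self.last_activity_time = now
        inactive_for = now - self.last_activity_time
        # Succeed only after a full quiet period with enough objects present.
        return inactive_for >= self.inactivity_period and len(current_objects) >= self.min_objects


tracker = UnchangedTracker(inactivity_period=60, allow_delete=True)
results = [
    tracker.poke({"a"}, now=0),        # first object appears
    tracker.poke({"a", "b"}, now=30),  # another object appears, timer restarts
    tracker.poke({"a", "b"}, now=60),  # unchanged for 30 seconds
    tracker.poke({"a", "b"}, now=90),  # unchanged for 60 seconds
]
```

Passing now explicitly keeps the model deterministic, so the timing behavior can be checked without waiting in real time.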