S3 Key Sensor Async

Use S3KeySensorAsync to wait for one or more keys to be present in an S3 bucket. For each key, it uses aiobotocore to call the head_object API (or the list_objects_v2 API if wildcard_match is True) to check whether the key is present. Keep in mind that it makes one API call per key, which matters when checking a large number of keys.
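
To illustrate why a single wildcard key can match several objects, here is a minimal sketch that uses Python's standard fnmatch module as a stand-in for Unix shell-style matching. The object keys and the match_keys helper are hypothetical and exist only for this example; the sensor's actual matching logic may differ in detail.

```python
from fnmatch import fnmatchcase
from typing import List

# Hypothetical object keys in a bucket (illustrative only).
object_keys = [
    "data/2024-01-01/part-0.csv",
    "data/2024-01-01/part-1.csv",
    "data/2024-01-02/part-0.csv",
    "logs/app.log",
]


def match_keys(pattern: str, keys: List[str]) -> List[str]:
    """Return the keys that match a shell-style wildcard pattern (case-sensitive)."""
    return [key for key in keys if fnmatchcase(key, pattern)]


# One wildcard key, two matching objects.
matched = match_keys("data/2024-01-01/*.csv", object_keys)
print(matched)
```

With these sample keys, the single pattern matches both objects under data/2024-01-01/.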

To check one file:

# Check if a file exists
sensor_one_key = S3KeySensorAsync(
    task_id="s3_sensor_one_key",
    bucket_name=S3_BUCKET_NAME,
    bucket_key=S3_BUCKET_KEY,
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py

To check multiple files:

# Check if both files exist
sensor_two_keys = S3KeySensorAsync(
    task_id="s3_sensor_two_keys",
    bucket_name=S3_BUCKET_NAME,
    bucket_key=[S3_BUCKET_KEY, S3_BUCKET_KEY_LIST],
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py

To add a custom check, define a function that receives a list of matched S3 object attributes and returns a boolean:

  • True: the criterion is met

  • False: the criterion isn’t met

This function is called once for each key passed in bucket_key. Its parameter is a list because, when wildcard_match is True, multiple files can match one key. The list of matched S3 object attributes contains only the size, in this format:

[{"Size": int}]

from typing import Any, List


def check_fn(files: List[Any]) -> bool:
    """
    Example of custom check: check if all files are bigger than ``1kB``

    :param files: List of S3 object attributes.
    :return: True if the criterion is met
    :rtype: bool
    """
    return all(f.get("Size", 0) > 1024 for f in files)

# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py
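
As a quick illustration of the list-shaped argument, the sketch below calls the same check on a few hand-written inputs. check_fn is repeated here only so the snippet runs on its own, and the sample sizes are invented for the example.

```python
from typing import Any, Dict, List


def check_fn(files: List[Dict[str, Any]]) -> bool:
    """Return True if every matched object is larger than 1024 bytes."""
    return all(f.get("Size", 0) > 1024 for f in files)


# A plain key yields a single-element list.
single_match = [{"Size": 2048}]

# A wildcard key can yield several elements; one small file fails the check.
wildcard_matches = [{"Size": 4096}, {"Size": 512}]

print(check_fn(single_match))      # True
print(check_fn(wildcard_matches))  # False
print(check_fn([]))                # True: all() of an empty sequence is True
```

Note the last case: a check built on all() passes for an empty list, so a custom check may want to require at least one element explicitly.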
# Check if a file exists and meets the criterion defined in check_fn
sensor_key_with_function = S3KeySensorAsync(
    task_id="s3_sensor_key_function",
    bucket_name=S3_BUCKET_NAME,
    bucket_key=S3_BUCKET_KEY,
    check_fn=check_fn,
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py

Use S3KeysUnchangedSensorAsync to check for changes in the number of objects at a prefix in an AWS S3 bucket:

check_s3_key_unchanged_sensor = S3KeysUnchangedSensorAsync(
    task_id="check_s3_key_unchanged_sensor",
    bucket_name=S3_BUCKET_NAME,
    prefix=PREFIX,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    inactivity_period=INACTIVITY_PERIOD,
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_s3.py
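
To make the roles of min_objects, allow_delete, previous_objects, and inactivity_period more concrete, here is a simplified, self-contained model of an "unchanged" check. It is a sketch under assumptions, not the provider's implementation: the UnchangedTracker class, its poke method, and the explicit now argument are hypothetical names introduced for this example, and the real sensor's behavior may differ.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class UnchangedTracker:
    """Hypothetical model: succeed once a listing has been stable long enough."""

    inactivity_period: float
    min_objects: int = 1
    allow_delete: bool = False
    previous_objects: Set[str] = field(default_factory=set)
    last_activity: float = 0.0

    def poke(self, current_objects: Set[str], now: float) -> bool:
        if current_objects - self.previous_objects:
            # New objects appeared: remember them and restart the timer.
            self.previous_objects = set(current_objects)
            self.last_activity = now
            return False
        if self.previous_objects - current_objects:
            # Objects disappeared: only tolerated when allow_delete is set.
            if not self.allow_delete:
                raise RuntimeError("objects were deleted while waiting")
            self.previous_objects = set(current_objects)
            self.last_activity = now
            return False
        # No change: succeed if stable long enough and enough objects exist.
        stable_for = now - self.last_activity
        return (
            stable_for >= self.inactivity_period
            and len(current_objects) >= self.min_objects
        )


tracker = UnchangedTracker(inactivity_period=60, allow_delete=True)
results = [
    tracker.poke({"a"}, now=0),        # first object seen, timer starts
    tracker.poke({"a", "b"}, now=30),  # new object, timer restarts
    tracker.poke({"a", "b"}, now=60),  # stable for only 30 seconds
    tracker.poke({"a", "b"}, now=90),  # stable for 60 seconds
]
print(results)
```

In this model, only the final call reports success, because the listing has then been unchanged for the full inactivity period.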