EMR Sensor Async

To wait asynchronously for a job running on an EMR container to reach a terminal state, use EmrContainerSensorAsync.

emr_job_container_sensor = EmrContainerSensorAsync(
    task_id="emr_job_container_sensor",
    job_id=str(run_emr_container_job.output),
    virtual_cluster_id=str(VIRTUAL_CLUSTER_ID),
    poll_interval=5,
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_emr_eks_containers_job.py
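
The idea behind an asynchronous wait can be illustrated with a small, self-contained sketch. This is not the provider's implementation: `wait_for_terminal_state`, `make_fake_state_source`, and `run_demo` are hypothetical names, the state source is a fake stand-in for an AWS call, and the set of terminal states is an assumption based on the EMR on EKS job run states. Only `poll_interval` mirrors a parameter shown in the example above.

```python
import asyncio
from typing import Awaitable, Callable, Iterable

# Assumption: job run states treated as terminal for this illustration.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED"}


async def wait_for_terminal_state(
    get_state: Callable[[], Awaitable[str]],
    poll_interval: float = 5,
) -> str:
    """Poll get_state until it reports a terminal state.

    asyncio.sleep yields control to the event loop between polls,
    so other coroutines can run while this one waits.
    """
    while True:
        state = await get_state()
        if state in TERMINAL_STATES:
            return state
        await asyncio.sleep(poll_interval)


def make_fake_state_source(states: Iterable[str]) -> Callable[[], Awaitable[str]]:
    """Hypothetical stand-in for an AWS call: returns the given states in order."""
    iterator = iter(states)

    async def get_state() -> str:
        return next(iterator)

    return get_state


def run_demo() -> str:
    """Drive the polling loop against a fake sequence of job states."""
    fake = make_fake_state_source(["PENDING", "SUBMITTED", "RUNNING", "COMPLETED"])
    return asyncio.run(wait_for_terminal_state(fake, poll_interval=0))
```

In the sketch the waiting coroutine does nothing between polls except sleep, which is the property that lets an asynchronous sensor avoid occupying a worker slot while the job runs.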

To wait asynchronously for an EMR job flow (cluster) to reach a terminal state, use EmrJobFlowSensorAsync.

job_flow_sensor = EmrJobFlowSensorAsync(
    task_id="job_flow_sensor", job_flow_id=cluster_creator.output, aws_conn_id=AWS_CONN_ID
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_emr_sensor.py

To wait asynchronously until the state of an EMR cluster step reaches any of the target states, use EmrStepSensorAsync.

step_checker = EmrStepSensorAsync(
    task_id="watch_step",
    job_flow_id=str(cluster_creator.output),
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_emr_sensor.py
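
How a step sensor might decide between "done", "failed", and "keep waiting" can be sketched as follows. `classify_step_state` is a hypothetical helper, not part of the provider, and the default target and failed states are assumptions chosen from the documented EMR step states; check the sensor's own defaults before relying on them.

```python
from typing import Iterable


def classify_step_state(
    state: str,
    target_states: Iterable[str] = ("COMPLETED",),  # assumed default
    failed_states: Iterable[str] = ("CANCELLED", "FAILED", "INTERRUPTED"),  # assumed default
) -> str:
    """Map an EMR step state to the action a sensor would take."""
    if state in target_states:
        return "success"  # a target state was reached, stop waiting
    if state in failed_states:
        return "failure"  # the step cannot reach a target state any more
    return "waiting"  # e.g. PENDING or RUNNING, poll again later


print(classify_step_state("RUNNING"))
print(classify_step_state("COMPLETED"))
print(classify_step_state("FAILED"))
```

Keeping the target and failed states as parameters means the same check can be reused for a different set of states without changing the function.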