EMR Sensor Async¶
To waits asynchronously for job running on EMR container to reach the terminal state
EmrContainerSensorAsync
.
emr_job_container_sensor = EmrContainerSensorAsync(
task_id="emr_job_container_sensor",
job_id=run_emr_container_job.output,
virtual_cluster_id=VIRTUAL_CLUSTER_ID,
poll_interval=5,
aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_emr_eks_containers_job.py
To waits asynchronously for job running on EMR container to reach the terminal state
EmrJobFlowSensorAsync
.
job_flow_sensor = EmrJobFlowSensorAsync(
task_id="job_flow_sensor", job_flow_id=cluster_creator.output, aws_conn_id=AWS_CONN_ID
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_emr_sensor.py
To waits asynchronously until state of the EMR cluster step reaches any of the target states
EmrStepSensorAsync
.
step_checker = EmrStepSensorAsync(
task_id="watch_step",
job_flow_id=cluster_creator.output,
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id=AWS_CONN_ID,
)
# https://github.com/astronomer/astronomer-providers/tree/main/astronomer/providers/amazon/aws/example_dags/example_emr_sensor.py