Source code for

from typing import Any, Dict

from airflow.exceptions import AirflowException
from import EmrContainerHook
from import EmrContainerOperator
from airflow.utils.context import Context

from import EmrContainerOperatorTrigger

class EmrContainerOperatorAsync(EmrContainerOperator):
    """
    An async operator that submits jobs to EMR on EKS virtual clusters.

    The job is submitted from ``execute`` and the task is then deferred to an
    ``EmrContainerOperatorTrigger``; ``execute_complete`` runs once that
    trigger fires.

    :param name: The name of the job run.
    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
    :param execution_role_arn: The IAM role ARN associated with the job run.
    :param release_label: The Amazon EMR release version to use for the job run.
    :param job_driver: Job configuration details, e.g. the Spark job parameters.
    :param configuration_overrides: The configuration overrides for the job run,
        specifically either application configuration or monitoring configuration.
    :param client_request_token: The client idempotency token of the job run request.
        Use this if you want to specify a unique ID to prevent two jobs from getting started.
        If no token is provided, a UUIDv4 token will be generated for you.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
    :param poll_interval: Time (in seconds) to wait between two consecutive calls
        to check query status on EMR
    :param max_tries: Maximum number of times to wait for the job run to finish.
        Defaults to None, which will poll until the job is *not* in a pending,
        submitted, or running state.
    """

    def execute(self, context: "Context") -> None:
        """
        Submit the job run, then defer and give control to the trigger.

        :param context: The Airflow task context (not read by this method).
        """
        hook = EmrContainerHook(
            aws_conn_id=self.aws_conn_id,
            virtual_cluster_id=self.virtual_cluster_id,
        )
        job_id = hook.submit_job(
            name=self.name,
            execution_role_arn=self.execution_role_arn,
            release_label=self.release_label,
            job_driver=self.job_driver,
            configuration_overrides=self.configuration_overrides,
            client_request_token=self.client_request_token,
        )

        # Hand the submitted job over to the trigger; the task resumes in
        # ``execute_complete`` when the trigger emits its event.
        self.defer(
            timeout=self.execution_timeout,
            trigger=EmrContainerOperatorTrigger(
                virtual_cluster_id=self.virtual_cluster_id,
                # NOTE(review): `name` was restored from a garbled argument
                # list -- confirm the trigger's constructor accepts it.
                name=self.name,
                job_id=job_id,
                aws_conn_id=self.aws_conn_id,
                poll_interval=self.poll_interval,
                max_tries=self.max_tries,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context: Dict[str, Any], event: Dict[str, Any]) -> str:
        """
        Callback for when the trigger fires - returns immediately.

        Relies on trigger to throw an exception, otherwise it assumes
        execution was successful.

        :param context: The Airflow task context (not read by this method).
        :param event: The payload emitted by the trigger. ``status`` and
            ``message`` are consulted for errors; ``job_id`` is returned.
        :return: The ``job_id`` carried in ``event``.
        :raises AirflowException: If ``event["status"]`` is ``"error"``.
        """
        if "status" in event and event["status"] == "error":
            raise AirflowException(event["message"])

        # NOTE(review): this log call was reconstructed from truncated text
        # following the raise -- confirm against the upstream source.
        self.log.info(event["message"])
        job_id: str = event["job_id"]
        return job_id