Source code for astronomer.providers.amazon.aws.operators.redshift_data
from typing import Any
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.utils.context import Context
from astronomer.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
from astronomer.providers.amazon.aws.triggers.redshift_data import RedshiftDataTrigger
class RedshiftDataOperatorAsync(RedshiftDataOperator):
    """
    Executes SQL Statements against an Amazon Redshift cluster.

    The statements are submitted synchronously through ``RedshiftDataHook``; the
    task then defers to ``RedshiftDataTrigger``, which polls the submitted
    queries. If there are multiple queries as part of the SQL, and one of them
    fails to reach a successful completion state, the operator returns the
    relevant error for the failed query.

    :param sql: the SQL code to be executed as a single string, or
        a list of str (sql statements), or a reference to a template file.
        Template references are recognized by str ending in '.sql'
    :param aws_conn_id: AWS connection ID
    :param poll_interval: polling period handed to the trigger
        (default value: 5; presumably seconds -- confirm against ``RedshiftDataTrigger``)
    :param kwargs: every other keyword argument is forwarded unchanged to
        ``RedshiftDataOperator``
    """

    def __init__(
        self,
        *,
        poll_interval: int = 5,
        **kwargs: Any,
    ) -> None:
        self.poll_interval = poll_interval
        super().__init__(**kwargs)

    def execute(self, context: "Context") -> None:
        """
        Makes a sync call to RedshiftDataHook, executes the query and gets back the list of query_ids and
        defers trigger to poll for the status for the queries executed.

        :param context: the Airflow task context
        :raises AirflowException: (via ``execute_complete``) when the hook reports
            that submitting the queries failed
        """
        redshift_data_hook = RedshiftDataHook(aws_conn_id=self.aws_conn_id)
        # NOTE(review): this forwards ``self.params``; confirm that this, and not the
        # operator's SQL ``parameters`` argument, is what the hook expects.
        query_ids, response = redshift_data_hook.execute_query(sql=self.sql, params=self.params)
        self.log.info("Query IDs %s", query_ids)

        if response.get("status") == "error":
            # Submission already failed: surface the error through the common
            # callback and never defer on queries that were not accepted.
            self.execute_complete(context, event=response)
            return

        self.defer(
            timeout=self.execution_timeout,
            trigger=RedshiftDataTrigger(
                task_id=self.task_id,
                poll_interval=self.poll_interval,
                aws_conn_id=self.aws_conn_id,
                query_ids=query_ids,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context: "Context", event: Any = None) -> None:
        """
        Callback for when the trigger fires - returns immediately.

        An event whose ``status`` is ``"error"`` fails the task, ``"success"`` is
        logged, and any other non-empty event is accepted silently.

        :param context: the Airflow task context
        :param event: payload produced by the trigger (or by ``execute`` when the
            submission itself failed)
        :raises AirflowException: when the event is missing/empty or reports an error
        """
        if not event:
            raise AirflowException("Did not receive valid event from the trigerrer")

        status = event["status"] if "status" in event else None
        if status == "error":
            # A malformed error event must still fail the task with an
            # AirflowException rather than a KeyError on the missing key.
            message = event["message"] if "message" in event else "no error message provided"
            msg = "context: {0}, error message: {1}".format(context, message)
            raise AirflowException(msg)
        if status == "success":
            self.log.info("%s completed successfully.", self.task_id)