Source code for astronomer.providers.amazon.aws.hooks.redshift_sql

import asyncio
from typing import Dict, List, Union

import botocore.exceptions
from asgiref.sync import sync_to_async

from astronomer.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook


[docs]class RedshiftSQLHookAsync(RedshiftDataHook): """RedshiftSQL async hook inherits from RedshiftDataHook to interact with AWS redshift cluster database"""
[docs] async def get_query_status(self, query_ids: List[str]) -> Dict[str, Union[str, List[str]]]: """ Async function to get the Query status by query Ids, this function takes list of query_ids make async connection to redshift data to get the query status by query id returns the query status. :param query_ids: list of query ids """ try: try: # for apache-airflow-providers-amazon>=3.0.0 client = await sync_to_async(self.get_conn)() except ValueError: # for apache-airflow-providers-amazon>=4.1.0 self.resource_type = None client = await sync_to_async(self.get_conn)() completed_ids: List[str] = [] for qid in query_ids: while await self.is_still_running(qid): await asyncio.sleep(1) res = client.describe_statement(Id=qid) if res["Status"] == "FINISHED": completed_ids.append(qid) elif res["Status"] == "FAILED": msg = "Error: " + res["QueryString"] + " query Failed due to, " + res["Error"] return {"status": "error", "message": msg, "query_id": qid, "type": res["Status"]} elif res["Status"] == "ABORTED": return { "status": "error", "message": "The query run was stopped by the user.", "query_id": qid, "type": res["Status"], } return {"status": "success", "completed_ids": completed_ids} except botocore.exceptions.ClientError as error: return {"status": "error", "message": str(error), "type": "ERROR"}
[docs] async def is_still_running(self, qid: str) -> Union[bool, Dict[str, str]]: """ Async function to whether the query is still running or in "PICKED", "STARTED", "SUBMITTED" state and returns True else return False """ try: try: # for apache-airflow-providers-amazon>=3.0.0 client = await sync_to_async(self.get_conn)() except ValueError: # for apache-airflow-providers-amazon>=4.1.0 self.resource_type = None client = await sync_to_async(self.get_conn)() desc = client.describe_statement(Id=qid) if desc["Status"] in ["PICKED", "STARTED", "SUBMITTED"]: return True return False except botocore.exceptions.ClientError as error: return {"status": "error", "message": str(error), "type": "ERROR"}