# Source code for astronomer.providers.apache.hive.triggers.hive_partition
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple
from airflow.triggers.base import BaseTrigger, TriggerEvent
from astronomer.providers.apache.hive.hooks.hive import HiveCliHookAsync
# [docs]
class HivePartitionTrigger(BaseTrigger):
    """
    A trigger that polls Hive until a partition is present in the given table.

    The trigger fires exactly once: either with a ``success`` event as soon as
    the hook reports the partition, or with an ``error`` event if polling raises.

    :param table: the table where the partition is present.
    :param partition: The partition clause to wait for.
    :param polling_interval: polling period in seconds to check for the partition.
    :param metastore_conn_id: connection string to connect to hive.
    :param schema: database which needs to be connected in hive.
    """

    def __init__(
        self,
        table: str,
        partition: str,
        polling_interval: float,
        metastore_conn_id: str,
        schema: str,
    ) -> None:
        super().__init__()
        self.table = table
        self.partition = partition
        self.polling_interval = polling_interval
        self.metastore_conn_id: str = metastore_conn_id
        self.schema = schema

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serializes HivePartitionTrigger arguments and classpath."""
        # The keys must match the ``__init__`` keyword names so the trigger
        # can be rebuilt from this payload.
        return (
            "astronomer.providers.apache.hive.triggers.hive_partition.HivePartitionTrigger",
            {
                "table": self.table,
                "partition": self.partition,
                "polling_interval": self.polling_interval,
                "metastore_conn_id": self.metastore_conn_id,
                "schema": self.schema,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:
        """
        Poll for the partition and yield a single ``TriggerEvent``.

        Yields a ``{"status": "success", ...}`` event once the hook returns
        ``"success"``, sleeping ``polling_interval`` seconds between checks.
        Any exception raised while polling is reported as a
        ``{"status": "error", ...}`` event carrying ``str(exception)``.
        """
        try:
            hook = self._get_async_hook()
            while True:
                res = await hook.partition_exists(
                    table=self.table,
                    schema=self.schema,
                    partition=self.partition,
                    polling_interval=self.polling_interval,
                )
                if res == "success":
                    yield TriggerEvent({"status": "success", "message": res})
                    # Stop after the first success; without this the loop keeps
                    # querying Hive and re-emitting the same event.
                    return
                await asyncio.sleep(self.polling_interval)
        except Exception as e:
            # Trigger boundary: surface the failure as an event instead of
            # letting the exception escape the generator.
            yield TriggerEvent({"status": "error", "message": str(e)})

    def _get_async_hook(self) -> HiveCliHookAsync:
        """Build the async Hive hook bound to this trigger's metastore connection."""
        return HiveCliHookAsync(metastore_conn_id=self.metastore_conn_id)