Source code for astronomer.providers.apache.hive.triggers.named_hive_partition
import asyncio
from typing import Any, AsyncIterator, Dict, List, Tuple
from airflow.triggers.base import BaseTrigger, TriggerEvent
from astronomer.providers.apache.hive.hooks.hive import HiveCliHookAsync
[docs]
class NamedHivePartitionTrigger(BaseTrigger):
    """
    Trigger that polls Hive until every named partition exists.

    On each poll the partitions that have not been seen yet are checked
    through ``HiveCliHookAsync``. Once all of them have been found a single
    success event is emitted and the trigger finishes. Any exception raised
    while polling is reported as a single error event.

    :param partition_names: List of fully qualified names of the
        partitions to wait for.
    :param metastore_conn_id: connection id handed to ``HiveCliHookAsync``
        to connect to hive.
    :param polling_interval: polling period in seconds to check for the partition.
    """

    def __init__(
        self,
        partition_names: List[str],
        metastore_conn_id: str,
        polling_interval: float,
    ):
        super().__init__()
        self.partition_names = partition_names
        self.polling_interval = polling_interval
        self.metastore_conn_id: str = metastore_conn_id

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """
        Serializes NamedHivePartitionTrigger arguments and classpath.

        :return: a ``(classpath, kwargs)`` tuple whose kwargs mirror the
            parameters of ``__init__``.
        """
        return (
            "astronomer.providers.apache.hive.triggers.named_hive_partition.NamedHivePartitionTrigger",
            {
                "partition_names": self.partition_names,
                "polling_interval": self.polling_interval,
                "metastore_conn_id": self.metastore_conn_id,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:
        """
        Run until found all given partition in Hive.

        Yields exactly one ``TriggerEvent``: ``{"status": "success", ...}``
        once every partition has been found, or ``{"status": "error", ...}``
        carrying ``str(exception)`` if anything raises while polling.
        """
        try:
            hook = HiveCliHookAsync(metastore_conn_id=self.metastore_conn_id)
            # Only partitions that have not been found yet are re-checked on
            # the next poll; a partition found once is never queried again.
            pending = list(self.partition_names)
            while True:
                still_missing = []
                for name in pending:
                    schema, table, partition = hook.parse_partition_name(name)
                    # NOTE(review): this is called without ``await``; if the hook
                    # performs blocking I/O here it will stall the event loop
                    # for the duration of the call -- confirm against the hook.
                    if hook.check_partition_exists(
                        schema=schema,
                        table=table,
                        partition=partition,
                    ):
                        self.log.info("Found partition for %s.%s/%s", schema, table, partition)
                    else:
                        still_missing.append(name)
                pending = still_missing
                if not pending:
                    yield TriggerEvent({"status": "success", "message": "Named hive partition found."})
                    # Stop here: without this the loop would keep sleeping and
                    # emit the same success event again on every poll.
                    return
                await asyncio.sleep(self.polling_interval)
        except Exception as e:
            yield TriggerEvent({"status": "error", "message": str(e)})