# Source code for astronomer.providers.apache.hive.hooks.hive
"""This module contains the Apache HiveCli hook async."""
import asyncio
from typing import Tuple
from airflow.hooks.base import BaseHook
from impala.dbapi import connect
from impala.hiveserver2 import HiveServer2Connection
class HiveCliHookAsync(BaseHook):
    """
    HiveCliHookAsync to interact with Hive using the impyla library.

    :param metastore_conn_id: Airflow connection id to connect with. The connection's
        ``host``, ``port``, ``login`` and ``password`` are passed to impyla's ``connect``,
        and the ``authMechanism`` key of its extras selects the auth mechanism
        (defaults to ``"PLAIN"``).
    """

    def __init__(self, metastore_conn_id: str) -> None:
        """Resolve the Airflow connection and read the auth mechanism from its extras."""
        super().__init__()
        # NOTE: despite its name, this attribute holds the resolved Connection object,
        # not the id string; the name is kept because callers may read it.
        self.metastore_conn_id = self.get_connection(conn_id=metastore_conn_id)
        self.auth_mechanism = self.metastore_conn_id.extra_dejson.get("authMechanism", "PLAIN")

    def get_hive_client(self) -> HiveServer2Connection:
        """
        Open a new impyla connection using the stored Airflow connection details.

        Every call opens a fresh connection; the caller is responsible for closing it.
        """
        conn = self.metastore_conn_id
        return connect(
            host=conn.host,
            port=conn.port,
            auth_mechanism=self.auth_mechanism,
            user=conn.login,
            password=conn.password,
        )

    @staticmethod
    def _partition_query(schema: str, table: str, partition: str) -> str:
        """Build the ``show partitions`` statement shared by both partition checks."""
        # Identifiers and the partition spec are interpolated directly (they are not
        # bindable query parameters), so these values must come from trusted DAG config.
        return f"show partitions {schema}.{table} partition({partition})"

    async def partition_exists(self, table: str, schema: str, partition: str, polling_interval: float) -> str:
        """
        Check for the existence of a partition in the given Hive table.

        The query is submitted with ``execute_async`` and its status is polled,
        sleeping ``polling_interval`` seconds between polls.

        :param table: Hive table in which to look for the partition.
        :param schema: Hive database containing the table.
        :param partition: Partition spec placed inside ``partition(...)`` in the query.
        :param polling_interval: Seconds to sleep between status checks.
        :return: ``"success"`` if the query returned at least one row, ``"failure"`` otherwise.
        """
        query = self._partition_query(schema, table, partition)
        # NOTE(review): connect(), execute_async() and fetchall() are not awaited, so they
        # run synchronously on the event-loop thread; only the polling sleep yields.
        client = self.get_hive_client()
        try:
            cursor = client.cursor()
            try:
                cursor.execute_async(query)
                while cursor.is_executing():
                    await asyncio.sleep(polling_interval)
                results = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Close on every call so repeated polling does not leak one connection per check.
            client.close()
        return "success" if results else "failure"

    @staticmethod
    def parse_partition_name(partition: str) -> Tuple[str, str, str]:
        """
        Parse a ``[schema.]table/partition`` string into its three components.

        The table part is split off at the first ``/`` before looking for a schema, so
        dots inside the partition spec are never mistaken for the schema separator.
        When no schema is given, ``"default"`` is used.

        :param partition: String of the form ``schema.table/partition`` or ``table/partition``.
        :return: Tuple of ``(schema, table, partition)``.
        :raises ValueError: If the string contains no ``/`` separating table and partition.
        """
        table_spec, slash, partition_spec = partition.partition("/")
        if not slash:
            raise ValueError(f"Could not parse {partition} into table, partition")
        schema, dot, table = table_spec.partition(".")
        if not dot:
            schema, table = "default", table_spec
        return schema, table, partition_spec

    def check_partition_exists(self, schema: str, table: str, partition: str) -> bool:
        """
        Check whether the given partition exists, blocking until the query finishes.

        :param schema: Name of the Hive schema.
        :param table: Name of the table.
        :param partition: Partition spec placed inside ``partition(...)`` in the query.
        :return: True if the query returned at least one row, False otherwise.
        """
        self.log.info("Checking for partition %s.%s/%s", schema, table, partition)
        query = self._partition_query(schema, table, partition)
        client = self.get_hive_client()
        try:
            cursor = client.cursor()
            try:
                # execute() waits for the query to finish before results are fetched.
                cursor.execute(query)
                results = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            client.close()
        return bool(results)