# Source code for astronomer.providers.http.hooks.http

import asyncio
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Union

import aiohttp
from aiohttp import ClientResponseError
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from asgiref.sync import sync_to_async

if TYPE_CHECKING:
    from aiohttp.client_reqrep import ClientResponse


class HttpHookAsync(BaseHook):
    """
    Interact with HTTP servers using Python Async.

    :param method: the API method to be called; it is upper-cased on construction.
        ``run`` supports ``GET``, ``POST`` and ``PATCH``.
    :param http_conn_id: http connection id that has the base API url i.e https://www.google.com/
        and optional authentication credentials. Default headers can also be specified in
        the Extra field in json format.
    :param auth_type: callable invoked as ``auth_type(login, password)`` to build the ``auth``
        argument of the request when the connection defines a login. Defaults to
        ``aiohttp.BasicAuth``.
    :param retry_limit: maximum number of attempts made by a single ``run`` call; must be
        at least 1.
    :param retry_delay: number of seconds to sleep between two attempts.
    :raises ValueError: if ``retry_limit`` is lower than 1.
    """

    conn_name_attr = "http_conn_id"
    default_conn_name = "http_default"
    conn_type = "http"
    hook_name = "HTTP"

    def __init__(
        self,
        method: str = "POST",
        http_conn_id: str = default_conn_name,
        auth_type: Any = aiohttp.BasicAuth,
        retry_limit: int = 3,
        retry_delay: float = 1.0,
    ) -> None:
        super().__init__()
        self.http_conn_id = http_conn_id
        self.method = method.upper()
        # Recomputed from the connection on every ``run`` call when ``http_conn_id`` is set.
        self.base_url: str = ""
        # Type declaration only: no value is assigned to this attribute in this class.
        self._retry_obj: Callable[..., Any]
        self.auth_type: Any = auth_type
        if retry_limit < 1:
            raise ValueError("Retry limit must be greater than equal to 1")
        self.retry_limit = retry_limit
        self.retry_delay = retry_delay

    async def run(
        self,
        endpoint: Optional[str] = None,
        data: Optional[Union[Dict[str, Any], str]] = None,
        headers: Optional[Dict[str, Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
    ) -> "ClientResponse":
        """
        Performs an asynchronous HTTP request call

        The request is attempted up to ``retry_limit`` times. Only responses whose status
        makes ``raise_for_status()`` fail with a code of 500 or above are retried, with a
        pause of ``retry_delay`` seconds between attempts.

        :param endpoint: the endpoint to be called i.e. resource/v1/query?
        :param data: payload to be uploaded or request parameters. It is sent as the JSON
            body for ``POST``/``PATCH`` and as query parameters for ``GET``.
        :param headers: additional headers to be passed through as a dictionary. They are
            applied on top of the headers taken from the connection's Extra field.
        :param extra_options: Additional kwargs to pass when creating a request.
            For example, ``run(extra_options={"timeout": t})`` results in
            ``aiohttp.ClientSession().get(..., timeout=t)``
        :return: the response of the first attempt that passes ``raise_for_status()``.
        :raises AirflowException: if the configured method is not GET, POST or PATCH, or if
            the request fails with a non-retryable status or on the last allowed attempt.
        """
        extra_options = extra_options or {}

        # headers may be passed through directly or in the "extra" field in the connection
        # definition
        _headers: Dict[str, Any] = {}
        auth = None

        if self.http_conn_id:
            # get_connection is synchronous; wrap it so it can be awaited from this coroutine.
            conn = await sync_to_async(self.get_connection)(self.http_conn_id)

            if conn.host and "://" in conn.host:
                # The host already carries its own scheme; use it verbatim.
                self.base_url = conn.host
            else:
                # schema defaults to HTTP
                schema = conn.schema if conn.schema else "http"
                host = conn.host if conn.host else ""
                self.base_url = schema + "://" + host

            if conn.port:
                self.base_url = self.base_url + ":" + str(conn.port)
            if conn.login:
                auth = self.auth_type(conn.login, conn.password)
            if conn.extra:
                try:
                    _headers.update(conn.extra_dejson)
                except TypeError:
                    self.log.warning("Connection to %s has invalid extra field.", conn.host)

        # Per-call headers take precedence over the ones defined on the connection.
        if headers:
            _headers.update(headers)

        # Insert a single "/" only when neither side already provides the separator.
        if self.base_url and not self.base_url.endswith("/") and endpoint and not endpoint.startswith("/"):
            url = self.base_url + "/" + endpoint
        else:
            url = (self.base_url or "") + (endpoint or "")

        # NOTE(review): the response is returned from inside this ``async with``, so the
        # session context exits as the method returns — confirm callers can still consume
        # the response body afterwards.
        async with aiohttp.ClientSession() as session:
            if self.method == "GET":
                request_func = session.get
            elif self.method == "POST":
                request_func = session.post
            elif self.method == "PATCH":
                request_func = session.patch
            else:
                raise AirflowException(f"Unexpected HTTP Method: {self.method}")

            attempt_num = 1
            while True:
                response = await request_func(
                    url,
                    json=data if self.method in ("POST", "PATCH") else None,
                    params=data if self.method == "GET" else None,
                    # Send the merged headers (connection extras overlaid with the per-call
                    # ``headers``), not only the per-call argument.
                    headers=_headers,
                    auth=auth,
                    **extra_options,
                )
                try:
                    response.raise_for_status()
                    return response
                except ClientResponseError as e:
                    self.log.warning(
                        "[Try %d of %d] Request to %s failed.",
                        attempt_num,
                        self.retry_limit,
                        url,
                    )
                    if not self._retryable_error_async(e) or attempt_num == self.retry_limit:
                        self.log.exception("HTTP error with status: %s", e.status)
                        # Either the status is not worth retrying (the user probably made
                        # a mistake) or the attempts are exhausted. Don't retry.
                        raise AirflowException(str(e.status) + ":" + e.message) from e

                attempt_num += 1
                await asyncio.sleep(self.retry_delay)

    def _retryable_error_async(self, exception: ClientResponseError) -> bool:
        """
        Determines whether or not an exception that was thrown might be successful
        on a subsequent attempt.

        Only the HTTP status carried by the exception is inspected: anything with a
        status code >= 500 is considered retryable, everything else is not.

        :param exception: the error raised by ``raise_for_status()`` for a failed response.
        :return: ``True`` if the status code is 500 or above, ``False`` otherwise.
        """
        return exception.status >= 500