Source code for astronomer.providers.amazon.aws.hooks.aws_logs

import asyncio
from typing import Any, AsyncGenerator, Dict, Optional

from botocore.exceptions import ClientError

from astronomer.providers.amazon.aws.hooks.base_aws_async import AwsBaseHookAsync


[docs]class AwsLogsHookAsync(AwsBaseHookAsync): """ Interact with AWS CloudWatch Logs using aiobotocore python library Additional arguments (such as ``aws_conn_id``) may be specified and are passed down to the underlying AwsBaseHook. """ def __init__(self, *args: Any, **kwargs: Any) -> None: kwargs["client_type"] = "logs" super().__init__(*args, **kwargs)
[docs] async def describe_log_streams_async( self, log_group: str, stream_prefix: str, order_by: str, count: int ) -> Optional[Dict[str, Any]]: """ Async function to get the list of log streams for the specified log group. You can list all the log streams or filter the results by prefix. You can also control how the results are ordered. :param log_group: The name of the log group. :param stream_prefix: The prefix to match. :param order_by: If the value is LogStreamName , the results are ordered by log stream name. If the value is LastEventTime , the results are ordered by the event time. The default value is LogStreamName. :param count: The maximum number of items returned """ async with await self.get_client_async() as client: try: response: Dict[str, Any] = await client.describe_log_streams( logGroupName=log_group, logStreamNamePrefix=stream_prefix, orderBy=order_by, limit=count, ) return response except ClientError as error: # On the very first training job run on an account, there's no log group until # the container starts logging, so ignore any errors thrown about that if error.response["Error"]["Code"] == "ResourceNotFoundException": return None raise error
[docs] async def get_log_events_async( self, log_group: str, log_stream_name: str, start_time: int = 0, skip: int = 0, start_from_head: bool = True, ) -> AsyncGenerator[Any, Dict[str, Any]]: """ A generator for log items in a single stream. This will yield all the items that are available at the current moment. :param log_group: The name of the log group. :param log_stream_name: The name of the specific stream. :param start_time: The time stamp value to start reading the logs from (default: 0). :param skip: The number of log entries to skip at the start (default: 0). This is for when there are multiple entries at the same timestamp. :param start_from_head: whether to start from the beginning (True) of the log or at the end of the log (False). """ next_token = None while True: if next_token is not None: token_arg: Optional[dict[str, str]] = {"nextToken": next_token} else: token_arg = {} async with await self.get_client_async() as client: response = await client.get_log_events( logGroupName=log_group, logStreamName=log_stream_name, startTime=start_time, startFromHead=start_from_head, **token_arg, ) events = response["events"] event_count = len(events) if event_count > skip: events = events[skip:] skip = 0 else: skip -= event_count events = [] for event in events: await asyncio.sleep(1) yield event if next_token != response["nextForwardToken"]: next_token = response["nextForwardToken"] else: return # pragma: no cover