- Published on
February 5, 2023
[TIL] How to query a Lamda function execution information Cloudwatch Logs
AWS - How to query Cloudwatch Logs from Lambda function
Let write a simple function by Python.
It collects AWS Lambda logs using CloudWatch Logs and it extracts and get some metric like request id, duration, bill duration, max memory used,…
Lamda function writes a report log with all information I need with a type REPORT.
So I use this query
pythonfilter @type = \"REPORT\" | fields @requestId, @billedDuration, @maxMemoryUsed, @duration, @timestamp | sort @timestamp desc | limit 1
Some main points
Sample code
pythonimport jsonimport boto3from datetime import datetime, timedeltaimport timeimport osfrom os import pathfrom logging import getLoggerimport concurrent.futureslogger = getLogger()region = os.environ['AWS_REGION']client = boto3.client('logs')jinja_env = Environment(loader=FileSystemLoader(path.join(path.dirname(__file__), 'templates'), encoding='utf8'))def get_lambda_stat(function_name: str):log_group = '/aws/lambda/' + function_namequery = "filter @type = \"REPORT\" | fields @requestId, @billedDuration, @maxMemoryUsed, @duration, @timestamp | sort @timestamp desc | limit 1"result = {'executed_at': '','request_id': '','billed_duration': 0.0,'max_memory_used': 0.0,'duration': 0.0,}try:start_query_response = client.start_query(logGroupName=log_group,startTime=int((datetime.today() - timedelta(hours=1*24)).timestamp()),endTime=int(datetime.now().timestamp()),queryString=query,)query_id = start_query_response['queryId']response = Nonewhile response == None or response['status'] == 'Running':logger.info(f'{function_name} - Waiting for query to complete ...')time.sleep(1)response = client.get_query_results(queryId=query_id)if 'results' in response and len(response['results']) > 0:result['request_id'] = response['results'][0][0]['value']result['billed_duration'] = float(response['results'][0][1]['value'])result['max_memory_used'] = float(response['results'][0][2]['value']) / 1024 / 1024result['duration'] = float(response['results'][0][3]['value'])result['executed_at'] = response['results'][0][4]['value']except Exception as e:logger.error(f'Failed to query {function_name}')logger.error(e)return resultdef main():func_list = [FUNCTION_NAME_1,FUNCTION_NAME_2]stats = []with concurrent.futures.ThreadPoolExecutor() as executor:results = executor.map(get_lambda_stat, func_list)for result in results:stats.append(result)
Other articles
More articles ➜- Published on