logo
February 5, 2023

[TIL] How to query a Lamda function execution information Cloudwatch Logs

AWS - How to query Cloudwatch Logs from Lambda function

Let write a simple function by Python.

It collects AWS Lambda logs using CloudWatch Logs and it extracts and get some metric like request id, duration, bill duration, max memory used,…

Create the query on AWS Cloudwatch logs Insight

Lamda function writes a report log with all information I need with a type REPORT.

So I use this query

python
filter @type = \"REPORT\" | fields @requestId, @billedDuration, @maxMemoryUsed, @duration, @timestamp | sort @timestamp desc | limit 1

Write a Python function to execute the query

Some main points

  • Use boto3 client for call AWS Cloudwatch logs API
  • Using try…except for error handling
  • Using conccurent.future for call multiple query pararelly
  • Sample code

    python
    import json
    import boto3
    from datetime import datetime, timedelta
    import time
    import os
    from os import path
    from logging import getLogger
    import concurrent.futures
    logger = getLogger()
    region = os.environ['AWS_REGION']
    client = boto3.client('logs')
    jinja_env = Environment(loader=FileSystemLoader(
    path.join(path.dirname(__file__), 'templates'), encoding='utf8'))
    def get_lambda_stat(function_name: str):
    log_group = '/aws/lambda/' + function_name
    query = "filter @type = \"REPORT\" | fields @requestId, @billedDuration, @maxMemoryUsed, @duration, @timestamp | sort @timestamp desc | limit 1"
    result = {
    'executed_at': '',
    'request_id': '',
    'billed_duration': 0.0,
    'max_memory_used': 0.0,
    'duration': 0.0,
    }
    try:
    start_query_response = client.start_query(
    logGroupName=log_group,
    startTime=int(
    (datetime.today() - timedelta(hours=1*24)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query,
    )
    query_id = start_query_response['queryId']
    response = None
    while response == None or response['status'] == 'Running':
    logger.info(
    f'{function_name} - Waiting for query to complete ...')
    time.sleep(1)
    response = client.get_query_results(
    queryId=query_id
    )
    if 'results' in response and len(response['results']) > 0:
    result['request_id'] = response['results'][0][0]['value']
    result['billed_duration'] = float(
    response['results'][0][1]['value'])
    result['max_memory_used'] = float(
    response['results'][0][2]['value']) / 1024 / 1024
    result['duration'] = float(
    response['results'][0][3]['value'])
    result['executed_at'] = response['results'][0][4]['value']
    except Exception as e:
    logger.error(f'Failed to query {function_name}')
    logger.error(e)
    return result
    def main():
    func_list = [
    FUNCTION_NAME_1,
    FUNCTION_NAME_2
    ]
    stats = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(get_lambda_stat, func_list)
    for result in results:
    stats.append(result)