import json
import boto3
from datetime import datetime, timedelta
import time
import os
from os import path
from logging import getLogger
import concurrent.futures
from jinja2 import Environment, FileSystemLoader
logger = getLogger()
region = os.environ['AWS_REGION']
client = boto3.client('logs')
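# Jinja2 environment that loads report templates from the templates/ directory next to this file.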
jinja_env = Environment(loader=FileSystemLoader(
    path.join(path.dirname(__file__), 'templates'), encoding='utf8'))

def get_lambda_stat(function_name: str):
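    """Return the most recent REPORT-line metrics for a Lambda function via CloudWatch Logs Insights."""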
    log_group = '/aws/lambda/' + function_name
    query = "filter @type = \"REPORT\" | fields @requestId, @billedDuration, @maxMemoryUsed, @duration, @timestamp | sort @timestamp desc | limit 1"
    result = {
        'executed_at': '',
        'request_id': '',
        'billed_duration': 0.0,
        'max_memory_used': 0.0,
        'duration': 0.0,
    }
    try:
        # Look back over the last 24 hours of logs.
        start_query_response = client.start_query(
            logGroupName=log_group,
            startTime=int((datetime.now() - timedelta(days=1)).timestamp()),
            endTime=int(datetime.now().timestamp()),
            queryString=query,
        )
        query_id = start_query_response['queryId']
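        # Logs Insights queries run asynchronously, so poll until the query finishes.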
        response = None
        while response is None or response['status'] in ('Scheduled', 'Running'):
            logger.info(
                f'{function_name} - Waiting for query to complete ...')
            time.sleep(1)
            response = client.get_query_results(queryId=query_id)
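        # The code relies on result columns coming back in the same order as the query's fields clause.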
        if 'results' in response and len(response['results']) > 0:
            row = response['results'][0]
            result['request_id'] = row[0]['value']
            result['billed_duration'] = float(row[1]['value'])
            # @maxMemoryUsed is reported in bytes; convert to megabytes.
            result['max_memory_used'] = float(row[2]['value']) / 1024 / 1024
            result['duration'] = float(row[3]['value'])
            result['executed_at'] = row[4]['value']
    except Exception as e:
        logger.error(f'Failed to query {function_name}')
        logger.error(e)
    return result

def main():
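    """Collect the latest execution stats for each monitored Lambda function."""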
    # Placeholder names; replace with the Lambda functions to report on.
    func_list = [
        'FUNCTION_NAME_1',
        'FUNCTION_NAME_2',
    ]
    stats = []
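    # Query each function's log group in parallel; each query blocks for a few seconds while polling.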
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(get_lambda_stat, func_list)
        for result in results:
            stats.append(result)