Source code for aws_ci_bot.sns_event

# -*- coding: utf-8 -*-

"""
SNS event handling in Lambda function.
"""

import typing as T
import json
from datetime import datetime

from aws_lambda_event import SNSTopicNotificationEvent
from aws_codebuild import CodeBuildEvent, BuildJobRun
from aws_codecommit import CodeCommitEvent

from .console import get_s3_console_url
from . import logger


[docs]def extract_sns_message_dict(event: dict) -> dict: """ Extract the AWS CodeStar notification event from the received Lambda event. Lambda event includes the SNS message body, and the SNS message body includes the AWS CodeStar notification event. :param event: the original lambda function input payload :return: the AWS CodeStar notification event data in dict """ sns_event = SNSTopicNotificationEvent.from_dict(event) return json.loads(sns_event.Records[0].message)
[docs]def encode_partition_key(dt: datetime) -> str: """ Figure out the s3 partition part based on the given datetime. """ return "/".join( [ f"year={dt.year}", f"month={str(dt.month).zfill(2)}", f"day={str(dt.day).zfill(2)}", ] )
[docs]def upload_ci_event( s3_client, event_dict: dict, event_obj: T.Union[CodeCommitEvent, CodeBuildEvent], bucket: str, prefix: str, verbose: bool = True, ) -> str: """ Upload CI/CD lambda event to S3 as a backup. :param s3_client: :param event_dict: :param event_obj: :param bucket: :param prefix: :param verbose: :return: the S3 uri where the event is uploaded """ if verbose: logger.info("Upload CI event to S3 ...") if prefix.endswith("/"): prefix = prefix[:-1] utc_now = datetime.utcnow() time_str = utc_now.strftime("%Y-%m-%dT%H-%M-%S.%f") if isinstance(event_obj, CodeCommitEvent): s3_key = ( f"{prefix}/codecommit/{event_obj.repo_name}/" f"{encode_partition_key(utc_now)}/" f"{time_str}_{event_obj.repo_name}.json" ) elif isinstance(event_obj, CodeBuildEvent): build_job_run = BuildJobRun.from_arn(event_obj.build_arn) if build_job_run.is_batch: type = "batch-build" else: type = "single-build" s3_key = ( f"{prefix}/codebuild" f"/{build_job_run.project_name}" f"/{encode_partition_key(utc_now)}" f"/{type}" f"/{time_str}_{build_job_run.run_id}.json" ) else: # pragma: no cover raise NotImplementedError s3_uri = f"s3://{bucket}/{s3_key}" if verbose: logger.info(f"s3 uri: {s3_uri}", 1) s3_client.put_object( Bucket=bucket, Key=s3_key, Body=json.dumps(event_dict, indent=4), ) console_url = get_s3_console_url(bucket=bucket, prefix=s3_key) if verbose: logger.info(f"preview event at: {console_url}", 1) return s3_uri