diff --git a/Project-2/Part-1/Project 2 (Part-I)_ PaaS.pdf b/Project-2/Part-1/Project 2 (Part-I)_ PaaS.pdf index c92530a..ca949a6 100644 Binary files a/Project-2/Part-1/Project 2 (Part-I)_ PaaS.pdf and b/Project-2/Part-1/Project 2 (Part-I)_ PaaS.pdf differ diff --git a/Project-2/Part-1/src/Dockerfile b/Project-2/Part-1/src/Dockerfile new file mode 100644 index 0000000..e7bdf9c --- /dev/null +++ b/Project-2/Part-1/src/Dockerfile @@ -0,0 +1,54 @@ +#__copyright__ = "Copyright 2024, VISA Lab" +#__license__ = "MIT" + +# Define global args +ARG FUNCTION_DIR="/home/app/" +ARG RUNTIME_VERSION="3.8" +ARG DISTRO_VERSION="3.12" + +FROM alpine:latest +FROM python:${RUNTIME_VERSION} AS python-alpine + +RUN python${RUNTIME_VERSION} -m pip install --upgrade pip + +FROM python-alpine AS build-image + +# Include global args in this stage of the build +ARG FUNCTION_DIR +ARG RUNTIME_VERSION +# Create function directory +RUN mkdir -p ${FUNCTION_DIR} + +# Install Lambda Runtime Interface Client for Python +RUN python${RUNTIME_VERSION} -m pip install awslambdaric --target ${FUNCTION_DIR} + +# Stage 3 - final runtime image +# Grab a fresh copy of the Python image +FROM python-alpine +# Include global arg in this stage of the build +ARG FUNCTION_DIR +# Set working directory to function root directory +WORKDIR ${FUNCTION_DIR} +# Copy in the built dependencies +COPY --from=build-image ${FUNCTION_DIR} ${FUNCTION_DIR} +# (Optional) Add Lambda Runtime Interface Emulator and use a script in the ENTRYPOINT for simpler local runs +ADD https://github.com/aws/aws-lambda-runtime-interface-emulator/releases/latest/download/aws-lambda-rie /usr/bin/aws-lambda-rie +RUN chmod 755 /usr/bin/aws-lambda-rie + +# Install ffmpeg +RUN apt-get update +RUN apt-get install -y ffmpeg + +# Copy handler function +COPY requirements.txt ${FUNCTION_DIR} + +RUN python${RUNTIME_VERSION} -m pip install -r requirements.txt --target ${FUNCTION_DIR} +COPY entry.sh / + +# Copy function code +COPY handler.py ${FUNCTION_DIR} +RUN chmod 777 /entry.sh + +# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile) +ENTRYPOINT [ "/entry.sh" ] +CMD [ "handler.handler" ] diff --git a/Project-2/Part-1/src/README.md b/Project-2/Part-1/src/README.md new file mode 100644 index 0000000..fe53fb8 --- /dev/null +++ b/Project-2/Part-1/src/README.md @@ -0,0 +1,5 @@ +# Part-1: Video-splitting stage - S3-triggered Lambda + +- `handler.py` gets the uploaded video file, splits 10 frames using `ffmpeg`, uploads the output folder of frames to another bucket +- `lambda_s3_policy.json` defines the permission policy needed for the lambda function's IAM role +- `dummy_s3_trigger_event.json` is a sample S3 PUT event diff --git a/Project-2/Part-1/src/dummy_s3_trigger_event.json b/Project-2/Part-1/src/dummy_s3_trigger_event.json new file mode 100644 index 0000000..26e19d7 --- /dev/null +++ b/Project-2/Part-1/src/dummy_s3_trigger_event.json @@ -0,0 +1,38 @@ +{ +"Records":[ +{ +"eventVersion":"2.0", +"eventSource":"aws:s3", +"awsRegion":"us-east-1", +"eventTime":"1970-01-01T00:00:00.000Z", +"eventName":"ObjectCreated:Put", +"userIdentity":{ +"principalId":"EXAMPLE" +}, +"requestParameters":{ +"sourceIPAddress":"127.0.0.1" +}, +"responseElements":{ +"x-amz-request-id":"EXAMPLE123456789", +"x-amz-id-2":"EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH" +}, +"s3":{ +"s3SchemaVersion":"1.0", +"configurationId":"testConfigRule", +"bucket":{ +"name":"1229569564-input", +"ownerIdentity":{ +"principalId":"EXAMPLE" +}, 
+"arn":"arn:aws:s3:::1229569564-input" +}, +"object":{ +"key":"jellyfish jam.mp4", +"size":1024, +"eTag":"0123456789abcdef0123456789abcdef", +"sequencer":"0A1B2C3D4E5F678901" +} +} +} +] +} \ No newline at end of file diff --git a/Project-2/Part-1/src/entry.sh b/Project-2/Part-1/src/entry.sh new file mode 100644 index 0000000..a608361 --- /dev/null +++ b/Project-2/Part-1/src/entry.sh @@ -0,0 +1,6 @@ +#!/bin/sh +if [ -z "${AWS_LAMBDA_RUNTIME_API}" ]; then + exec /usr/bin/aws-lambda-rie /usr/local/bin/python -m awslambdaric $1 +else + exec /usr/local/bin/python -m awslambdaric $1 +fi diff --git a/Project-2/Part-1/src/grader_script_p1.py b/Project-2/Part-1/src/grader_script_p1.py new file mode 100644 index 0000000..2a1cd34 --- /dev/null +++ b/Project-2/Part-1/src/grader_script_p1.py @@ -0,0 +1,270 @@ +__copyright__ = "Copyright 2024, VISA Lab" +__license__ = "MIT" + +import pdb +import time +import botocore +import argparse +import textwrap +import boto3 +from boto3 import client as boto3_client +from botocore.exceptions import ClientError +from datetime import datetime,timezone,timedelta + +class aws_grader(): + def __init__(self, access_key, secret_key, input_bucket, output_bucket, lambda_name, region): + + self.access_key = access_key + self.secret_key = secret_key + self.region = region + self.s3 = boto3_client('s3', aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, region_name=region) + self.cloudwatch = boto3_client('cloudwatch', aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, region_name=region) + self.iam_session = boto3.Session(aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key) + self.s3_resources = self.iam_session.resource('s3', region) + self.lambda_function = boto3_client('lambda', aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, region_name=region) + self.in_bucket_name = input_bucket + self.out_bucket_name = output_bucket + self.lambda_name = lambda_name + self.test_result = {} + + def validate_lambda_exists(self, TC_num): + try: + response = self.lambda_function.get_function( + FunctionName=self.lambda_name + ) + print(f"Lambda function {self.lambda_name} HTTPStatusCode {response['ResponseMetadata']['HTTPStatusCode']}") + self.test_result[TC_num] = "PASS" + except self.lambda_function.exceptions.ResourceNotFoundException as e: + print(f"Error {e}") + self.test_result[TC_num] = "FAIL" + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + + def validate_s3_subfolders(self, TC_num): + in_objects = self.s3.list_objects_v2(Bucket=self.in_bucket_name) + if in_objects['KeyCount']==0: + self.test_result[TC_num] = "FAIL" + print(f"Empty bucket {self.in_bucket_name}") + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + return + self.test_result[TC_num] = "PASS" + for obj in in_objects['Contents']: + folder_name = obj['Key'].rsplit('.',1)[0] + out_objects = self.s3.list_objects_v2(Bucket=self.out_bucket_name, Prefix=folder_name, Delimiter='/') + if out_objects['KeyCount'] == 1 or out_objects['KeyCount'] == 11: + folder_name = out_objects['CommonPrefixes'][0]['Prefix'].rsplit("/")[0] + prefix_name = out_objects['Prefix'] + if folder_name == prefix_name: + print(f"{prefix_name} matches with {folder_name}") + else: + prefix_name = out_objects['Prefix'] + self.test_result[TC_num] = "FAIL" + print(f"NO folder named {prefix_name}") + print(out_objects) + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + + def validate_s3_output_objects(self, 
TC_num): + bucket = self.s3_resources.Bucket(self.out_bucket_name) + in_bucket = self.s3_resources.Bucket(self.in_bucket_name) + + try: + objects = list(bucket.objects.all()) + print(f"Got {len(objects)} objects {[o.key for o in objects]} from bucket {bucket.name}") + in_objects = list(in_bucket.objects.all()) + self.test_result[TC_num] = "PASS" + + for i,folder_n in enumerate(in_objects): + if len(in_objects) * 10 == len(objects) or len(in_objects) * 11 == len(objects): + print(f"Number of objects matches for given input {folder_n}") + self.test_result[TC_num] = "PASS" + else: + self.test_result[TC_num] = "FAIL" + break + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + + + except ClientError: + print(f"Couldn't get objects for bucket {bucket.name}") + raise + else: + return + + # You have to make sure to run the workload generator and it executes within 15 mins + # of polling for cloudwatch metrics. + def check_lambda_duration(self, TC_num): + response = self.cloudwatch.get_metric_data( + MetricDataQueries=[ + { + 'Id': 'testDuration', + 'MetricStat': { + 'Metric': { + 'Namespace': 'AWS/Lambda', + 'MetricName': 'Duration' + }, + 'Period': 600, + 'Stat': 'Average' + }, + 'ReturnData': True, + }, + ], + StartTime=datetime.now().utcnow() - timedelta(minutes=15), + EndTime=datetime.now().utcnow(), + ScanBy='TimestampAscending' + ) + print(response['MetricDataResults'][0]['Values']) + values = response['MetricDataResults'][0]['Values'] + if not values: + self.test_result[TC_num] = "FAIL" + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + return + if max(values) > 10000: + self.test_result[TC_num] = "FAIL" + else: + self.test_result[TC_num] = "PASS" + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + + def check_lambda_concurrency(self,TC_num): + response = self.cloudwatch.get_metric_data( + MetricDataQueries=[ + { + 'Id': 'testConcurrency', + 'MetricStat': { + 'Metric': { + 'Namespace': 'AWS/Lambda', + 'MetricName': 'ConcurrentExecutions' + }, + 'Period': 600, + 'Stat': 'Maximum' + }, + 'ReturnData': True, + }, + ], + StartTime=datetime.now().utcnow() - timedelta(minutes=15), + EndTime=datetime.now().utcnow(), + ScanBy='TimestampAscending' + ) + print(response['MetricDataResults'][0]['Values']) + values = response['MetricDataResults'][0]['Values'] + if not values: + self.test_result[TC_num] = "FAIL" + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + return + if max(values) < 5: + self.test_result[TC_num] = "FAIL" + else: + self.test_result[TC_num] = "PASS" + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + + def check_bucket_exist(self, bucket): + if not bucket: + print(f"Bucket name is empty!") + return False + try: + self.s3.head_bucket(Bucket=bucket) + print(f"Bucket {bucket} Exists!") + return True + except botocore.exceptions.ClientError as e: + # If a client error is thrown, then check that it was a 404 error. + # If it was a 404 error, then the bucket does not exist. + error_code = int(e.response['Error']['Code']) + if error_code == 403: + print("Private Bucket. 
Forbidden Access!") + return True + elif error_code == 404: + print(f"Bucket {bucket} does Not Exist!") + return False + def empty_s3_bucket(self, bucket_name): + bucket = self.s3_resources.Bucket(bucket_name) + bucket.objects.all().delete() + print(f"{bucket_name} S3 Bucket is now EMPTY !!") + + def count_bucket_objects(self, bucket_name): + bucket = self.s3_resources.Bucket(bucket_name) + count = 0 + for index in bucket.objects.all(): + count += 1 + #print(f"{bucket_name} S3 Bucket has {count} objects !!") + return count + + def validate_s3_buckets_initial(self, TC_num): + print(" - Run this BEFORE the workload generator client starts. Press Ctrl^C to exit.") + print(" - WARN: If there are objects in the S3 buckets; they will be deleted") + print(" ---------------------------------------------------------") + + in_isExist = self.check_bucket_exist(self.in_bucket_name) + out_isExist = self.check_bucket_exist(self.out_bucket_name) + + if in_isExist: + ip_obj_count = self.count_bucket_objects(self.in_bucket_name) + print(f"S3 Input Bucket:{self.in_bucket_name} has {ip_obj_count} object(s)") + if out_isExist: + op_obj_count = self.count_bucket_objects(self.out_bucket_name) + print(f"S3 Output Bucket:{self.out_bucket_name} has {op_obj_count} object(s)") + + if in_isExist and out_isExist and ip_obj_count==0 and op_obj_count==0: + self.test_result[TC_num] = "PASS" + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + else: + self.test_result[TC_num] = "FAIL" + print(f"Test status of {TC_num} : {self.test_result[TC_num]}") + + def display_menu(self): + print("\n") + print("=============================================================================") + print("======== Welcome to CSE546 Cloud Computing AWS Console ======================") + print("=============================================================================") + print(f"IAM ACESS KEY ID: {self.access_key}") + print(f"IAM SECRET ACCESS KEY: {self.secret_key}") + print("=============================================================================") + print("1 - Validate 1 Lambda function") + print("2 - Validate S3 Buckets names and initial states") + print("3 - Validate S3 output bucket subfolders") + print("4 - Validate S3 output objects") + print("5 - Check lambda average duration") + print("6 - Check lambda concurrency") + print("0 - Exit") + print("Enter a choice:") + choice = input() + return choice + + def main(self): + while(1): + choice = self.display_menu() + if int(choice) == 1: + self.validate_lambda_exists('Test_1') + elif int(choice) == 2: + self.validate_s3_buckets_initial('Test_2') + elif int(choice) == 3: + self.validate_s3_subfolders('Test_3') + elif int(choice) == 4: + self.validate_s3_output_objects('Test_4') + elif int(choice) == 5: + self.check_lambda_duration('Test_5') + elif int(choice) == 6: + self.check_lambda_concurrency('Test_6') + elif int(choice) == 0: + break + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Grading Script') + parser.add_argument('--access_key', type=str, help='ACCCESS KEY ID of the grading IAM user') + parser.add_argument('--secret_key', type=str, help='SECRET KEY of the grading IAM user') + parser.add_argument('--input_bucket', type=str, help='Name of the S3 Input Bucket') + parser.add_argument('--output_bucket', type=str, help='Name of the S3 Output Bucket') + parser.add_argument('--lambda_name', type=str, help="Name of the Lambda function") + + + args = parser.parse_args() + + access_key = args.access_key + secret_key = args.secret_key + 
input_bucket = args.input_bucket + output_bucket = args.output_bucket + lambda_name = args.lambda_name + region = 'us-east-1' + + aws_obj = aws_grader(access_key, secret_key, input_bucket, output_bucket, lambda_name,region) + aws_obj.main() diff --git a/Project-2/Part-1/src/handler.py b/Project-2/Part-1/src/handler.py new file mode 100644 index 0000000..d70385f --- /dev/null +++ b/Project-2/Part-1/src/handler.py @@ -0,0 +1,66 @@ +import os +import subprocess +import json +import urllib.parse +import boto3 + +print("Loading function") + + +# attach execution policies and IAM roles in deployment lambda +sesh = boto3.Session() + +s3 = sesh.client("s3", region_name="us-east-1") + + +def video_splitting_cmdline(video_filename): + filename = os.path.basename(video_filename) + outdir = os.path.splitext(filename)[0] + outdir = os.path.join("/tmp", outdir) + if not os.path.exists(outdir): + os.makedirs(outdir) + + split_cmd = ( + "ffmpeg -ss 0 -r 1 -i " + + video_filename + + " -vf fps=1/1 -start_number 0 -vframes 10 " + + outdir + + "/" + + "output-%02d.jpg -y" + ) + try: + subprocess.check_call(split_cmd, shell=True) + except subprocess.CalledProcessError as e: + print(e.returncode) + print(e.output) + + return outdir + + +def handler(event, context): + for record in event["Records"]: + # get uploaded object + in_bucket = record["s3"]["bucket"]["name"] + if in_bucket != "1229569564-input": + continue + key = urllib.parse.unquote_plus(record["s3"]["object"]["key"], encoding="utf-8") + tmpkey = key.replace("/", "") + download_path = "/tmp/{}".format(tmpkey) + s3.download_file(in_bucket, key, download_path) + + # process it + out_dir = video_splitting_cmdline(download_path) + + # upload output objects + for frame in os.listdir(out_dir): + s3.upload_file( + os.path.join(out_dir, frame), + "1229569564-stage-1", + os.path.splitext(tmpkey)[0] + "/" + frame, + ) + + +if __name__ == "__main__": + with open("dummy_s3_trigger_event.json", "r") as dummy_event: + event = json.loads(dummy_event.read()) + handler(event, None) diff --git a/Project-2/Part-1/src/lambda_s3_policy.json b/Project-2/Part-1/src/lambda_s3_policy.json new file mode 100644 index 0000000..98512e6 --- /dev/null +++ b/Project-2/Part-1/src/lambda_s3_policy.json @@ -0,0 +1,28 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "logs:PutLogEvents", + "logs:CreateLogGroup", + "logs:CreateLogStream" + ], + "Resource": "arn:aws:logs:*:*:*" + }, + { + "Effect": "Allow", + "Action": [ + "s3:GetObject" + ], + "Resource": "arn:aws:s3:::*/*" + }, + { + "Effect": "Allow", + "Action": [ + "s3:PutObject" + ], + "Resource": "arn:aws:s3:::*/*" + } + ] +} \ No newline at end of file diff --git a/Project-2/Part-1/src/requirements.txt b/Project-2/Part-1/src/requirements.txt new file mode 100644 index 0000000..30ddf82 --- /dev/null +++ b/Project-2/Part-1/src/requirements.txt @@ -0,0 +1 @@ +boto3 diff --git a/Project-2/Part-1/src/workload_generator.py b/Project-2/Part-1/src/workload_generator.py new file mode 100644 index 0000000..d55cd2a --- /dev/null +++ b/Project-2/Part-1/src/workload_generator.py @@ -0,0 +1,73 @@ +#__copyright__ = "Copyright 2024, VISA Lab" +#__license__ = "MIT" + +from boto3 import client as boto3_client +import os +import argparse +import time + +input_bucket = "546proj2" +output_bucket = "546proj2output" +test_cases = "test_cases/" + +start_time = time.time() +parser = argparse.ArgumentParser(description='Upload videos to input S3') +# parser.add_argument('--num_request', type=int, help='one video 
per request')
+parser.add_argument('--access_key', type=str, help='ACCESS KEY of the grading IAM user')
+parser.add_argument('--secret_key', type=str, help='SECRET KEY of the grading IAM user')
+parser.add_argument('--input_bucket', type=str, help='Name of the input bucket, e.g. 1234567890-input')
+parser.add_argument('--output_bucket', type=str, help='Name of the output bucket, e.g. 1234567890-stage-1')
+parser.add_argument('--testcase_folder', type=str, help='the path of the folder where videos are saved, e.g. test_cases/test_case_1/')
+
+args = parser.parse_args()
+
+access_key = args.access_key
+secret_key = args.secret_key
+input_bucket = args.input_bucket
+output_bucket = args.output_bucket
+test_cases = args.testcase_folder
+region = 'us-east-1'
+
+s3 = boto3_client('s3', aws_access_key_id = access_key,
+                  aws_secret_access_key = secret_key, region_name=region)
+def clear_input_bucket(input_bucket):
+    global s3
+    list_obj = s3.list_objects_v2(Bucket=input_bucket)
+    try:
+        for item in list_obj["Contents"]:
+            key = item["Key"]
+            s3.delete_object(Bucket=input_bucket, Key=key)
+    except KeyError:
+        print("Nothing to clear in input bucket")
+
+def clear_output_bucket(output_bucket):
+    global s3
+    list_obj = s3.list_objects_v2(Bucket=output_bucket)
+    try:
+        for item in list_obj["Contents"]:
+            key = item["Key"]
+            s3.delete_object(Bucket=output_bucket, Key=key)
+    except KeyError:
+        print("Nothing to clear in output bucket")
+
+def upload_to_input_bucket_s3(input_bucket, path, name):
+    global s3
+    s3.upload_file(path + name, input_bucket, name)
+
+def upload_files(input_bucket, test_dir):
+    for filename in os.listdir(test_dir):
+        if filename.endswith(".mp4") or filename.endswith(".MP4"):
+            print("Uploading to input bucket: " + str(filename))
+            upload_to_input_bucket_s3(input_bucket, test_dir, filename)
+
+
+
+clear_input_bucket(input_bucket)
+clear_output_bucket(output_bucket)
+
+upload_files(input_bucket, test_cases)
+
+end_time = time.time()
+print("Time to run = ", end_time-start_time, "(seconds)")
+print(f"Timestamps: start {start_time}, end {end_time}")
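
The files above add the handler, the container image, and the IAM policy, but not the step that connects the input bucket to the Lambda function; the README and workload generator assume that trigger already exists. Below is a minimal boto3 sketch of that wiring, not part of the committed code: the function name `video-splitting` is a placeholder (the grader takes the real name via `--lambda_name`), and the bucket name is taken from `dummy_s3_trigger_event.json` and `handler.py`.

import boto3

region = "us-east-1"
input_bucket = "1229569564-input"   # matches handler.py and the dummy event
function_name = "video-splitting"   # placeholder; use the deployed Lambda's name

lambda_client = boto3.client("lambda", region_name=region)
s3_client = boto3.client("s3", region_name=region)

# Look up the function ARN so it can be referenced in the bucket notification.
function_arn = lambda_client.get_function(
    FunctionName=function_name
)["Configuration"]["FunctionArn"]

# Allow the S3 service principal to invoke the function for events from this bucket.
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId="s3-invoke-video-splitting",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn=f"arn:aws:s3:::{input_bucket}",
)

# Invoke the Lambda on every object created in the input bucket (PUT, multipart, copy).
s3_client.put_bucket_notification_configuration(
    Bucket=input_bucket,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {"LambdaFunctionArn": function_arn, "Events": ["s3:ObjectCreated:*"]}
        ]
    },
)

The same trigger can be created from the AWS console; either way, the role attached to the function still needs the `s3:GetObject`, `s3:PutObject`, and CloudWatch Logs permissions defined in `lambda_s3_policy.json`.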