mirror of
https://github.com/20kaushik02/CSE546_Cloud_Computing_Projects.git
synced 2026-01-25 06:44:04 +00:00
starting proj 2 part 2
@@ -1,6 +1,5 @@
### Grading Scripts

- How to run the script:
- How to run the script for PART-1:
```
usage: grader_script_p1.py [-h] [--access_key ACCESS_KEY] [--secret_key SECRET_KEY] [--input_bucket INPUT_BUCKET] [--lambda_name LAMBDA_NAME]

Grading Script
@@ -18,9 +17,24 @@
                        Name of the Lambda function

```
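
For instance, an illustrative PART-1 invocation (all values below are placeholders, not real credentials or resource names) might look like:
```
python3 grader_script_p1.py --access_key <ACCESS_KEY_ID> --secret_key <SECRET_KEY> --input_bucket <INPUT_BUCKET> --lambda_name <LAMBDA_NAME>
```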

- How to run the script for PART-2:
```
usage: grader_script_p2_v2.py [-h] [--access_key ACCESS_KEY] [--secret_key SECRET_KEY] [--asu_id ASU_ID]

Grading Script

optional arguments:
  -h, --help            show this help message and exit
  --access_key ACCESS_KEY
                        ACCESS KEY ID of the grading IAM user
  --secret_key SECRET_KEY
                        SECRET KEY of the grading IAM user
  --asu_id ASU_ID       10-digit ASU ID

```
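
Similarly, an illustrative PART-2 invocation (placeholder values; the script derives all bucket names from the ASU ID) might look like:
```
python3 grader_script_p2_v2.py --access_key <ACCESS_KEY_ID> --secret_key <SECRET_KEY> --asu_id 1234567890
```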

**Note**: We will follow the naming conventions for the S3 bucket and Lambda function names described in the project document when grading your submission.
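
Concretely, the PART-2 script derives the resource names from the 10-digit ASU ID (see the `__main__` block of `grader_script_p2_v2.py`); for example:
```
asu_id = "1234567890"   # placeholder
buckets = [asu_id + "-input", asu_id + "-stage-1", asu_id + "-output"]
lambda_names = ["video-splitting", "face-recognition"]
```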

**Examples**:
**Examples For PART-1**:

We will show examples for each option below.

315 Tooling/Project_2/grading_scripts/grader_script_p2.py Normal file
@@ -0,0 +1,315 @@
__copyright__ = "Copyright 2024, VISA Lab"
__license__ = "MIT"

import pdb
import time
import botocore
import argparse
import textwrap
import boto3
from boto3 import client as boto3_client
from botocore.exceptions import ClientError
from datetime import datetime, timezone, timedelta
import re


class aws_grader():
    def __init__(self, access_key, secret_key, buckets, lambda_names, region):
        self.access_key = access_key
        self.secret_key = secret_key
        self.region = region
        self.s3 = boto3_client('s3', aws_access_key_id=self.access_key,
                               aws_secret_access_key=self.secret_key, region_name=region)
        self.cloudwatch = boto3_client('cloudwatch', aws_access_key_id=self.access_key,
                                       aws_secret_access_key=self.secret_key, region_name=region)
        self.iam_session = boto3.Session(aws_access_key_id=self.access_key,
                                         aws_secret_access_key=self.secret_key)
        self.s3_resources = self.iam_session.resource('s3', region)
        self.lambda_function = boto3_client('lambda', aws_access_key_id=self.access_key,
                                            aws_secret_access_key=self.secret_key, region_name=region)
        self.in_bucket_name = buckets[0]
        self.out_bucket_name = buckets[4]
        self.buckets = buckets
        self.lambda_names = lambda_names
        self.test_result = {}

    def validate_lambda_exists_each(self, name, TC_num):
        try:
            response = self.lambda_function.get_function(
                FunctionName=name
            )
            print(f"Lambda function {name} HTTPStatusCode {response['ResponseMetadata']['HTTPStatusCode']}")
            self.test_result[TC_num] = "PASS"
        except self.lambda_function.exceptions.ResourceNotFoundException as e:
            print(f"Error {e}")
            self.test_result[TC_num] = "FAIL"
        print(f"Test status of {TC_num} : {self.test_result[TC_num]}")

    def validate_lambda_exists(self, TC_num):
        self.validate_lambda_exists_each("video-splitting", TC_num + "_a")
        self.validate_lambda_exists_each("motion-detection", TC_num + "_b")
        self.validate_lambda_exists_each("face-extraction", TC_num + "_c")
        self.validate_lambda_exists_each("face-recognition", TC_num + "_d")

    def validate_s3_subfolders_each(self, buckets, in_objects, TC_num):
        for num, bucket in enumerate(buckets[1:]):
            print(f"\nComparing buckets {buckets[0]} and {bucket} ...")
            TC_num_sub = TC_num + "_" + str(chr(97 + num))
            self.test_result[TC_num_sub] = "PASS"
            for obj in in_objects['Contents']:
                # each input object should have a matching subfolder in the stage bucket
                folder_name = obj['Key'].rsplit('.', 1)[0]
                out_objects = self.s3.list_objects_v2(Bucket=bucket, Prefix=folder_name, Delimiter='/')
                if out_objects['KeyCount'] == 1 or out_objects['KeyCount'] == 11:
                    folder_name = out_objects['CommonPrefixes'][0]['Prefix'].rsplit("/")[0]
                    prefix_name = out_objects['Prefix']
                    if folder_name == prefix_name:
                        print(f"{prefix_name} matches with {folder_name}")
                else:
                    prefix_name = out_objects['Prefix']
                    self.test_result[TC_num_sub] = "FAIL"
                    print(f"NO folder named {prefix_name}\nExiting this test case... ")
                    # print(f"DEBUG :: {out_objects}")
                    break
            print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}")

    def validate_s3_subfolders(self, TC_num):
        in_objects = self.s3.list_objects_v2(Bucket=self.in_bucket_name)
        if in_objects['KeyCount'] == 0:
            print(f"Empty bucket {self.in_bucket_name}")
            # record a result before reporting it (the original raised a KeyError here)
            self.test_result[TC_num] = "FAIL"
            print(f"Test status of {TC_num} : {self.test_result[TC_num]}")
            return
        # use the instance's bucket list (the original referenced an undefined global)
        self.validate_s3_subfolders_each(self.buckets, in_objects, TC_num)

    def check_non_empty_folders(self, bucket_num, TC_num):
        bucket = self.s3_resources.Bucket(self.buckets[bucket_num])
        TC_num_sub = TC_num + "_" + str(chr(96 + bucket_num))
        self.test_result[TC_num_sub] = "FAIL"
        try:
            objects = list(bucket.objects.all())
            print(f"{self.buckets[bucket_num]} contains {len(objects)} objects")
            if bucket_num == 4:
                prefix_pattern = r"test_\d{2}/[oO]utput-\d{2}\.txt"
            else:
                prefix_pattern = r"test_\d{2}/[oO]utput-\d{2}\.(jpg|jpeg)"
            count = self.count_values_with_prefix(objects, prefix_pattern)
            # print(f"DEBUG :: Bucket {stage_1_bucket} has {count} objects that match the pattern")
            if count >= 100:
                self.test_result[TC_num_sub] = "PASS"
        except ClientError:
            print(f"Couldn't get objects for bucket {bucket.name}")
            raise
        print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}\n")

    def count_values_with_prefix(self, objects, prefix_pattern):
        count = 0
        for o in objects:
            # e.g. an object key like "test_07/output-03.jpg" matches the stage pattern
            if re.match(prefix_pattern, o.key):
                # print(f"DEBUG :: Object key '{o.key}' follows the pattern '{prefix_pattern}'")
                count += 1
            else:
                print(f"Object key '{o.key}' does NOT follow the pattern '{prefix_pattern}'")
        return count

    def validate_s3_output_objects(self, TC_num):
        in_bucket = self.s3_resources.Bucket(self.buckets[0])
        try:
            in_objects = list(in_bucket.objects.all())
            print(f"{self.buckets[0]} contains {len(in_objects)} objects")

            # Check that the folders of the stage-1, stage-2, stage-3, and output buckets are non-empty
            self.check_non_empty_folders(1, TC_num=TC_num)
            self.check_non_empty_folders(2, TC_num=TC_num)
            self.check_non_empty_folders(3, TC_num=TC_num)
            self.check_non_empty_folders(4, TC_num=TC_num)

        except ClientError:
            print(f"Couldn't get objects for bucket {in_bucket.name}")
            raise
        else:
            return

    # Make sure the workload generator has been run, and that it finished within
    # 15 minutes of polling for the CloudWatch metrics below.
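    # NOTE: CloudWatch reports the Lambda 'Duration' metric in milliseconds, so
    # the 10000 threshold below corresponds to a 10 s average; with Period=600
    # and Stat='Average', each returned datapoint averages one 10-minute window
    # inside the 15-minute StartTime/EndTime lookback.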
    def check_lambda_duration(self, TC_num):
        response = self.cloudwatch.get_metric_data(
            MetricDataQueries=[
                {
                    'Id': 'testDuration',
                    'MetricStat': {
                        'Metric': {
                            'Namespace': 'AWS/Lambda',
                            'MetricName': 'Duration'
                        },
                        'Period': 600,
                        'Stat': 'Average'
                    },
                    'ReturnData': True,
                },
            ],
            StartTime=datetime.utcnow() - timedelta(minutes=15),
            EndTime=datetime.utcnow(),
            ScanBy='TimestampAscending'
        )
        print(response['MetricDataResults'][0]['Values'])
        values = response['MetricDataResults'][0]['Values']
        if not values:
            self.test_result[TC_num] = "FAIL"
            print(f"Test status of {TC_num} : {self.test_result[TC_num]}")
            return
        if max(values) > 10000:
            self.test_result[TC_num] = "FAIL"
        else:
            self.test_result[TC_num] = "PASS"
        print(f"Test status of {TC_num} : {self.test_result[TC_num]}")

    def check_lambda_concurrency_each(self, functionName, TC_num, subcase):
        TC_num_sub = TC_num + "_" + str(chr(96 + subcase))

        response = self.cloudwatch.get_metric_data(
            MetricDataQueries=[
                {
                    'Id': 'testConcurrency',
                    'MetricStat': {
                        'Metric': {
                            'Namespace': 'AWS/Lambda',
                            'MetricName': 'ConcurrentExecutions',
                            # the function name must be passed as a metric dimension
                            # (the original put a bare 'FunctionName' key here, which
                            # fails boto3 parameter validation; v2 below does the same fix)
                            'Dimensions': [
                                {
                                    'Name': 'FunctionName',
                                    'Value': functionName
                                }
                            ]
                        },
                        'Period': 600,
                        'Stat': 'Maximum'
                    },
                    'ReturnData': True,
                },
            ],
            StartTime=datetime.utcnow() - timedelta(minutes=15),
            EndTime=datetime.utcnow(),
            ScanBy='TimestampAscending'
        )
        print(response['MetricDataResults'][0]['Values'])
        values = response['MetricDataResults'][0]['Values']
        if not values:
            self.test_result[TC_num_sub] = "FAIL"
            print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}")
            return
        if max(values) < 5:
            self.test_result[TC_num_sub] = "FAIL"
        else:
            self.test_result[TC_num_sub] = "PASS"
        print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}")

    def check_lambda_concurrency(self, TC_num):
        self.check_lambda_concurrency_each('video-splitting', TC_num, 1)
        self.check_lambda_concurrency_each('motion-detection', TC_num, 2)
        self.check_lambda_concurrency_each('face-extraction', TC_num, 3)
        self.check_lambda_concurrency_each('face-recognition', TC_num, 4)

    def check_bucket_exist(self, bucket):
        if not bucket:
            print("Bucket name is empty!")
            return False
        try:
            self.s3.head_bucket(Bucket=bucket)
            print(f"Bucket {bucket} Exists!")
            return True
        except botocore.exceptions.ClientError as e:
            # If a client error is thrown, check the HTTP error code:
            # 403 means the bucket exists but access is forbidden,
            # 404 means the bucket does not exist.
            error_code = int(e.response['Error']['Code'])
            if error_code == 403:
                print("Private Bucket. Forbidden Access!")
                return True
            elif error_code == 404:
                print(f"Bucket {bucket} does Not Exist!")
                return False

    def empty_s3_bucket(self, bucket_name):
        bucket = self.s3_resources.Bucket(bucket_name)
        bucket.objects.all().delete()
        print(f"{bucket_name} S3 Bucket is now EMPTY !!")

    def count_bucket_objects(self, bucket_name):
        bucket = self.s3_resources.Bucket(bucket_name)
        count = 0
        for index in bucket.objects.all():
            count += 1
        # print(f"{bucket_name} S3 Bucket has {count} objects !!")
        return count

    def validate_s3_buckets_initial_each(self, bucket_num, TC_num):
        TC_num_sub = TC_num + "_" + str(chr(97 + bucket_num))
        isExist = self.check_bucket_exist(self.buckets[bucket_num])
        self.test_result[TC_num_sub] = "FAIL"
        if isExist:
            obj_count = self.count_bucket_objects(self.buckets[bucket_num])
            print(f"S3 Bucket:{self.buckets[bucket_num]} has {obj_count} object(s)")
            if obj_count == 0:
                self.test_result[TC_num_sub] = "PASS"
            else:
                self.test_result[TC_num_sub] = "FAIL"
        print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}\n")

    def validate_s3_buckets_initial(self, TC_num):
        print(" - Run this BEFORE the workload generator client starts. Press Ctrl^C to exit.")
        print(" - WARN: If there are objects in the S3 buckets, they will be deleted")
        print(" ---------------------------------------------------------")

        for i in range(len(self.buckets)):
            self.validate_s3_buckets_initial_each(bucket_num=i, TC_num=TC_num)

    def display_menu(self):
        print("\n")
        print("=============================================================================")
        print("======== Welcome to CSE546 Cloud Computing AWS Console ======================")
        print("=============================================================================")
        print(f"IAM ACCESS KEY ID: {self.access_key}")
        print(f"IAM SECRET ACCESS KEY: {self.secret_key}")
        print("=============================================================================")
        print("1 - Validate all Lambda functions")
        print("2 - Validate S3 Buckets names and initial states")
        print("3 - Validate S3 buckets subfolders")
        print("4 - Validate S3 buckets objects")
        print("5 - Check lambda average duration")
        print("6 - Check lambda concurrency")
        print("0 - Exit")
        print("Enter a choice:")
        choice = input()
        return choice

    def main(self):
        while True:
            choice = self.display_menu()
            if int(choice) == 1:
                self.validate_lambda_exists('Test_1')
            elif int(choice) == 2:
                self.validate_s3_buckets_initial('Test_2')
            elif int(choice) == 3:
                self.validate_s3_subfolders('Test_3')
            elif int(choice) == 4:
                self.validate_s3_output_objects('Test_4')
            elif int(choice) == 5:
                self.check_lambda_duration('Test_5')
            elif int(choice) == 6:
                self.check_lambda_concurrency('Test_6')
            elif int(choice) == 0:
                break

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Grading Script')
    parser.add_argument('--access_key', type=str, help='ACCESS KEY ID of the grading IAM user')
    parser.add_argument('--secret_key', type=str, help='SECRET KEY of the grading IAM user')
    parser.add_argument('--asu_id', type=str, help='10-digit ASU ID')

    args = parser.parse_args()

    access_key = args.access_key
    secret_key = args.secret_key
    asu_id = args.asu_id
    input_bucket = asu_id + "-input"
    output_bucket = asu_id + "-output"
    stage_1_bucket = asu_id + "-stage-1"
    stage_2_bucket = asu_id + "-stage-2"
    stage_3_bucket = asu_id + "-stage-3"
    buckets = [input_bucket, stage_1_bucket, stage_2_bucket, stage_3_bucket, output_bucket]
    lambda_names = ["video-splitting", "motion-detection", "face-extraction", "face-recognition"]
    region = 'us-east-1'

    aws_obj = aws_grader(access_key, secret_key, buckets, lambda_names, region)
    aws_obj.main()
439 Tooling/Project_2/grading_scripts/grader_script_p2_v2.py Normal file
@@ -0,0 +1,439 @@
__copyright__ = "Copyright 2024, VISA Lab"
__license__ = "MIT"

import pdb
import time
import botocore
import argparse
import textwrap
import boto3
from boto3 import client as boto3_client
from botocore.exceptions import ClientError
from datetime import datetime, timezone, timedelta
import re
import os
import shutil


class aws_grader():
    def __init__(self, access_key, secret_key, buckets, lambda_names, region, asu_id):
        self.access_key = access_key
        self.secret_key = secret_key
        self.region = region
        self.s3 = boto3_client('s3', aws_access_key_id=self.access_key,
                               aws_secret_access_key=self.secret_key, region_name=region)
        self.cloudwatch = boto3_client('cloudwatch', aws_access_key_id=self.access_key,
                                       aws_secret_access_key=self.secret_key, region_name=region)
        self.iam_session = boto3.Session(aws_access_key_id=self.access_key,
                                         aws_secret_access_key=self.secret_key)
        self.s3_resources = self.iam_session.resource('s3', region)
        self.lambda_function = boto3_client('lambda', aws_access_key_id=self.access_key,
                                            aws_secret_access_key=self.secret_key, region_name=region)
        self.in_bucket_name = buckets[0]
        self.out_bucket_name = buckets[2]
        self.buckets = buckets
        self.lambda_names = lambda_names
        self.test_result = {}
        self.end_to_end_latency = 0
        self.output_folder = f"outputs_{asu_id}"
        self.match = ["Trump", "Biden", "Bean", "Depp", "Diesel", "Floki", "Freeman", "Obama"]
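        # check_correctness expects output file test_NN.txt to contain
        # self.match[NN % 8], i.e. the expected names cycle every eight videos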
        self.total_points = 0

    def validate_lambda_exists_each(self, name, TC_num, num):
        TC_num_sub = TC_num + "_" + str(chr(97 + num))

        try:
            response = self.lambda_function.get_function(
                FunctionName=name
            )
            print(f"Lambda function {name} HTTPStatusCode {response['ResponseMetadata']['HTTPStatusCode']}")
            self.test_result[TC_num_sub] = "PASS"
        except self.lambda_function.exceptions.ResourceNotFoundException as e:
            print(f"Error {e}")
            self.test_result[TC_num_sub] = "FAIL"
        print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}")

    def validate_lambda_exists(self, TC_num):
        self.validate_lambda_exists_each("video-splitting", TC_num, num=0)
        self.validate_lambda_exists_each("face-recognition", TC_num, num=1)
        if self.test_result[TC_num + "_a"] == "FAIL" or self.test_result[TC_num + "_b"] == "FAIL":
            self.total_points -= 10

    # NOTE: several helpers below (subfolder and stage-folder checks) are carried
    # over from the PART-1 grader and are not reachable from this script's menu.
    def validate_s3_subfolders_each(self, buckets, in_objects, TC_num):
        for num, bucket in enumerate(buckets[1:]):
            print(f"\nComparing buckets {buckets[0]} and {bucket} ...")
            TC_num_sub = TC_num + "_" + str(chr(97 + num))
            self.test_result[TC_num_sub] = "PASS"
            for obj in in_objects['Contents']:
                folder_name = obj['Key'].rsplit('.', 1)[0]
                out_objects = self.s3.list_objects_v2(Bucket=bucket, Prefix=folder_name, Delimiter='/')
                if out_objects['KeyCount'] == 1 or out_objects['KeyCount'] == 11:
                    folder_name = out_objects['CommonPrefixes'][0]['Prefix'].rsplit("/")[0]
                    prefix_name = out_objects['Prefix']
                    if folder_name == prefix_name:
                        print(f"{prefix_name} matches with {folder_name}")
                else:
                    prefix_name = out_objects['Prefix']
                    self.test_result[TC_num_sub] = "FAIL"
                    print(f"NO folder named {prefix_name}\nExiting this test case... ")
                    # print(f"DEBUG :: {out_objects}")
                    break
            print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}")

    def validate_s3_subfolders(self, TC_num):
        in_objects = self.s3.list_objects_v2(Bucket=self.in_bucket_name)
        if in_objects['KeyCount'] == 0:
            print(f"Empty bucket {self.in_bucket_name}")
            # record a result before reporting it (the original raised a KeyError here)
            self.test_result[TC_num] = "FAIL"
            print(f"Test status of {TC_num} : {self.test_result[TC_num]}")
            return
        # use the instance's bucket list (the original referenced an undefined global)
        self.validate_s3_subfolders_each(self.buckets, in_objects, TC_num)

    def check_non_empty_folders(self, bucket_num, TC_num):
        # carried over from PART-1; assumes the five-bucket layout of that grader
        bucket = self.s3_resources.Bucket(self.buckets[bucket_num])
        TC_num_sub = TC_num + "_" + str(chr(96 + bucket_num))
        self.test_result[TC_num_sub] = "FAIL"
        try:
            objects = list(bucket.objects.all())
            print(f"{self.buckets[bucket_num]} contains {len(objects)} objects")
            if bucket_num == 4:
                prefix_pattern = r"test_\d{2}/[oO]utput-\d{2}\.txt"
            else:
                prefix_pattern = r"test_\d{2}/[oO]utput-\d{2}\.(jpg|jpeg)"
            count = self.count_values_with_prefix(objects, prefix_pattern)
            # print(f"DEBUG :: Bucket {stage_1_bucket} has {count} objects that match the pattern")
            if count >= 100:
                self.test_result[TC_num_sub] = "PASS"
        except ClientError:
            print(f"Couldn't get objects for bucket {bucket.name}")
            raise
        print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}\n")

    def count_values_with_prefix(self, objects, prefix_pattern):
        count = 0
        for o in objects:
            if re.match(prefix_pattern, o.key):
                # print(f"DEBUG :: Object key '{o.key}' follows the pattern '{prefix_pattern}'")
                count += 1
            else:
                print(f"Object key '{o.key}' does NOT follow the pattern '{prefix_pattern}'")
        return count

    def validate_bucket_objects(self, TC_num, bucket_num):
        bucket = self.buckets[bucket_num]
        bucket_res = self.s3_resources.Bucket(self.buckets[bucket_num])

        self.test_result[TC_num] = "FAIL"
        try:
            objects = list(bucket_res.objects.all())
            print(f"{self.buckets[bucket_num]} contains {len(objects)} objects")
            if bucket_num == 2:
                prefix_pattern = r"test_\d{2}\.txt"
            else:
                prefix_pattern = r"test_\d{2}\.(jpg|jpeg)"
            count = self.count_values_with_prefix(objects, prefix_pattern)
            # print(f"DEBUG :: Bucket {stage_1_bucket} has {count} objects that match the pattern")
            if count >= 100:
                self.test_result[TC_num] = "PASS"
                self.total_points += 20
            else:
                # one point off per missing object, capped at 20
                missing = 100 - count
                self.total_points = self.total_points + 20 - min(missing, 20)
        except ClientError:
            print(f"Couldn't get objects for bucket {bucket}")
            raise
        print(f"Test status of {TC_num} : {self.test_result[TC_num]}\n")

    def validate_s3_output_objects(self, TC_num):
        # carried over from the PART-1 grader; not invoked from this script's menu
        in_bucket = self.s3_resources.Bucket(self.buckets[0])
        try:
            in_objects = list(in_bucket.objects.all())
            print(f"{self.buckets[0]} contains {len(in_objects)} objects")

            # Check that the folders of the stage-1, stage-2, stage-3, and output buckets are non-empty
            self.check_non_empty_folders(1, TC_num=TC_num)
            self.check_non_empty_folders(2, TC_num=TC_num)
            self.check_non_empty_folders(3, TC_num=TC_num)
            self.check_non_empty_folders(4, TC_num=TC_num)

        except ClientError:
            print(f"Couldn't get objects for bucket {in_bucket.name}")
            raise
        else:
            return

    # Make sure the workload generator has been run, and that it finished within
    # 15 minutes of polling for the CloudWatch metrics below.
    def check_lambda_duration_each(self, functionName, TC_num, subcase, threshold):
        TC_num_sub = TC_num + "_" + str(chr(96 + subcase))

        response = self.cloudwatch.get_metric_data(
            MetricDataQueries=[
                {
                    'Id': 'testDuration',
                    'MetricStat': {
                        'Metric': {
                            'Namespace': 'AWS/Lambda',
                            'MetricName': 'Duration',
                            'Dimensions': [
                                {
                                    'Name': 'FunctionName',
                                    'Value': functionName
                                }
                            ]
                        },
                        'Period': 600,
                        'Stat': 'Average'
                    },
                    'ReturnData': True,
                },
            ],
            StartTime=datetime.utcnow() - timedelta(minutes=15),
            EndTime=datetime.utcnow(),
            ScanBy='TimestampAscending'
        )
        print(response['MetricDataResults'][0]['Values'])
        values = response['MetricDataResults'][0]['Values']
        if not values:
            self.test_result[TC_num_sub] = "FAIL"
            print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}")
            return
        if max(values) > threshold:
            self.test_result[TC_num_sub] = "FAIL"
        else:
            self.test_result[TC_num_sub] = "PASS"
        print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}")

    def check_lambda_duration(self, TC_num):
        self.check_lambda_duration_each('video-splitting', TC_num, 1, threshold=2000)
        self.check_lambda_duration_each('face-recognition', TC_num, 2, threshold=10000)

    def check_lambda_concurrency_each(self, functionName, TC_num, subcase, threshold):
        TC_num_sub = TC_num + "_" + str(chr(96 + subcase))

        response = self.cloudwatch.get_metric_data(
            MetricDataQueries=[
                {
                    'Id': 'testConcurrency',
                    'MetricStat': {
                        'Metric': {
                            'Namespace': 'AWS/Lambda',
                            'MetricName': 'ConcurrentExecutions',
                            'Dimensions': [
                                {
                                    'Name': 'FunctionName',
                                    'Value': functionName
                                }
                            ]
                        },
                        'Period': 600,
                        'Stat': 'Maximum'
                    },
                    'ReturnData': True,
                },
            ],
            StartTime=datetime.utcnow() - timedelta(minutes=15),
            EndTime=datetime.utcnow(),
            ScanBy='TimestampAscending'
        )
        print(response['MetricDataResults'][0]['Values'])
        values = response['MetricDataResults'][0]['Values']
        if not values:
            self.test_result[TC_num_sub] = "FAIL"
            print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}")
            return
        if max(values) < threshold:
            self.test_result[TC_num_sub] = "FAIL"
        else:
            self.test_result[TC_num_sub] = "PASS"
        print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}")

    def check_lambda_concurrency(self, TC_num):
        self.check_lambda_concurrency_each('video-splitting', TC_num, 1, threshold=3)
        self.check_lambda_concurrency_each('face-recognition', TC_num, 2, threshold=3)

    def check_bucket_exist(self, bucket):
        if not bucket:
            print("Bucket name is empty!")
            return False
        try:
            self.s3.head_bucket(Bucket=bucket)
            print(f"Bucket {bucket} Exists!")
            return True
        except botocore.exceptions.ClientError as e:
            # If a client error is thrown, check the HTTP error code:
            # 403 means the bucket exists but access is forbidden,
            # 404 means the bucket does not exist.
            error_code = int(e.response['Error']['Code'])
            if error_code == 403:
                print("Private Bucket. Forbidden Access!")
                return True
            elif error_code == 404:
                print(f"Bucket {bucket} does Not Exist!")
                return False

    def empty_s3_bucket(self, bucket_name):
        bucket = self.s3_resources.Bucket(bucket_name)
        bucket.objects.all().delete()
        print(f"{bucket_name} S3 Bucket is now EMPTY !!")

    def count_bucket_objects(self, bucket_name):
        bucket = self.s3_resources.Bucket(bucket_name)
        count = 0
        for index in bucket.objects.all():
            count += 1
        # print(f"{bucket_name} S3 Bucket has {count} objects !!")
        return count

    def validate_s3_buckets_initial_each(self, bucket_num, TC_num):
        TC_num_sub = TC_num + "_" + str(chr(97 + bucket_num))
        isExist = self.check_bucket_exist(self.buckets[bucket_num])
        self.test_result[TC_num_sub] = "FAIL"
        if isExist:
            obj_count = self.count_bucket_objects(self.buckets[bucket_num])
            print(f"S3 Bucket:{self.buckets[bucket_num]} has {obj_count} object(s)")
            if obj_count == 0:
                self.test_result[TC_num_sub] = "PASS"
            else:
                self.test_result[TC_num_sub] = "FAIL"
        print(f"Test status of {TC_num_sub} : {self.test_result[TC_num_sub]}\n")

    def validate_s3_buckets_initial(self, TC_num):
        print(" - Run this BEFORE the workload generator client starts. Press Ctrl^C to exit.")
        print(" - WARN: If there are objects in the S3 buckets, they will be deleted")
        print(" ---------------------------------------------------------")

        for i in range(len(self.buckets)):
            self.validate_s3_buckets_initial_each(bucket_num=i, TC_num=TC_num)
        if self.test_result[TC_num + "_a"] == "FAIL" or \
           self.test_result[TC_num + "_b"] == "FAIL" or \
           self.test_result[TC_num + "_c"] == "FAIL":
            self.total_points -= 10

    def download_from_s3(self, bucket, folder_name):
        objects = self.s3.list_objects_v2(Bucket=bucket, Prefix=folder_name)
        for obj in objects.get('Contents', []):
            # Get the key (filename) of the object
            key = obj['Key']
            self.s3.download_file(bucket, key, f"/tmp/{key}")

    def check_end_to_end(self, TC_num):
        print(" - Make sure the workload generator is started. Press Ctrl^C to exit.")
        print(" ---------------------------------------------------------")
        print("Proceed? (y/n)")
        proceed = input()
        if proceed == "y":
            start_time = time.time()
            self.test_result[TC_num] = "FAIL"

            out_bucket = self.s3_resources.Bucket(self.buckets[2])
            # poll the output bucket until all 100 results arrive or the time budget runs out
            while len(list(out_bucket.objects.all())) <= 100 or (time.time() - start_time) <= 400:
                objects = out_bucket.objects.all()
                print(f"Total objects in output bucket: {len(list(objects))}, current time elapsed: {time.time() - start_time}")
                if len(list(objects)) == 100:
                    self.end_to_end_latency = time.time() - start_time
                    print(f"End-to-end latency is: {self.end_to_end_latency}")
                    break
            if (time.time() - start_time) < 400:
                self.test_result[TC_num] = "PASS"

            # scoring: full credit under 300 s, partial credit up to 400 s,
            # otherwise one point off per missing output (capped at 20)
            if self.end_to_end_latency <= 300:
                self.total_points += 30
            elif 300 < self.end_to_end_latency <= 400:
                self.total_points += 20
            elif self.end_to_end_latency > 400:
                objects = out_bucket.objects.all()
                missing = 100 - len(list(objects))
                self.total_points = self.total_points + 20 - min(missing, 20)
            print(f"Test status of {TC_num} : {self.test_result[TC_num]}\n")

    def check_correctness(self, TC_num):
        # start from a clean local copy of the output bucket
        if os.path.exists(self.output_folder):
            shutil.rmtree(self.output_folder)
        if not os.path.exists(self.output_folder):
            os.makedirs(self.output_folder)
        out_bucket = self.s3_resources.Bucket(self.buckets[2])
        objects = out_bucket.objects.all()
        print(f"Downloading {len(list(objects))} objects from the {self.buckets[2]} bucket ...")
        for o in objects:
            # print(self.output_folder, out_bucket, o.key)
            self.s3.download_file(self.buckets[2], o.key, os.path.join(self.output_folder, o.key))
        match_count = 0
        for i, filename in enumerate(sorted(os.listdir(self.output_folder))):
            with open(os.path.join(self.output_folder, filename), "r") as f:
                line = f.read()
            prefix_pattern = r"test_\d{2}\.txt"
            if not re.match(prefix_pattern, filename):
                self.test_result[TC_num] = "FAIL"
                print(f"Filename does not follow the required pattern {prefix_pattern}. Cannot check correctness test.")
                return
            filenum = int(filename.split("_")[1].split(".")[0]) % len(self.match)
            if line.strip() == self.match[filenum]:
                match_count += 1
            else:
                print(f"{filename} content {line.strip()} did not match with {self.match[filenum]}")
        missing = 100 - match_count
        print(f"Missing correct results = {missing}")
        if missing == 0:
            self.test_result[TC_num] = "PASS"
        else:
            self.test_result[TC_num] = "FAIL"
        print(f"Test status of {TC_num} : {self.test_result[TC_num]}\n")
        self.total_points = self.total_points + 30 - min(missing, 30)

    def display_menu(self):
        print("\n")
        print("=============================================================================")
        print("======== Welcome to CSE546 Cloud Computing AWS Console ======================")
        print("=============================================================================")
        print(f"IAM ACCESS KEY ID: {self.access_key}")
        print(f"IAM SECRET ACCESS KEY: {self.secret_key}")
        print("=============================================================================")
        print("1 - Validate all Lambda functions")
        print("2 - Validate S3 Buckets names and initial states")
        print("3 - End-to-end pipeline testing")
        print("4 - Validate Stage-1 bucket objects")
        print("5 - Validate Output bucket objects")
        print("6 - Check the correctness of the results")
        print("0 - Exit")
        print("Enter a choice:")
        choice = input()
        return choice

    def main(self):
        while True:
            choice = self.display_menu()
            if int(choice) == 1:
                self.validate_lambda_exists('Test_1')
            elif int(choice) == 2:
                self.validate_s3_buckets_initial('Test_2')
            elif int(choice) == 3:
                self.check_end_to_end('Test_3')
            elif int(choice) == 4:
                self.validate_bucket_objects('Test_4', bucket_num=1)
            elif int(choice) == 5:
                self.validate_bucket_objects('Test_5', bucket_num=2)
            elif int(choice) == 6:
                self.check_correctness('Test_6')
            elif int(choice) == 0:
                break
            print(f"Total points : {self.total_points}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Grading Script')
    parser.add_argument('--access_key', type=str, help='ACCESS KEY ID of the grading IAM user')
    parser.add_argument('--secret_key', type=str, help='SECRET KEY of the grading IAM user')
    parser.add_argument('--asu_id', type=str, help='10-digit ASU ID')

    args = parser.parse_args()

    access_key = args.access_key
    secret_key = args.secret_key
    asu_id = args.asu_id
    input_bucket = asu_id + "-input"
    output_bucket = asu_id + "-output"
    stage_1_bucket = asu_id + "-stage-1"
    buckets = [input_bucket, stage_1_bucket, output_bucket]
    lambda_names = ["video-splitting", "face-recognition"]
    region = 'us-east-1'

    aws_obj = aws_grader(access_key, secret_key, buckets, lambda_names, region, asu_id)
    aws_obj.main()