🚀 Project 7: Serverless Event Pipeline - Code Browser
📋 Infrastructure as Code
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: 'Project 7: Serverless Event-Driven Image Intake Pipeline'

# Deploy-time settings. Every parameter has a default, so the stack can be
# created without passing any overrides.
Parameters:
  ProjectName:
    Type: String
    Default: 'project-7-serverless-event'
    Description: 'Prefix used for the Lambda function name and the SNS topic name.'
  UploadBucketName:
    Type: String
    Default: 'project-7-serverless-event-upload'
    Description: 'Name of the S3 bucket that receives image uploads.'
  ProcessedBucketName:
    Type: String
    Default: 'project-7-serverless-event-processed'
    Description: 'Name of the S3 bucket exposed to the function as PROCESSED_BUCKET.'
  DynamoTableName:
    Type: String
    Default: 'project-7-image-metadata'
    Description: 'Name of the DynamoDB table that stores one item per ingested image.'
  EmailForNotifications:
    Type: String
    # NOTE(review): the default is a personal address; override it per deployment.
    Default: 'rusty.teston@gmail.com'
    Description: 'Email address subscribed to the notification topic.'

# Settings shared by every AWS::Serverless::Function in this template.
Globals:
  Function:
    Timeout: 30
    Runtime: python3.11
    Environment:
      Variables:
        DYNAMODB_TABLE: !Ref ImageMetadataTable
        SNS_TOPIC: !Ref NotificationTopic
        PROCESSED_BUCKET: !Ref ProcessedBucket

Resources:
  # S3 Buckets: both are encrypted with SSE-S3 and block all public access.
  UploadBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref UploadBucketName
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  ProcessedBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref ProcessedBucketName
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  # Lambda Function: stores one DynamoDB item per S3 record and publishes an
  # SNS notification for it.
  ImageIngestFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub '${ProjectName}-image-ingest'
      InlineCode: |
        import json
        import boto3
        import os
        import uuid
        from datetime import datetime
        from urllib.parse import unquote_plus

        dynamodb = boto3.resource('dynamodb')
        sns = boto3.client('sns')
        s3 = boto3.client('s3')

        DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
        SNS_TOPIC = os.environ['SNS_TOPIC']
        PROCESSED_BUCKET = os.environ['PROCESSED_BUCKET']


        def build_subject(object_key):
            # SNS rejects subjects that contain line breaks or control
            # characters or that reach 100 characters. Anything outside
            # printable ASCII is replaced and the result is truncated; the
            # untouched key is still carried in the message body.
            text = f'Image Processed: {object_key}'
            return ''.join(c if ' ' <= c <= '~' else '?' for c in text)[:99]


        def lambda_handler(event, context):
            try:
                table = dynamodb.Table(DYNAMODB_TABLE)
                for record in event['Records']:
                    bucket_name = record['s3']['bucket']['name']
                    # S3 URL-encodes keys in event payloads.
                    object_key = unquote_plus(record['s3']['object']['key'])
                    object_size = record['s3']['object']['size']
                    event_time = record['eventTime']
                    image_id = str(uuid.uuid4())
                    try:
                        s3_response = s3.head_object(Bucket=bucket_name, Key=object_key)
                        content_type = s3_response.get('ContentType', 'unknown')
                    except Exception as e:
                        # Best effort: keep ingesting, but leave a trace in the logs.
                        print(f'Error getting S3 metadata: {str(e)}')
                        content_type = 'unknown'
                    metadata_record = {
                        'image_id': image_id,
                        'bucket_name': bucket_name,
                        'object_key': object_key,
                        'file_size': object_size,
                        'content_type': content_type,
                        'upload_time': event_time,
                        'processing_status': 'ingested',
                        'created_at': datetime.now().isoformat()
                    }
                    table.put_item(Item=metadata_record)
                    message = {
                        'image_id': image_id,
                        'object_key': object_key,
                        'bucket_name': bucket_name,
                        'file_size': object_size,
                        'status': 'processed'
                    }
                    try:
                        sns.publish(
                            TopicArn=SNS_TOPIC,
                            Subject=build_subject(object_key),
                            Message=json.dumps(message, indent=2)
                        )
                    except Exception as e:
                        # The metadata is already stored, so a failed
                        # notification is logged instead of failing the batch.
                        print(f'Error sending SNS notification: {str(e)}')
                return {
                    'statusCode': 200,
                    'body': json.dumps({'message': f'Successfully processed {len(event["Records"])} image(s)'})
                }
            except Exception as e:
                print(f'Error: {str(e)}')
                return {
                    'statusCode': 500,
                    'body': json.dumps({'error': str(e)})
                }
      Handler: index.lambda_handler
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref ImageMetadataTable
        - SNSPublishMessagePolicy:
            TopicName: !GetAtt NotificationTopic.TopicName
        - S3ReadPolicy:
            BucketName: !Ref UploadBucketName

  # Lets the upload bucket invoke the function. This template does not attach
  # the bucket notification itself; S3 checks this permission when a
  # notification that targets the function is configured on the bucket.
  S3InvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt ImageIngestFunction.Arn
      Principal: s3.amazonaws.com
      SourceAccount: !Ref AWS::AccountId
      SourceArn: !Sub 'arn:${AWS::Partition}:s3:::${UploadBucketName}'

  # DynamoDB Table: on-demand billing, keyed by the generated image_id.
  ImageMetadataTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Ref DynamoTableName
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: image_id
          AttributeType: S
      KeySchema:
        - AttributeName: image_id
          KeyType: HASH
      SSESpecification:
        SSEEnabled: true

  # SNS Topic
  NotificationTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: !Sub '${ProjectName}-notifications'
      KmsMasterKeyId: alias/aws/sns

  # SNS Subscription: delivers topic messages to the configured email address.
  EmailSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: email
      TopicArn: !Ref NotificationTopic
      Endpoint: !Ref EmailForNotifications

Outputs:
  UploadBucketName:
    Description: 'S3 Bucket for uploading images'
    Value: !Ref UploadBucket
  LambdaFunctionName:
    Description: 'Lambda function for image processing'
    Value: !Ref ImageIngestFunction
import json
import boto3
import os
import uuid
from datetime import datetime
from urllib.parse import unquote_plus
# Initialize AWS clients
# Created once at module import, so every call to the handler in the same
# process reuses them.
dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')
s3 = boto3.client('s3')
# Environment variables
# Read with os.environ[...], so a missing variable raises KeyError at import
# time rather than on the first event.
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
SNS_TOPIC = os.environ['SNS_TOPIC']
# NOTE(review): PROCESSED_BUCKET is required here but is not referenced by the
# handler visible in this file - confirm whether it is still needed.
PROCESSED_BUCKET = os.environ['PROCESSED_BUCKET']
# SNS requires subjects to be shorter than 100 characters.
_SNS_SUBJECT_MAX_LEN = 99


def _build_sns_subject(object_key):
    """Return an SNS-safe subject line for the given object key."""
    subject = f'Image Processed: {object_key}'
    # SNS rejects subjects containing line breaks or control characters.
    # Everything outside printable ASCII is replaced so the subject stays
    # valid; the untouched key is still carried in the message body.
    printable = ''.join(ch if ' ' <= ch <= '~' else '?' for ch in subject)
    return printable[:_SNS_SUBJECT_MAX_LEN]


def lambda_handler(event, context):
    """
    Lambda function to process image uploads from S3.

    For every entry in event['Records'] the handler reads the bucket name,
    object key, size and event time, looks up the content type and
    last-modified time with head_object, stores one item in the DynamoDB
    table and publishes an SNS notification.

    Args:
        event: S3 notification payload containing a 'Records' list.
        context: Lambda context object (unused).

    Returns:
        dict with 'statusCode' 200 and a JSON 'body' reporting the number of
        records on success, or 'statusCode' 500 and a JSON 'body' carrying
        the error text when processing fails.

    NOTE(review): every exception is converted into a returned dict, so the
    invocation itself never raises - confirm this is intended for
    asynchronous S3 triggers.
    """
    try:
        # Get DynamoDB table
        table = dynamodb.Table(DYNAMODB_TABLE)

        # Process each record in the event
        for record in event['Records']:
            # Extract S3 event information
            bucket_name = record['s3']['bucket']['name']
            # S3 URL-encodes keys in event payloads ('+' stands for a space).
            object_key = unquote_plus(record['s3']['object']['key'])
            object_size = record['s3']['object']['size']
            event_time = record['eventTime']
            print(f"Processing image: {object_key} from bucket: {bucket_name}")

            # Generate unique image ID
            image_id = str(uuid.uuid4())

            # Get additional metadata from S3 object (best effort: a failed
            # lookup falls back to placeholder values instead of aborting).
            try:
                s3_response = s3.head_object(Bucket=bucket_name, Key=object_key)
                content_type = s3_response.get('ContentType', 'unknown')
                last_modified = s3_response.get('LastModified', datetime.now()).isoformat()
            except Exception as e:
                print(f"Error getting S3 metadata: {str(e)}")
                content_type = 'unknown'
                last_modified = datetime.now().isoformat()

            # Prepare metadata record
            metadata_record = {
                'image_id': image_id,
                'bucket_name': bucket_name,
                'object_key': object_key,
                'file_size': object_size,
                'content_type': content_type,
                'upload_time': event_time,
                'last_modified': last_modified,
                'processing_status': 'ingested',
                'created_at': datetime.now().isoformat()
            }

            # Store metadata in DynamoDB; a failure here aborts the batch.
            try:
                table.put_item(Item=metadata_record)
                print(f"Metadata stored for image: {image_id}")
            except Exception as e:
                print(f"Error storing metadata: {str(e)}")
                # Bare raise keeps the original traceback intact.
                raise

            # Send SNS notification
            try:
                message = {
                    'image_id': image_id,
                    'object_key': object_key,
                    'bucket_name': bucket_name,
                    'file_size': object_size,
                    'content_type': content_type,
                    'status': 'processed'
                }
                sns.publish(
                    TopicArn=SNS_TOPIC,
                    Subject=_build_sns_subject(object_key),
                    Message=json.dumps(message, indent=2)
                )
                print(f"SNS notification sent for image: {image_id}")
            except Exception as e:
                print(f"Error sending SNS notification: {str(e)}")
                # Don't raise here - metadata is already stored

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Successfully processed {len(event["Records"])} image(s)',
                'processed_images': len(event['Records'])
            })
        }
    except Exception as e:
        print(f"Error processing images: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e),
                'message': 'Failed to process images'
            })
        }
{
  "LambdaFunctionConfigurations": [
    {
      "Id": "ImageUploadTrigger",
      "LambdaFunctionArn": "arn:aws:lambda:us-east-1:901779867920:function:project-7-serverless-event-image-ingest",
      "Events": [
        "s3:ObjectCreated:*"
      ],
      "Filter": {
        "Key": {
          "FilterRules": [
            {
              "Name": "suffix",
              "Value": ".jpg"
            }
          ]
        }
      }
    },
    {
      "Id": "ImageUploadTriggerPNG",
      "LambdaFunctionArn": "arn:aws:lambda:us-east-1:901779867920:function:project-7-serverless-event-image-ingest",
      "Events": [
        "s3:ObjectCreated:*"
      ],
      "Filter": {
        "Key": {
          "FilterRules": [
            {
              "Name": "suffix",
              "Value": ".png"
            }
          ]
        }
      }
    }
  ]
}
🚀 Deployment Commands
#!/bin/bash
# Deploys the Project 7 stacks and then attaches the S3 -> Lambda notification.
# Each step depends on the previous one, so the script stops at the first
# failing command instead of carrying on and reporting success.
set -euo pipefail

# Single source of truth for the region used by every command below.
readonly REGION="us-east-1"

# Deploy infrastructure
aws cloudformation deploy \
  --template-file simple-template.yaml \
  --stack-name project-7-serverless-event \
  --capabilities CAPABILITY_IAM \
  --region "${REGION}"

# Deploy Lambda function
aws cloudformation deploy \
  --template-file lambda-only-template.yaml \
  --stack-name project-7-lambda-function \
  --capabilities CAPABILITY_NAMED_IAM \
  --region "${REGION}"

# Configure S3 notifications
aws s3api put-bucket-notification-configuration \
  --bucket project-7-serverless-event-upload \
  --notification-configuration file://s3-notification.json \
  --region "${REGION}"

# Only reached when every command above succeeded.
echo "✅ Project 7 deployment complete!"
📊 Key Technical Features
- Event-Driven Architecture: S3 events automatically trigger Lambda processing
- Serverless Design: No server management, automatic scaling
- Infrastructure as Code: Complete CloudFormation/SAM templates
- Error Handling: Comprehensive exception management and logging
- Security: IAM least privilege, encrypted S3 buckets
- Monitoring: CloudWatch logs and metrics
← Back to Portfolio