At times there is a need to centralize data from different sources. This could be driven by various requirements:
- Maybe you have to aggregate different data sources to perform better analytics
- Or you are building a system from the ground up that takes in the old data, transforms it, and stores it in the new system in near real time
There could be any number of reasons behind data ingestion. In this article I will explore a cost-effective, scalable, and completely serverless architecture that can do just that. I have used the AWS cloud, but you can find similar services with your preferred cloud provider; the architecture and principles remain the same.
Prerequisites
- An active AWS account (or an account with any other cloud provider)
- A user with sufficient permissions to create the assets
- Python 3.9 or higher
- Terraform
I have chosen Terraform to keep things generic. Although I will be using the AWS provider, you can use any other provider and its respective resources with a similar setup.
Limitations
- This architecture addresses only basic security considerations and is not meant for a production system as-is. However, it can easily be hardened into one.
- There is no frontend to control the ingestion configuration
Architecture
This diagram outlines a high-level architecture for a serverless, event-driven data ingestion system implemented within the AWS Cloud environment, designed to accommodate data from multiple sources. It follows a common pattern called “Content-Based Filtering”, where individual messages are routed to workers based on the message type (in this case, the ‘source’ acts as the type) rather than broadcasting all messages to all workers. That way, a worker doesn’t have the responsibility of filtering and can focus purely on processing.
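To make the pattern concrete, here is a purely illustrative Python sketch of content-based routing (the queue map and route_message function are hypothetical stand-ins; in the actual architecture, SNS subscription filter policies do this work, as shown later in this article):
# Illustrative only: each message goes to the one queue matching its 'source'
# attribute instead of being broadcast to every worker.
QUEUES = {"sourceA": [], "sourceB": [], "sourceC": []}  # stand-ins for per-source SQS queues

def route_message(message: dict) -> None:
    queue = QUEUES.get(message.get("source"))
    if queue is None:
        return  # no matching subscription filter: the message is dropped
    queue.append(message)

route_message({"source": "sourceA", "entity_name": "EQUIPMENT", "body": {}})
print(len(QUEUES["sourceA"]))  # 1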
This architecture might come naturally to someone who has worked with microservices. If you are interested in microservices, these are good starting points:
- Principles of Microservices – Deploy Independently
- Principles of Microservices – Decentralize Everything
- Principles of Microservices – Hide Implementation Details
Data Flow in the System
1. Data Sources and Initial Trigger
At the onset, we have various data sources, namely Source A, B, C and D. These sources represent different systems or applications that generate data which needs to be ingested into the cloud infrastructure.
2. API Gateway and Lambda Function
The entry point for data is through the Amazon API Gateway, which provides a unified interface to trigger the ingestion process. The API Gateway is configured to receive data in a specific format, ensuring that the data from multiple sources conforms to a standard protocol.
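For example, a request body that matches what the downstream code in this article expects (the source, entity_name, and body keys used by the Lambdas later on) might look like the following illustrative Python dict; the field values are made up:
# Illustrative payload for the sourceA endpoint; values are placeholders.
payload = {
    "source": "sourceA",          # used for SNS routing, grouping, and queue selection
    "entity_name": "EQUIPMENT",   # tells the worker which persistence logic to apply
    "body": {                     # the actual record to be stored
        "cat_id": "CAT-123",
        "contract_id": "C-001",
    },
}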
3. AWS Lambda Functions as APIs
Upon receiving data, the API Gateway triggers AWS Lambda functions. These serverless compute services run code in response to triggers such as the API Gateway and automatically manage the underlying compute resources.
4. Data Routing with SNS to Queues
The Lambda functions then publish the incoming events to Amazon Simple Notification Service (SNS), a managed service that provides message delivery from publishers to subscribers (in this case, the message queues). SNS acts as a dispatcher, efficiently handling the throughput of messages.
5. Individual Queue Per Data Source
Following SNS, there are individual queues for each data source created using Amazon SQS (Simple Queue Service). There are separate queues for different data sources such as A, B, C and D, allowing for organized and isolated handling of messages from each source. This setup also includes a dead-letter queue to handle failed messages, ensuring that message processing can be retried or inspected without losing the information.
6. Data Processing
Additional Lambda functions are linked to each SQS queue as workers. These functions are invoked to process messages from the respective queues asynchronously. The processing might involve data transformation, validation, and preparation for storage.
7. Data Storage
The processed data is then stored in an AWS data storage service which could be Amazon S3 for object storage, Amazon RDS or DynamoDB for database storage, or any other suitable AWS data storage solution, depending on the requirements.
Error Handling
- The architecture includes a strategy for error handling, where failed messages that cannot be processed after several attempts are routed to a dead-letter queue. This allows for isolating and troubleshooting erroneous data without interrupting the normal flow of data ingestion.
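As a sketch of how that wiring looks (the queue names and URL are placeholders; the boto3 calls below only illustrate attaching a redrive policy and are not part of the Terraform shown later):
import json
import boto3

sqs = boto3.client("sqs")

# Create a FIFO dead-letter queue (a FIFO source queue requires a FIFO DLQ)
dlq = sqs.create_queue(QueueName="sourceA-dlq.fifo", Attributes={"FifoQueue": "true"})
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq["QueueUrl"], AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Attach a redrive policy: after 3 failed receives, a message moves to the DLQ
sqs.set_queue_attributes(
    QueueUrl="<sourceA-queue-url>",
    Attributes={
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "3"}
        )
    },
)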
Key Architectural Benefits
- The use of AWS services provides a scalable, reliable, and secure infrastructure that can handle large volumes of data and variable loads with ease.
- Serverless components like Lambda ensure that you only pay for the compute time you consume, contributing to a cost-effective solution.
- Event-driven architecture facilitates real-time data processing, making the system reactive and dynamic in response to incoming data.
Overall, this architecture provides a robust framework for ingesting data from multiple sources using serverless technologies, ensuring scalability, reliability, and maintainability.
Code – Infrastructure with Terraform
The way I like to structure my Terraform code is in modules. Leveraging modules keeps the code maintainable and isolated. Another thing I like to leverage is variables, for creating repeatable infrastructure units.
Here is the code structure that I followed.
The root directory contains the following directories:
.
├── config
└── modules
├── common-services
└── ingestion
From the root down to level two (including files), it is structured as below.
.
├── backend.tf
├── config
│ ├── backend.conf
│ ├── backend_local.conf
│ ├── terraform.tfvars
│ └── terraform_local.tfvars
├── main.tf
├── modules
│ ├── common-services
│ └── ingestion
├── output.tf
├── providers.tf
├── terraform.tf
└── variables.tf
Now, let's step inside the ingestion module, where you will see different files for different services and stacks. I like to create separate .tf files for different components and services; that way I know where to go for what. And as you can see, the file names are pretty self-explanatory.
modules/ingestion
├── apis.tf
├── glue.tf
├── lambda.tf
├── roles.tf
├── s3.tf
├── sns.tf
├── sqs.tf
└── variables.tf
I would like to take you through each file and its contents one by one and explain the bits along the way if needed.
lambda.tf
This file contains only the code for creating the Lambdas and their related resources in AWS. Here, you will find the creation of two types of Lambda functions:
- Ingestion API Lambda
- SQS Worker Lambdas
This is a good time to take a quick look at the architecture to understand where these lambdas are positioned.
locals {
common_ingestion_api_lambda = {
s3_bucket = aws_s3_bucket.ingestion_lambda_bucket.bucket
s3_key = aws_s3_object.lambda_zip.key
function_name = "ingestion_api_lambda",
handler = "ingestion_api_function.lambda_handler",
runtime = "python3.9"
source_code_hash = data.archive_file.ingestion_lambda_zip.output_base64sha256
}
sqs_worker_lambdas = {
sourceA = {
s3_bucket = aws_s3_bucket.ingestion_lambda_bucket.bucket
s3_key = aws_s3_object.lambda_zip.key
function_name = "sqs_sourceA_worker_${var.suffix}"
handler = "sqs_worker_function.handle_sourceA_message"
runtime = "python3.9"
source_code_hash = data.archive_file.ingestion_lambda_zip.output_base64sha256
timeout = 60
},
sourceB = {
s3_bucket = aws_s3_bucket.ingestion_lambda_bucket.bucket
s3_key = aws_s3_object.lambda_zip.key
function_name = "sqs_sourceB_worker_${var.suffix}"
handler = "sqs_worker_function.handle_sourceB_message"
runtime = "python3.9"
source_code_hash = data.archive_file.ingestion_lambda_zip.output_base64sha256
timeout = 60
},
sourceC = {
s3_bucket = aws_s3_bucket.ingestion_lambda_bucket.bucket
s3_key = aws_s3_object.lambda_zip.key
function_name = "sqs_sourceC_worker_${var.suffix}"
handler = "sqs_worker_function.handle_sourceC_message"
runtime = "python3.9"
source_code_hash = data.archive_file.ingestion_lambda_zip.output_base64sha256
timeout = 60
}
}
}
# Create Lambda functions Ingestion API
resource "aws_lambda_function" "ingestion_api" {
s3_bucket = local.common_ingestion_api_lambda.s3_bucket
s3_key = local.common_ingestion_api_lambda.s3_key
function_name = local.common_ingestion_api_lambda.function_name
handler = local.common_ingestion_api_lambda.handler
runtime = local.common_ingestion_api_lambda.runtime
role = aws_iam_role.lambda_execution_role.arn
layers = [
"arn:aws:lambda:${var.region}:017000801446:layer:AWSLambdaPowertoolsPythonV2-Arm64:32",
var.lambda_dependency_layer_arn
]
source_code_hash = local.common_ingestion_api_lambda.source_code_hash
depends_on = [data.archive_file.ingestion_lambda_zip, aws_s3_bucket.ingestion_lambda_bucket, aws_s3_object.lambda_zip]
}
resource "aws_cloudwatch_log_group" "ingestion_api" {
#checkov:skip=CKV_AWS_338:Ensure CloudWatch log groups retains logs for at least 1 year
#checkov:skip=CKV_AWS_158:Ensure that CloudWatch Log Group is encrypted by KMS
name = "/aws/lambda/${aws_lambda_function.ingestion_api.function_name}"
retention_in_days = 30
}
# Create Lambda functions SQS_WORKER
resource "aws_lambda_function" "sqs_worker" {
for_each = local.sqs_worker_lambdas
s3_bucket = each.value.s3_bucket
s3_key = each.value.s3_key
function_name = each.value.function_name
handler = each.value.handler
runtime = each.value.runtime
role = aws_iam_role.sqs_lambda.arn
timeout = each.value.timeout
layers = [
"arn:aws:lambda:${var.region}:017000801446:layer:AWSLambdaPowertoolsPythonV2-Arm64:32",
var.lambda_dependency_layer_arn
]
source_code_hash = each.value.source_code_hash
depends_on = [data.archive_file.ingestion_lambda_zip, aws_s3_bucket.ingestion_lambda_bucket, aws_s3_object.lambda_zip]
}
resource "aws_cloudwatch_log_group" "sqs_worker" {
for_each = local.sqs_worker_lambdas
#checkov:skip=CKV_AWS_338:Ensure CloudWatch log groups retains logs for at least 1 year
#checkov:skip=CKV_AWS_158:Ensure that CloudWatch Log Group is encrypted by KMS
name = "/aws/lambda/${aws_lambda_function.sqs_worker[each.key].function_name}"
retention_in_days = 30
}
apis.tf
An API Gateway in AWS acts as the front door to your serverless applications. It directs incoming requests to the appropriate functions based on pre-defined paths and methods. Let’s break down the key elements that make up an API Gateway:
- Base API: This is the foundation, the starting point for your API. Think of it as the root directory on your computer.
- Resources: These represent specific functionalities within your API, similar to folders within a directory structure. You define paths to access these resources, allowing users to interact with different parts of your application.
- Methods: These define how users can interact with a resource. In REST API, these are mapped directly to POST, PUT, GET, DELETE, PATCH etc.
- Integration: Here’s where the magic happens! You connect your API Gateway resources with the actual logic that handles requests. In this case, we’re integrating with a Lambda function, but API Gateway can connect to various backend services.
- Deployment: Once everything is configured, it’s time to make your API accessible! Deployment publishes your API Gateway, making it live and ready to receive requests from users.
With the above information in mind, take a look at the following resources. Don't worry about the details of the locals block for now; it is the variable that holds the configuration for creating the different APIs in the system, essentially one API endpoint per data source.
locals {
endpoints = {
"sourceA" = {
path = "sourceA" # must match the route defined in the ingestion API Lambda
lambda_config = {
s3_bucket = aws_s3_bucket.ingestion_lambda_bucket.bucket
s3_key = aws_s3_object.lambda_zip.key
function_name = "ingestion_api_lambda",
handler = "ingestion_api_function.lambda_handler",
runtime = "python3.9"
source_code_hash = data.archive_file.ingestion_lambda_zip.output_base64sha256
}
},
"sourceB" = {
path = "sourceB"
lambda_config = {
s3_bucket = aws_s3_bucket.ingestion_lambda_bucket.bucket
s3_key = aws_s3_object.lambda_zip.key
function_name = "ingestion_api_lambda",
handler = "ingestion_api_function.lambda_handler",
runtime = "python3.9"
source_code_hash = data.archive_file.ingestion_lambda_zip.output_base64sha256
}
},
"sourceC" = {
path = "sourceC"
lambda_config = {
s3_bucket = aws_s3_bucket.ingestion_lambda_bucket.bucket
s3_key = aws_s3_object.lambda_zip.key
function_name = "ingestion_api_lambda",
handler = "ingestion_api_function.lambda_handler",
runtime = "python3.9"
source_code_hash = data.archive_file.ingestion_lambda_zip.output_base64sha256
}
}
}
endpoint_methods = {
"sourceA-POST": {
method = "POST",
endpoint_key = "sourceA"
},
"sourceB-POST": {
method = "POST",
endpoint_key = "sourceB"
},
"sourceC-POST": {
method = "POST",
endpoint_key = "sourceC"
}
}
}
# Define the API Gateway
resource "aws_api_gateway_rest_api" "ingestion_api" {
name = "ingestion_api"
description = "API for data ingestion"
}
# Create API Gateway resources for each endpoint
resource "aws_api_gateway_resource" "ingestion_api" {
for_each = local.endpoints
rest_api_id = aws_api_gateway_rest_api.ingestion_api.id
parent_id = aws_api_gateway_rest_api.ingestion_api.root_resource_id
path_part = each.value.path
}
resource "aws_api_gateway_method" "ingestion_api" {
for_each = local.endpoint_methods
rest_api_id = aws_api_gateway_rest_api.ingestion_api.id
resource_id = aws_api_gateway_resource.ingestion_api[each.value.endpoint_key].id
http_method = each.value.method
authorization = "NONE"
}
# Integrate API methods with Lambda functions
resource "aws_api_gateway_integration" "ingestion_api" {
for_each = aws_api_gateway_method.ingestion_api
rest_api_id = aws_api_gateway_rest_api.ingestion_api.id
resource_id = each.value.resource_id
http_method = each.value.http_method
type = "AWS_PROXY"
uri = aws_lambda_function.ingestion_api.invoke_arn
integration_http_method = "POST"
}
# Deploy the API Gateway
resource "aws_api_gateway_deployment" "api_ingestion_deployment" {
depends_on = [aws_api_gateway_integration.ingestion_api]
rest_api_id = aws_api_gateway_rest_api.ingestion_api.id
stage_name = "v1"
triggers = {
redeployment = sha1(jsonencode(aws_api_gateway_integration.ingestion_api))
}
}
# Output the base URL of the API Gateway
output "base_url" {
value = aws_api_gateway_deployment.api_ingestion_deployment.invoke_url
}
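Once the deployment has gone through, you can exercise an endpoint with any HTTP client. A quick sketch with the requests library (the invoke URL is a placeholder; replace it with the value of the invoke URL output, and the path must match the resource defined above):
import requests  # assumed to be installed (pip install requests)

# Placeholder: substitute the invoke URL of the deployed "v1" stage
API_URL = "https://<api-id>.execute-api.<region>.amazonaws.com/v1"

payload = {
    "source": "sourceA",
    "entity_name": "EQUIPMENT",
    "body": {"cat_id": "CAT-123", "contract_id": "C-001"},
}

response = requests.post(f"{API_URL}/sourceA", json=payload, timeout=10)
print(response.status_code, response.text)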
roles.tf
This is where I manage all the roles and permissions for the different resources in the ingestion module. These should be self-explanatory if you know how permissions and policies work in AWS (or in general), so for now you can simply skip this section and come back later. That said, this is a very important part that you should take extra care with: poorly defined permissions can become a big security vulnerability. Always follow the least-privilege approach when defining permissions.
resource "aws_iam_role" "lambda_execution_role" {
name = "lambda_execution_role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
},
]
})
}
resource "aws_iam_policy" "lambda_policy" {
name = "lambda_policy"
description = "IAM policy for Lambda to access S3 and DB"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
// Add DB permissions here
]
Effect = "Allow"
Resource = "*"
},
{
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": aws_sns_topic.sns_router.arn
}
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
role = aws_iam_role.lambda_execution_role.name
policy_arn = aws_iam_policy.lambda_policy.arn
}
resource "aws_lambda_permission" "api_gateway_invoke" {
statement_id = "AllowExecutionFromAPIGateway"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.ingestion_api.function_name
principal = "apigateway.amazonaws.com"
# This source_arn restricts access to a specific API Gateway stage and method
source_arn = "${aws_api_gateway_rest_api.ingestion_api.execution_arn}/*/*/*"
}
resource "aws_iam_role" "sqs_lambda" {
name = "sqs_lambda_${var.suffix}"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
},
]
})
}
resource "aws_iam_policy" "lambda_sqs_policy" {
name = "lambda_sqs_policy"
description = "Allows Lambda function to consume messages from SQS queues"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
],
Resource = values(aws_sqs_queue.queues)[*].arn,
Effect = "Allow",
},
{
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
],
Resource = "arn:aws:logs:*:*:*",
Effect = "Allow",
},
{
"Effect": "Allow",
"Action": [
"dynamodb:PutItem",
"dynamodb:GetItem",
"dynamodb:DeleteItem",
],
"Resource": "arn:aws:dynamodb:${var.region}:${var.account_id}:table/datastore"
}
],
})
}
resource "aws_iam_role_policy_attachment" "lambda_sqs_attach" {
role = aws_iam_role.sqs_lambda.name
policy_arn = aws_iam_policy.lambda_sqs_policy.arn
}
resource "aws_sqs_queue_policy" "queue_policy" {
for_each = local.queues
queue_url = aws_sqs_queue.queues[each.key].url
policy = jsonencode({
Version: "2012-10-17",
Statement: [
{
Effect: "Allow",
Principal: {
AWS: "*"
},
Action: "sqs:SendMessage",
Resource: aws_sqs_queue.queues[each.key].arn,
Condition: {
ArnEquals: {
"aws:SourceArn": aws_sns_topic.sns_router.arn
}
}
}
]
})
}
resource "aws_iam_role" "sns" {
name = "sns_role_${var.suffix}"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Principal = {
Service = "sns.amazonaws.com"
},
Effect = "Allow",
Sid = "",
},
],
})
}
resource "aws_iam_policy" "sns" {
name = "sns_policy_${var.suffix}"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogStreams"
],
Resource = "arn:aws:logs:*:*:*"
},
],
})
}
resource "aws_iam_role_policy_attachment" "sns_logging_policy_attachment" {
role = aws_iam_role.sns.name
policy_arn = aws_iam_policy.sns.arn
}
s3.tf
This file holds the creation of the S3 (Simple Storage Service) buckets used for uploading the Lambda code. This is not strictly required, but it is good practice to keep versions of your deployed code in S3 so that it is easy to roll back to a previous version if needed. It also helps with auditing and debugging.
resource "random_string" "bucket_suffix" {
length = 8
special = false
upper = false
keepers = {
# Generate a new suffix only if one of these attributes changes
attr = "value"
}
}
resource "aws_s3_bucket" "ingestion_lambda_bucket" {
bucket = "ingestion-lambda-bucket-${random_string.bucket_suffix.result}"
}
data "archive_file" "ingestion_lambda_zip" {
type = "zip"
source_dir = "${path.root}/../ingestion"
output_path = "${path.module}/../ingestion_function.zip"
excludes = ["venv", "test", ".idea"]
}
resource "aws_s3_object" "lambda_zip" {
bucket = aws_s3_bucket.ingestion_lambda_bucket.bucket
key = "${data.archive_file.ingestion_lambda_zip.output_base64sha256}.zip"
source = data.archive_file.ingestion_lambda_zip.output_path
depends_on = [aws_s3_bucket.ingestion_lambda_bucket,data.archive_file.ingestion_lambda_zip]
}
sqs.tf
This creates the FIFO queues described in the architecture, one per source.
Note: you need to suffix the queue name with .fifo in the case of a FIFO queue, or Terraform will throw an error.
locals {
queues = {
"sourceA" = {
name = "sourceA-${var.suffix}.fifo",
fifo = true,
deduplication = true,
},
"sourceB" = {
name = "sourceB-${var.suffix}.fifo",
fifo = true,
deduplication = true,
},
"sourceC" = {
name = "sourceC-${var.suffix}.fifo",
fifo = true,
deduplication = true,
}
}
}
resource "aws_sqs_queue" "queues" {
for_each = local.queues
name = each.value.name
fifo_queue = each.value.fifo
content_based_deduplication = each.value.deduplication # Enable if you want automatic deduplication based on the content
visibility_timeout_seconds = 120
depends_on = [aws_lambda_function.sqs_worker]
}
resource "aws_lambda_event_source_mapping" "sqs_queue_mapping" {
for_each = local.queues
event_source_arn = aws_sqs_queue.queues[each.key].arn
function_name = aws_lambda_function.sqs_worker[each.key].arn
}
sns.tf
This is the Simple Notification Service, and its role is to provide a single topic for the API Lambda to publish messages to. Based on the message attribute source, it decides where to route each message.
The subscription uses the locals block defined in the sqs.tf file. Since the same queues are used, it was straightforward in my head to reuse the same variable; but if you want to keep that isolated, you are free to do so.
The filter policy under the subscription is the part that defines which messages get routed. In this case, the message attributes must contain the field source with the expected value, or the message will be dropped. The source must be either sourceA, sourceB, or sourceC.
resource "aws_sns_topic" "sns_router" {
name = "data-ingestion.fifo"
fifo_topic = true
}
resource "aws_sns_topic_subscription" "sns_subscription" {
for_each = local.queues
topic_arn = aws_sns_topic.sns_router.arn
protocol = "sqs"
endpoint = aws_sqs_queue.queues[each.key].arn
filter_policy = jsonencode({
source = [each.key]
})
}
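To illustrate what the filter policy does, here is a tiny, purely illustrative Python approximation of the exact-match evaluation SNS performs (the real evaluation happens inside SNS; matches is a hypothetical helper):
import json

# Filter policy of the sourceB subscription, as generated by the Terraform above
filter_policy = json.loads('{"source": ["sourceB"]}')

# Message attributes set by the publishing Lambda
message_attributes = {"source": "sourceB"}

def matches(policy: dict, attributes: dict) -> bool:
    # Every key in the policy must be present with one of the allowed values
    return all(attributes.get(key) in allowed for key, allowed in policy.items())

print(matches(filter_policy, message_attributes))    # True  -> delivered to the sourceB queue
print(matches(filter_policy, {"source": "sourceA"})) # False -> dropped for this subscription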
dynamodb.tf
Finally, I needed a database to store the ingested data. For the purpose of this architecture, I will set up a DynamoDB table with a very generic structure.
resource "aws_dynamodb_table" "dynamodb" {
name = "datastore"
billing_mode = "PAY_PER_REQUEST" # Or "PROVISIONED" based on your usage pattern
hash_key = "ID"
attribute {
name = "ID"
type = "S" # S = String, N = Number, B = Binary
}
# attributes used in Global Secondary Indexes as 'attribute' blocks
attribute {
name = "ENTITY_NAME"
type = "S"
}
# Global Secondary Index for ENTITY_NAME
global_secondary_index {
name = "EntityNameIndex"
hash_key = "ENTITY_NAME"
projection_type = "ALL" # Adjust based on needs, ALL includes all attributes in the index
}
tags = {
Environment = "dev"
}
}
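With the table in place, the EntityNameIndex lets you fetch all items of one entity type. A small sketch with boto3 (assuming the table and index names above and valid AWS credentials):
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("datastore")

# Query every item of a given entity type through the global secondary index
response = table.query(
    IndexName="EntityNameIndex",
    KeyConditionExpression=Key("ENTITY_NAME").eq("EQUIPMENT"),
)
for item in response["Items"]:
    print(item["ID"])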
Once you have set up the Terraform code correctly (along with the Lambda code) and applied this module, you should see the infrastructure built and deployed in your AWS account.
Code – Ingestion API Lambda
Let’s look at the ingestion API Lambda code that takes the data in from the different sources and publishes it to SNS. This is the Lambda that sits behind the API Gateway.
In the code below, all methods have similar code, as I’m not doing much with the data. In the real world, you would perform the required validation on the events received in the request to decide up front whether to process or reject them.
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler.api_gateway import APIGatewayRestResolver
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.utilities.typing import LambdaContext
import boto3
import hashlib
from datetime import datetime
logger = Logger()
router = APIGatewayRestResolver()
# Initialize the Boto3 SNS client
sns_client = boto3.client('sns')
# SNS Topic ARN - replace with your actual SNS topic ARN
SNS_TOPIC_ARN = 'arn:aws:sns:<REGION>:<ACCOUNT_ID>:data-ingestion.fifo'
def get_aws_context_info():
logger.info("Getting aws context info")
session = boto3.session.Session()
region = session.region_name
client = boto3.client('sts')
account_id = client.get_caller_identity()["Account"]
logger.info(f"Region: {region}, Account: {account_id}")
return region, account_id
def publish_to_sns(message, topic_arn, deduplication_id, group_id):
"""
Publishes a message to an SNS topic with message attributes, including custom attributes
for deduplication ID and group ID for informational purposes or downstream processing.
:param message: Message to be sent.
:param topic_arn: ARN of the SNS topic.
:param deduplication_id: Custom deduplication ID for the message.
:param group_id: Custom group ID for the message.
"""
response = sns_client.publish(
TopicArn=topic_arn,
Message=json.dumps(message), # Assuming you want to send the 'body' part as the main message
MessageGroupId=group_id,
MessageDeduplicationId=deduplication_id,
MessageAttributes={
'source': {
'DataType': 'String',
'StringValue': message['source']
},
}
)
return response
@router.post("/sourceA")
def put_sourceA():
logger.info("POST /sourceA was called")
body = router.current_event.json_body
body['timestamp'] = datetime.utcnow().isoformat()
logger.info(f"Body: {body}")
# Publish the message to SNS
response = publish_to_sns(message=body, topic_arn=SNS_TOPIC_ARN,
deduplication_id=hashlib.md5(json.dumps(body).encode('utf-8')).hexdigest(), group_id=body['source'])
logger.info(f"Message published to SNS: {response}")
return {"body": "POST /sourceA success", "statusCode": 201}
@router.post("/sourceB")
def put_sourceB():
logger.info("POST /sourceB was called")
body = router.current_event.json_body
body['timestamp'] = datetime.utcnow().isoformat()
# Publish the message to SNS
response = publish_to_sns(message=body, topic_arn=SNS_TOPIC_ARN,
deduplication_id=hashlib.md5(json.dumps(body).encode('utf-8')).hexdigest(), group_id=body['source'])
logger.info(f"Message published to SNS: {response}")
return {"body": "POST /sourceB success", "statusCode": 201}
@router.post("/sourceC")
def put_sourceC():
logger.info("POST /sourceC was called")
body = router.current_event.json_body
body['timestamp'] = datetime.utcnow().isoformat()
# Publish the message to SNS
response = publish_to_sns(message=body, topic_arn=SNS_TOPIC_ARN,
deduplication_id=hashlib.md5(json.dumps(body).encode('utf-8')).hexdigest(),
group_id=body['source'])
logger.info(f"Message published to SNS: {response}")
return {"body": "POST /sourceC success", "statusCode": 201}
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST, log_event=True)
def lambda_handler(event: dict, context: LambdaContext):
logger.info(f"Request Received: {event}")
return router.resolve(event, context)
The most important method in the above code is publish_to_sns(), as it sets up some important structure for the message.
As we are using a FIFO topic and FIFO queues, it is mandatory to provide the MessageGroupId and MessageDeduplicationId. Also, because we are using a filter policy based on the source field of the message, we must provide that field in the MessageAttributes. Here, the message source is taken from the source key of the message body.
This code publishes the message to SNS, and SNS then routes it to the downstream SQS queue based on the filter policy defined in the SNS subscription on the source attribute.
def publish_to_sns(message, topic_arn, deduplication_id, group_id):
"""
Publishes a message to an SNS topic with message attributes, including custom attributes
for deduplication ID and group ID for informational purposes or downstream processing.
:param message: Message to be sent.
:param topic_arn: ARN of the SNS topic.
:param deduplication_id: Custom deduplication ID for the message.
:param group_id: Custom group ID for the message.
"""
response = sns_client.publish(
TopicArn=topic_arn,
Message=json.dumps(message), # Assuming you want to send the 'body' part as the main message
MessageGroupId=group_id,
MessageDeduplicationId=deduplication_id,
MessageAttributes={
'source': {
'DataType': 'String',
'StringValue': message['source']
},
}
)
return response
Code – SQS Worker Lambda
Last, we will look at the SQS worker Lambda code that polls messages from SQS in batches and processes them.
The code below parses the data based on the defined data structure and persists it into the database.
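Before looking at the handlers, it helps to know what a single record in the SQS event looks like. Because the queues are subscribed to the SNS topic without raw message delivery, each SQS record body is itself a JSON envelope produced by SNS, which is why the handlers parse JSON twice. A trimmed, illustrative example (all values made up):
import json

# Trimmed, illustrative shape of one record in the event passed to the worker
record = {
    "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
    "body": json.dumps({                # SNS envelope, delivered as a JSON string
        "Type": "Notification",
        "Message": json.dumps({         # the payload originally published by the API Lambda
            "source": "sourceA",
            "entity_name": "EQUIPMENT",
            "body": {"cat_id": "CAT-123", "contract_id": "C-001"},
            "timestamp": "2024-01-01T00:00:00",
        }),
        "MessageAttributes": {"source": {"Type": "String", "Value": "sourceA"}},
    }),
}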
import json
import uuid
from datetime import datetime
import boto3
from aws_lambda_powertools import Logger
from entities.entities import Milestone, Contract
logger = Logger()
# Initialize DynamoDB and SQS clients
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('datastore')
def handle_sourceA_message(event, context):
for record in event['Records']:
try:
logger.info(record)
# First, parse the SQS message body
message_body = json.loads(record['body'])
logger.info(message_body)
# Then, parse the nested 'Message' field within the SQS message body
sns_message = json.loads(message_body['Message'])
logger.info(sns_message)
# Check the 'entity_name' and proceed if it matches 'EQUIPMENT'
if sns_message['entity_name'] == 'EQUIPMENT':
persist_equipment(sns_message['body'])
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
return {"statusCode": 500}
return {
'statusCode': 200,
'body': json.dumps('Successfully processed SQS messages.')
}
def handle_sourceB_message(event, context):
for record in event['Records']:
try:
logger.info(record)
# First, parse the SQS message body
message_body = json.loads(record['body'])
logger.info(message_body)
# Then, parse the nested 'Message' field within the SQS message body
sns_message = json.loads(message_body['Message'])
logger.info(sns_message)
# Check the 'entity_name' and proceed if it matches 'CONTRACT'
if sns_message['entity_name'] == 'CONTRACT':
persist_contract(sns_message['body'])
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
return {"statusCode": 500}
return {
'statusCode': 200,
'body': json.dumps('Successfully processed SQS messages.')
}
def handle_sourceC_message(event, context):
for record in event['Records']:
try:
logger.info(record)
# First, parse the SQS message body
message_body = json.loads(record['body'])
logger.info(message_body)
# Then, parse the nested 'Message' field within the SQS message body
sns_message = json.loads(message_body['Message'])
logger.info(sns_message)
if sns_message['entity_name'] == 'MILESTONE':
persist_milestone(sns_message['body'])
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
return {"statusCode": 500}
return {
'statusCode': 200,
'body': json.dumps('Successfully processed SQS messages.')
}
def persist_contract(contract_body):
contract_item = Contract.from_event(contract_body)
logger.info(f"Writing contract to DynamoDB: {contract_item}")
try:
response = table.put_item(Item=contract_item.to_dict())
logger.info(f"PutItem succeeded: {response}")
except Exception as e:
logger.error(f"Error writing contract to DynamoDB: {str(e)}")
def persist_milestone(milestone):
milestone_item = Milestone.from_event(milestone)
logger.info(f"Writing milestone to DynamoDB: {milestone_item}")
try:
response = table.put_item(Item=milestone_item.to_dict())
logger.info(f"PutItem succeeded: {response}")
except Exception as e:
logger.error(f"Error writing contract to DynamoDB: {str(e)}")
def persist_equipment(equipment):
unique_id = str(uuid.uuid4())
equipment_item = {
'ID': f"EQUIPMENT#{unique_id}",
'ENTITY_NAME': 'EQUIPMENT',
'cat_id': equipment.get('cat_id', ''),
'contract_id': equipment.get('contract_id', ''),
'parent_equipment_id': equipment.get('parent_id', ''),
'current_milestone_id': equipment.get('current_milestone_id', '')
}
logger.info(f"Writing equipment to DynamoDB: {equipment_item}")
try:
response = table.put_item(Item=equipment_item)
logger.info(f"PutItem succeeded: {response}")
except Exception as e:
logger.error(f"Error writing contract to DynamoDB: {str(e)}")
In order to understand the code, you must understand the structure of the message. So, for this tutorial, I curated a sample data structure for an industrial supply chain system with three main entities: Contract, Milestone, and Equipment.
You can see the structure of the data below:
CONTRACTS
- CONTRACT_ID: PK
- CONTRACT_NAME: VARCHAR
- CONTRACT_TYPE: ENUM
- CONTRACT_STATUS: ENUM
MILESTONES
- MILESTONE_ID: PK
- CONTRACT_ID: FK (CONTRACTS > CONTRACT_ID)
- DESCRIPTION: VARCHAR
- START_DATE: DATETIME
- END_DATE: DATETIME
EQUIPMENTS
- EQUIPMENT_ID: PK
- PARENT_EQUIPMENT_ID: FK (EQUIPMENTS > EQUIPMENT_ID)
- CONTRACT_ID: FK (CONTRACTS > CONTRACT_ID)
- CURRENT_MILESTONE: FK (MILESTONES > MILESTONE_ID)
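The entities.entities module imported by the worker is not shown in this article. Purely as an illustration, here is a minimal sketch of what Contract could look like, with from_event and to_dict as assumed helpers and field names taken from the structure above (Milestone would follow the same pattern):
import uuid
from dataclasses import dataclass, asdict

# Hypothetical sketch; the real entities module may differ.
@dataclass
class Contract:
    ID: str
    ENTITY_NAME: str
    CONTRACT_NAME: str
    CONTRACT_TYPE: str
    CONTRACT_STATUS: str

    @classmethod
    def from_event(cls, body: dict) -> "Contract":
        return cls(
            ID=f"CONTRACT#{uuid.uuid4()}",
            ENTITY_NAME="CONTRACT",
            CONTRACT_NAME=body.get("contract_name", ""),
            CONTRACT_TYPE=body.get("contract_type", ""),
            CONTRACT_STATUS=body.get("contract_status", ""),
        )

    def to_dict(self) -> dict:
        return asdict(self)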
Conclusion
In the ever-evolving landscape of cloud computing, AWS’s serverless event-driven architecture offers a promising pathway to enhance data ingestion from multiple and diverse sources. By leveraging services like AWS Lambda, SNS, and SQS, businesses can achieve more scalable, efficient, and cost-effective data handling processes. However, the transition to a serverless model is not without its challenges. It requires a thoughtful approach to design, execution, and continuous improvement.
What challenges have you faced while integrating serverless architectures into your data operations?
Have you found innovative ways to overcome these challenges, or are there aspects of serverless technology that still seem daunting?
Share your experiences and thoughts in the comments below. Your insights not only contribute to a richer discussion but also help in shaping more robust serverless solutions for everyone.