AWS
Messaging
AWS integration v1.0.0
AWS Messaging Services
Overview
This skill covers AWS messaging services including SQS, SNS, and EventBridge. These services enable decoupled, event-driven architectures with reliable message delivery.
Key Concepts
Messaging Service Comparison
┌─────────────────────────────────────────────────────────────┐
│ AWS Messaging Services │
├─────────────────────────────────────────────────────────────┤
│ │
│ SQS (Simple Queue Service) │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ • Point-to-point messaging │ │
│ │ • At-least-once delivery (Standard) │ │
│ │ • Exactly-once processing (FIFO) │ │
│ │ • Retention: up to 14 days │ │
│ │ • Max message size: 256 KB │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ SNS (Simple Notification Service) │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ • Pub/sub messaging │ │
│ │ • Fan-out to multiple subscribers │ │
│ │ • Push-based delivery │ │
│ │ • Filtering by message attributes │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ EventBridge │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ • Event bus for application integration │ │
│ │ • Content-based filtering (rules) │ │
│ │ • Schema registry │ │
│ │ • Archives and replay │ │
│ │ • Cross-account/region delivery │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ Common Patterns: │
│ │
│ SNS → SQS (Fan-out) │
│ ┌─────┐ ┌─────┐ │
│ │ SNS │──────│ SQS │──▶ Consumer A │
│ │Topic│──┐ └─────┘ │
│ └─────┘ │ ┌─────┐ │
│ └───│ SQS │──▶ Consumer B │
│ └─────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Best Practices
1. Use Dead Letter Queues
Capture failed messages for analysis and reprocessing.
2. Enable Server-Side Encryption
Use KMS for encryption at rest.
3. Implement Idempotent Consumers
Messages may be delivered more than once.
4. Set Appropriate Visibility Timeout
Set the timeout longer than the worst-case processing time so a message is not redelivered while it is still being handled.
5. Use FIFO When Ordering Matters
SQS FIFO queues preserve message order within each message group.
Code Examples
Example 1: SQS Queue with DLQ
import * as sqs from 'aws-cdk-lib/aws-sqs';
/**
 * Provisions the order-processing queues: a KMS-encrypted standard queue and a
 * high-throughput FIFO queue, each backed by its own dead-letter queue, plus
 * CloudWatch alarms that fire as soon as any message lands in a DLQ.
 *
 * NOTE(review): `encryptionKey` is not declared in this snippet; it is assumed
 * to be a KMS key available in the enclosing scope — confirm before use.
 */
export class MessagingStack extends cdk.Stack {
  /** Standard queue that order producers send to. */
  public readonly orderQueue: sqs.Queue;

  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Dead Letter Queue for the standard queue (kept for the maximum 14 days
    // so failed messages can be analysed and redriven).
    const dlq = new sqs.Queue(this, 'OrderDLQ', {
      queueName: 'order-processing-dlq',
      retentionPeriod: cdk.Duration.days(14),
      encryption: sqs.QueueEncryption.KMS,
      encryptionMasterKey: encryptionKey,
    });

    // Main Queue
    this.orderQueue = new sqs.Queue(this, 'OrderQueue', {
      queueName: 'order-processing',
      visibilityTimeout: cdk.Duration.seconds(300), // 5 minutes
      retentionPeriod: cdk.Duration.days(7),
      // Encryption
      encryption: sqs.QueueEncryption.KMS,
      encryptionMasterKey: encryptionKey,
      // Dead Letter Queue
      deadLetterQueue: {
        queue: dlq,
        maxReceiveCount: 3, // Move to DLQ after 3 failures
      },
    });

    // Dead Letter Queue for the FIFO queue. A FIFO source queue can only
    // redrive to another FIFO queue, so it cannot share the standard DLQ.
    const fifoDlq = new sqs.Queue(this, 'OrderFifoDLQ', {
      queueName: 'order-processing-dlq.fifo',
      fifo: true,
      retentionPeriod: cdk.Duration.days(14),
    });

    // FIFO Queue for ordered processing. Deduplication and throughput limits
    // are scoped per message group (high-throughput FIFO mode).
    new sqs.Queue(this, 'OrderFifoQueue', {
      queueName: 'order-processing.fifo',
      fifo: true,
      contentBasedDeduplication: true,
      deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
      fifoThroughputLimit: sqs.FifoThroughputLimit.PER_MESSAGE_GROUP_ID,
      visibilityTimeout: cdk.Duration.seconds(300),
      deadLetterQueue: {
        queue: fifoDlq,
        maxReceiveCount: 3,
      },
    });

    // Alarms for both DLQs: any visible message means a consumer gave up.
    new cloudwatch.Alarm(this, 'DLQAlarm', {
      metric: dlq.metricApproximateNumberOfMessagesVisible(),
      threshold: 1,
      evaluationPeriods: 1,
      treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING,
      alarmDescription: 'Messages in DLQ',
    });
    new cloudwatch.Alarm(this, 'FifoDLQAlarm', {
      metric: fifoDlq.metricApproximateNumberOfMessagesVisible(),
      threshold: 1,
      evaluationPeriods: 1,
      treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING,
      alarmDescription: 'Messages in FIFO DLQ',
    });
  }
}
Example 2: SNS with SQS Fan-out
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions';
/**
 * Fans order events out from one SNS topic to three SQS queues and one Lambda
 * function, using subscription filter policies so each consumer only receives
 * the events it cares about.
 *
 * Every subscriber queue gets its own dead-letter queue so messages a consumer
 * repeatedly fails to process are retained instead of expiring silently.
 *
 * NOTE(review): `encryptionKey` and `alertFunction` are not declared in this
 * snippet; they are assumed to exist in the enclosing scope — confirm.
 */
export class EventFanoutStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // SNS Topic
    const orderTopic = new sns.Topic(this, 'OrderTopic', {
      topicName: 'order-events',
      displayName: 'Order Events Topic',
      masterKey: encryptionKey,
    });

    // Subscription queues (each paired with a DLQ)
    const inventoryQueue = this.createQueueWithDlq('InventoryQueue', 'inventory-updates');
    const notificationQueue = this.createQueueWithDlq('NotificationQueue', 'customer-notifications');
    const analyticsQueue = this.createQueueWithDlq('AnalyticsQueue', 'order-analytics');

    // Inventory only needs to know when stock must be reserved or released.
    orderTopic.addSubscription(
      new subscriptions.SqsSubscription(inventoryQueue, {
        rawMessageDelivery: true,
        filterPolicy: {
          eventType: sns.SubscriptionFilter.stringFilter({
            allowlist: ['ORDER_CREATED', 'ORDER_CANCELLED'],
          }),
        },
      })
    );

    // Customer notifications: both attributes must match (filter keys are ANDed).
    orderTopic.addSubscription(
      new subscriptions.SqsSubscription(notificationQueue, {
        rawMessageDelivery: true,
        filterPolicy: {
          eventType: sns.SubscriptionFilter.stringFilter({
            allowlist: ['ORDER_SHIPPED', 'ORDER_DELIVERED'],
          }),
          priority: sns.SubscriptionFilter.stringFilter({
            allowlist: ['HIGH'],
          }),
        },
      })
    );

    // All events to analytics (no filter policy)
    orderTopic.addSubscription(
      new subscriptions.SqsSubscription(analyticsQueue, {
        rawMessageDelivery: true,
      })
    );

    // Lambda subscription for real-time processing of high-value orders
    orderTopic.addSubscription(
      new subscriptions.LambdaSubscription(alertFunction, {
        filterPolicy: {
          orderValue: sns.SubscriptionFilter.numericFilter({
            greaterThan: 1000,
          }),
        },
      })
    );
  }

  /**
   * Creates a queue together with a dead-letter queue named `<queueName>-dlq`.
   *
   * @param id construct id of the main queue; the DLQ uses `<id>DLQ`
   * @param queueName physical name of the main queue
   * @returns the main queue (the DLQ is attached via its redrive policy)
   */
  private createQueueWithDlq(id: string, queueName: string): sqs.Queue {
    const dlq = new sqs.Queue(this, `${id}DLQ`, {
      queueName: `${queueName}-dlq`,
      retentionPeriod: cdk.Duration.days(14),
    });
    return new sqs.Queue(this, id, {
      queueName,
      deadLetterQueue: {
        queue: dlq,
        maxReceiveCount: 3, // Move to DLQ after 3 failed receives
      },
    });
  }
}
Example 3: EventBridge Event Bus
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
/**
 * Sets up a custom EventBridge bus for order events with an archive, a
 * content-filtered rule with two targets, cross-account forwarding to a
 * partner bus, a schema registry, and a daily scheduled rule.
 *
 * NOTE(review): `inventoryFunction`, `notificationQueue` and `reportFunction`
 * are not declared in this snippet; they are assumed to exist in the
 * enclosing scope — confirm.
 */
export class EventBridgeStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Custom Event Bus
    const orderEventBus = new events.EventBus(this, 'OrderEventBus', {
      eventBusName: 'order-events',
    });

    // Archive for replay: keeps one year of events from the orders source.
    new events.Archive(this, 'OrderArchive', {
      sourceEventBus: orderEventBus,
      archiveName: 'order-events-archive',
      eventPattern: {
        source: ['com.company.orders'],
      },
      retention: cdk.Duration.days(365),
    });

    // Rule for order created events whose detail.orderValue is above 100
    const orderCreatedRule = new events.Rule(this, 'OrderCreatedRule', {
      eventBus: orderEventBus,
      ruleName: 'order-created-rule',
      eventPattern: {
        source: ['com.company.orders'],
        detailType: ['OrderCreated'],
        detail: {
          orderValue: [{ numeric: ['>', 100] }],
        },
      },
    });

    // Multiple targets
    orderCreatedRule.addTarget(new targets.LambdaFunction(inventoryFunction, {
      retryAttempts: 3,
      maxEventAge: cdk.Duration.hours(2),
    }));
    // The queue receives only the `detail` portion of the event.
    orderCreatedRule.addTarget(new targets.SqsQueue(notificationQueue, {
      message: events.RuleTargetInput.fromEventPath('$.detail'),
    }));

    // Cross-account event delivery
    const partnerEventBus = events.EventBus.fromEventBusArn(
      this,
      'PartnerBus',
      'arn:aws:events:us-east-1:123456789012:event-bus/partner-bus'
    );
    const partnerRule = new events.Rule(this, 'PartnerRule', {
      eventBus: orderEventBus,
      eventPattern: {
        source: ['com.company.orders'],
        detailType: ['OrderShipped'],
      },
    });
    partnerRule.addTarget(new targets.EventBus(partnerEventBus));

    // Schema registry. `aws-cdk-lib/aws-events` has no SchemaRegistry
    // construct, so the CloudFormation resource is declared directly; the
    // typed equivalent is `CfnRegistry` in `aws-cdk-lib/aws-eventschemas`.
    new cdk.CfnResource(this, 'OrderSchemas', {
      type: 'AWS::EventSchemas::Registry',
      properties: {
        RegistryName: 'order-events-registry',
      },
    });

    // Scheduled rule: cron fields are evaluated in UTC, so this is 08:00 UTC daily.
    new events.Rule(this, 'DailyReport', {
      schedule: events.Schedule.cron({ minute: '0', hour: '8' }),
      targets: [new targets.LambdaFunction(reportFunction)],
    });
  }
}
Example 4: Message Processing with Lambda
import { SQSHandler, SQSEvent, SQSRecord, SQSBatchResponse } from 'aws-lambda';
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';
// Document client created once at module scope and shared by every processMessage call.
// NOTE(review): DynamoDBClient is not imported in this snippet — presumably it comes
// from '@aws-sdk/client-dynamodb'; confirm and add the import.
const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));
// Batch processing with partial failures
/**
 * SQS batch handler that reports partial failures.
 *
 * All records in the batch are processed concurrently; the ids of the records
 * whose processing rejected are returned in `batchItemFailures`, the others
 * are treated as handled.
 *
 * @param event the SQS batch delivered to the function
 * @returns the list of message ids that failed
 */
export const handler: SQSHandler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
  const { Records: records } = event;

  // allSettled never rejects, so one bad record cannot hide the outcome of the rest.
  const outcomes = await Promise.allSettled(records.map((record) => processMessage(record)));

  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const [position, outcome] of outcomes.entries()) {
    if (outcome.status !== 'rejected') {
      continue;
    }
    console.error(`Failed to process message: ${outcome.reason}`);
    // Outcomes are positionally aligned with the input records.
    batchItemFailures.push({ itemIdentifier: records[position].messageId });
  }

  return { batchItemFailures };
};
/**
 * Persists the order carried by one SQS record, tolerating redelivery.
 *
 * The write is conditional on the stored `messageId`: it succeeds when the
 * item has no `messageId` yet or when it was written by the same message
 * (a redelivery), and is skipped when a different message already wrote it.
 *
 * @param record the SQS record whose JSON body is the order
 * @throws Error when TABLE_NAME is not configured, when the body is not valid
 *         JSON, or when the DynamoDB write fails for any reason other than
 *         the conditional check
 */
async function processMessage(record: SQSRecord): Promise<void> {
  const tableName = process.env.TABLE_NAME;
  if (!tableName) {
    // Fail with a clear message instead of an opaque SDK validation error.
    throw new Error('TABLE_NAME environment variable is not set');
  }

  const order = JSON.parse(record.body);

  // FIFO records carry a deduplication id; standard-queue records fall back to the message id.
  const deduplicationKey = record.attributes.MessageDeduplicationId || record.messageId;

  try {
    await docClient.send(new PutCommand({
      TableName: tableName,
      Item: {
        // Payload first so it can never overwrite the key or bookkeeping attributes below.
        ...order,
        PK: `ORDER#${order.id}`,
        SK: `ORDER#${order.id}`,
        processedAt: new Date().toISOString(),
        messageId: deduplicationKey,
      },
      ConditionExpression: 'attribute_not_exists(messageId) OR messageId = :mid',
      ExpressionAttributeValues: {
        ':mid': deduplicationKey,
      },
    }));
    console.log(`Processed order ${order.id}`);
  } catch (error: unknown) {
    if (error instanceof Error && error.name === 'ConditionalCheckFailedException') {
      console.log(`Order ${order.id} already processed, skipping`);
      return; // Idempotent - don't throw
    }
    throw error;
  }
}
// FIFO message sending with grouping
/**
 * Sends an ORDER_CREATED event for `order` to the FIFO order queue.
 *
 * Messages are grouped by customer id so one customer's events stay ordered,
 * and deduplicated on `<order id>-<order version>`.
 *
 * @param order the order to announce
 * @param sqsClient client used to send the message
 * @throws Error when ORDER_QUEUE_URL is not configured, or when the send fails
 */
async function sendOrderEvent(order: Order, sqsClient: SQSClient): Promise<void> {
  const queueUrl = process.env.ORDER_QUEUE_URL;
  if (!queueUrl) {
    // Fail with a clear message instead of an opaque SDK validation error.
    throw new Error('ORDER_QUEUE_URL environment variable is not set');
  }

  await sqsClient.send(new SendMessageCommand({
    QueueUrl: queueUrl,
    MessageBody: JSON.stringify({
      eventType: 'ORDER_CREATED',
      timestamp: new Date().toISOString(),
      data: order,
    }),
    MessageGroupId: order.customerId, // Group by customer for ordering
    MessageDeduplicationId: `${order.id}-${order.version}`,
    MessageAttributes: {
      eventType: {
        DataType: 'String',
        StringValue: 'ORDER_CREATED',
      },
      orderValue: {
        // SQS transports Number attributes as strings.
        DataType: 'Number',
        StringValue: order.total.toString(),
      },
    },
  }));
}
Example 5: EventBridge Event Publishing
import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
// EventBridge client created once at module scope with an empty configuration object;
// used by EventPublisher.publish for every PutEvents call.
const eventBridge = new EventBridgeClient({});
/** A domain event as accepted by EventPublisher.publish. */
interface DomainEvent {
  /** Sent as the EventBridge entry's `DetailType`. */
  type: string;
  /** Sent as the EventBridge entry's `Source`. */
  source: string;
  /** Event payload; its fields are spread into the top level of the entry's `Detail` JSON. */
  data: unknown;
  /** Optional extra fields merged into `Detail.metadata`; `correlationId` is reused when present. */
  metadata?: Record<string, unknown>;
}
/**
 * Publishes domain events to a single EventBridge bus in batches of up to 10
 * entries per PutEvents request.
 */
class EventPublisher {
  /**
   * @param eventBusName name of the bus every entry is sent to
   */
  constructor(private readonly eventBusName: string) {}

  /**
   * Publishes `events` batch by batch, in order.
   *
   * Each entry's `Detail` is the event's data fields plus a `metadata` object
   * containing the caller's metadata, a `publishedAt` timestamp and a
   * `correlationId` (generated when the caller did not supply one).
   *
   * @param events events to publish; an empty array sends nothing
   * @throws Error as soon as a batch reports failed entries; batches sent
   *         before the failing one have already been published
   */
  async publish(events: DomainEvent[]): Promise<void> {
    // Batch events (max 10 per request)
    for (const batch of this.chunk(events, 10)) {
      const entries = batch.map((event) => ({
        EventBusName: this.eventBusName,
        Source: event.source,
        DetailType: event.type,
        Detail: JSON.stringify({
          ...this.toDetailFields(event.data),
          metadata: {
            ...event.metadata,
            publishedAt: new Date().toISOString(),
            correlationId: event.metadata?.correlationId || crypto.randomUUID(),
          },
        }),
        Time: new Date(),
      }));

      const result = await eventBridge.send(new PutEventsCommand({
        Entries: entries,
      }));

      // PutEvents can partially fail without rejecting, so inspect the result.
      if (result.FailedEntryCount && result.FailedEntryCount > 0) {
        const failures = result.Entries?.filter((entry) => entry.ErrorCode) || [];
        console.error('Failed to publish events:', failures);
        throw new Error(`Failed to publish ${result.FailedEntryCount} events`);
      }
    }
  }

  /**
   * Turns an arbitrary payload into the fields to place in `Detail`.
   * Plain objects are used as-is; arrays and primitives are nested under a
   * `data` key (spreading them would lose or mangle the value); null and
   * undefined contribute no fields.
   */
  private toDetailFields(data: unknown): Record<string, unknown> {
    if (data == null) {
      return {};
    }
    if (typeof data === 'object' && !Array.isArray(data)) {
      return data as Record<string, unknown>;
    }
    return { data };
  }

  /** Splits `array` into consecutive slices of at most `size` elements. */
  private chunk<T>(array: T[], size: number): T[][] {
    return Array.from({ length: Math.ceil(array.length / size) }, (_, i) =>
      array.slice(i * size, i * size + size)
    );
  }
}
// Usage in domain service
/**
 * Domain service that creates orders and announces them on the event bus.
 */
class OrderService {
  constructor(
    private readonly eventPublisher: EventPublisher,
    private readonly orderRepository: OrderRepository
  ) {}

  /**
   * Creates and saves an order, then publishes a single `OrderCreated` event
   * describing it.
   *
   * @param command the creation request; its correlation and user ids are
   *        forwarded as event metadata
   * @returns the saved order
   */
  async createOrder(command: CreateOrderCommand): Promise<Order> {
    const order = Order.create(command);
    await this.orderRepository.save(order);

    // Build the domain event from the saved order before handing it to the publisher.
    const { id: orderId, customerId, items, total } = order;
    const { correlationId, userId } = command;
    const orderCreated = {
      type: 'OrderCreated',
      source: 'com.company.orders',
      data: { orderId, customerId, items, total },
      metadata: { correlationId, userId },
    };

    await this.eventPublisher.publish([orderCreated]);
    return order;
  }
}
Anti-Patterns
❌ Not Using Dead Letter Queues
// WRONG - a message that keeps failing is retried until the retention period
// expires and is then deleted, leaving nothing to inspect or reprocess
const queue = new sqs.Queue(this, 'Queue', {
queueName: 'my-queue',
// No DLQ configured
});
// ✅ CORRECT - always use DLQ: after 3 failed receives the message is moved
// to `dlq` (a queue created beforehand) instead of being retried further
const queue = new sqs.Queue(this, 'Queue', {
queueName: 'my-queue',
deadLetterQueue: {
queue: dlq,
maxReceiveCount: 3,
},
});