Skip to content
Home / Skills / Aws / Messaging
AW

Messaging

Aws integration v1.0.0

AWS Messaging Services

Overview

This skill covers AWS messaging services including SQS, SNS, and EventBridge. These services enable decoupled, event-driven architectures with reliable message delivery.


Key Concepts

Messaging Service Comparison

┌─────────────────────────────────────────────────────────────┐
│                 AWS Messaging Services                       │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  SQS (Simple Queue Service)                                 │
│  ┌───────────────────────────────────────────────────────┐ │
│  │ • Point-to-point messaging                            │ │
│  │ • At-least-once delivery (Standard)                   │ │
│  │ • Exactly-once delivery (FIFO)                        │ │
│  │ • Retention: up to 14 days                            │ │
│  │ • Max message size: 256 KB                            │ │
│  └───────────────────────────────────────────────────────┘ │
│                                                              │
│  SNS (Simple Notification Service)                          │
│  ┌───────────────────────────────────────────────────────┐ │
│  │ • Pub/sub messaging                                   │ │
│  │ • Fan-out to multiple subscribers                     │ │
│  │ • Push-based delivery                                 │ │
│  │ • Filtering by message attributes                     │ │
│  └───────────────────────────────────────────────────────┘ │
│                                                              │
│  EventBridge                                                 │
│  ┌───────────────────────────────────────────────────────┐ │
│  │ • Event bus for application integration               │ │
│  │ • Content-based filtering (rules)                     │ │
│  │ • Schema registry                                     │ │
│  │ • Archives and replay                                 │ │
│  │ • Cross-account/region delivery                       │ │
│  └───────────────────────────────────────────────────────┘ │
│                                                              │
│  Common Patterns:                                            │
│                                                              │
│  SNS → SQS (Fan-out)                                        │
│  ┌─────┐      ┌─────┐                                       │
│  │ SNS │──────│ SQS │──▶ Consumer A                        │
│  │Topic│──┐   └─────┘                                       │
│  └─────┘  │   ┌─────┐                                       │
│           └───│ SQS │──▶ Consumer B                        │
│               └─────┘                                       │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Best Practices

1. Use Dead Letter Queues

Capture failed messages for analysis and reprocessing.

2. Enable Server-Side Encryption

Use KMS for encryption at rest.

3. Implement Idempotent Consumers

Messages may be delivered more than once.

4. Set Appropriate Visibility Timeout

Match timeout to expected processing time.

5. Use FIFO When Ordering Matters

SQS FIFO guarantees message ordering.


Code Examples

Example 1: SQS Queue with DLQ

import * as sqs from 'aws-cdk-lib/aws-sqs';

export class MessagingStack extends cdk.Stack {
  public readonly orderQueue: sqs.Queue;

  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Dead Letter Queue
    const dlq = new sqs.Queue(this, 'OrderDLQ', {
      queueName: 'order-processing-dlq',
      retentionPeriod: cdk.Duration.days(14),
      encryption: sqs.QueueEncryption.KMS,
      encryptionMasterKey: encryptionKey,
    });

    // Main Queue
    this.orderQueue = new sqs.Queue(this, 'OrderQueue', {
      queueName: 'order-processing',
      visibilityTimeout: cdk.Duration.seconds(300), // 5 minutes
      retentionPeriod: cdk.Duration.days(7),
      
      // Encryption
      encryption: sqs.QueueEncryption.KMS,
      encryptionMasterKey: encryptionKey,
      
      // Dead Letter Queue
      deadLetterQueue: {
        queue: dlq,
        maxReceiveCount: 3, // Move to DLQ after 3 failures
      },
    });

    // FIFO Queue for ordered processing
    const orderFifoQueue = new sqs.Queue(this, 'OrderFifoQueue', {
      queueName: 'order-processing.fifo',
      fifo: true,
      contentBasedDeduplication: true,
      deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
      fifoThroughputLimit: sqs.FifoThroughputLimit.PER_MESSAGE_GROUP_ID,
      
      visibilityTimeout: cdk.Duration.seconds(300),
      
      deadLetterQueue: {
        queue: fifoDlq,
        maxReceiveCount: 3,
      },
    });

    // Alarms for DLQ
    new cloudwatch.Alarm(this, 'DLQAlarm', {
      metric: dlq.metricApproximateNumberOfMessagesVisible(),
      threshold: 1,
      evaluationPeriods: 1,
      treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING,
      alarmDescription: 'Messages in DLQ',
    });
  }
}

Example 2: SNS with SQS Fan-out

import * as sns from 'aws-cdk-lib/aws-sns';
import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions';

export class EventFanoutStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // SNS Topic
    const orderTopic = new sns.Topic(this, 'OrderTopic', {
      topicName: 'order-events',
      displayName: 'Order Events Topic',
      masterKey: encryptionKey,
    });

    // Subscription queues
    const inventoryQueue = new sqs.Queue(this, 'InventoryQueue', {
      queueName: 'inventory-updates',
    });

    const notificationQueue = new sqs.Queue(this, 'NotificationQueue', {
      queueName: 'customer-notifications',
    });

    const analyticsQueue = new sqs.Queue(this, 'AnalyticsQueue', {
      queueName: 'order-analytics',
    });

    // Subscribe with filtering
    orderTopic.addSubscription(
      new subscriptions.SqsSubscription(inventoryQueue, {
        rawMessageDelivery: true,
        filterPolicy: {
          eventType: sns.SubscriptionFilter.stringFilter({
            allowlist: ['ORDER_CREATED', 'ORDER_CANCELLED'],
          }),
        },
      })
    );

    orderTopic.addSubscription(
      new subscriptions.SqsSubscription(notificationQueue, {
        rawMessageDelivery: true,
        filterPolicy: {
          eventType: sns.SubscriptionFilter.stringFilter({
            allowlist: ['ORDER_SHIPPED', 'ORDER_DELIVERED'],
          }),
          priority: sns.SubscriptionFilter.stringFilter({
            allowlist: ['HIGH'],
          }),
        },
      })
    );

    // All events to analytics
    orderTopic.addSubscription(
      new subscriptions.SqsSubscription(analyticsQueue, {
        rawMessageDelivery: true,
      })
    );

    // Lambda subscription for real-time processing
    orderTopic.addSubscription(
      new subscriptions.LambdaSubscription(alertFunction, {
        filterPolicy: {
          orderValue: sns.SubscriptionFilter.numericFilter({
            greaterThan: 1000,
          }),
        },
      })
    );
  }
}

Example 3: EventBridge Event Bus

import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';

export class EventBridgeStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Custom Event Bus
    const orderEventBus = new events.EventBus(this, 'OrderEventBus', {
      eventBusName: 'order-events',
    });

    // Archive for replay
    new events.Archive(this, 'OrderArchive', {
      sourceEventBus: orderEventBus,
      archiveName: 'order-events-archive',
      eventPattern: {
        source: ['com.company.orders'],
      },
      retention: cdk.Duration.days(365),
    });

    // Rule for order created events
    const orderCreatedRule = new events.Rule(this, 'OrderCreatedRule', {
      eventBus: orderEventBus,
      ruleName: 'order-created-rule',
      eventPattern: {
        source: ['com.company.orders'],
        detailType: ['OrderCreated'],
        detail: {
          orderValue: [{ numeric: ['>', 100] }],
        },
      },
    });

    // Multiple targets
    orderCreatedRule.addTarget(new targets.LambdaFunction(inventoryFunction, {
      retryAttempts: 3,
      maxEventAge: cdk.Duration.hours(2),
    }));

    orderCreatedRule.addTarget(new targets.SqsQueue(notificationQueue, {
      message: events.RuleTargetInput.fromEventPath('$.detail'),
    }));

    // Cross-account event delivery
    const partnerEventBus = events.EventBus.fromEventBusArn(
      this,
      'PartnerBus',
      'arn:aws:events:us-east-1:123456789012:event-bus/partner-bus'
    );

    const partnerRule = new events.Rule(this, 'PartnerRule', {
      eventBus: orderEventBus,
      eventPattern: {
        source: ['com.company.orders'],
        detailType: ['OrderShipped'],
      },
    });

    partnerRule.addTarget(new targets.EventBus(partnerEventBus));

    // Schema discovery
    const schemaRegistry = new events.SchemaRegistry(this, 'OrderSchemas', {
      registryName: 'order-events-registry',
    });

    // Scheduled rule
    new events.Rule(this, 'DailyReport', {
      schedule: events.Schedule.cron({ minute: '0', hour: '8' }),
      targets: [new targets.LambdaFunction(reportFunction)],
    });
  }
}

Example 4: Message Processing with Lambda

import { SQSHandler, SQSEvent, SQSRecord, SQSBatchResponse } from 'aws-lambda';
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';

const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Batch processing with partial failures
export const handler: SQSHandler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
  const batchItemFailures: { itemIdentifier: string }[] = [];

  // Process messages in parallel
  const results = await Promise.allSettled(
    event.Records.map(record => processMessage(record))
  );

  // Report failures
  results.forEach((result, index) => {
    if (result.status === 'rejected') {
      console.error(`Failed to process message: ${result.reason}`);
      batchItemFailures.push({
        itemIdentifier: event.Records[index].messageId,
      });
    }
  });

  return { batchItemFailures };
};

async function processMessage(record: SQSRecord): Promise<void> {
  const order = JSON.parse(record.body);
  
  // Idempotency check
  const messageId = record.messageId;
  const deduplicationKey = record.attributes.MessageDeduplicationId || messageId;

  try {
    // Process with idempotency
    await docClient.send(new PutCommand({
      TableName: process.env.TABLE_NAME,
      Item: {
        PK: `ORDER#${order.id}`,
        SK: `ORDER#${order.id}`,
        ...order,
        processedAt: new Date().toISOString(),
        messageId: deduplicationKey,
      },
      ConditionExpression: 'attribute_not_exists(messageId) OR messageId = :mid',
      ExpressionAttributeValues: {
        ':mid': deduplicationKey,
      },
    }));

    console.log(`Processed order ${order.id}`);
  } catch (error) {
    if ((error as any).name === 'ConditionalCheckFailedException') {
      console.log(`Order ${order.id} already processed, skipping`);
      return; // Idempotent - don't throw
    }
    throw error;
  }
}

// FIFO message sending with grouping
async function sendOrderEvent(order: Order, sqsClient: SQSClient): Promise<void> {
  await sqsClient.send(new SendMessageCommand({
    QueueUrl: process.env.ORDER_QUEUE_URL,
    MessageBody: JSON.stringify({
      eventType: 'ORDER_CREATED',
      timestamp: new Date().toISOString(),
      data: order,
    }),
    MessageGroupId: order.customerId, // Group by customer for ordering
    MessageDeduplicationId: `${order.id}-${order.version}`,
    MessageAttributes: {
      eventType: {
        DataType: 'String',
        StringValue: 'ORDER_CREATED',
      },
      orderValue: {
        DataType: 'Number',
        StringValue: order.total.toString(),
      },
    },
  }));
}

Example 5: EventBridge Event Publishing

import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';

const eventBridge = new EventBridgeClient({});

interface DomainEvent {
  type: string;
  source: string;
  data: unknown;
  metadata?: Record<string, unknown>;
}

class EventPublisher {
  constructor(private readonly eventBusName: string) {}

  async publish(events: DomainEvent[]): Promise<void> {
    // Batch events (max 10 per request)
    const batches = this.chunk(events, 10);

    for (const batch of batches) {
      const entries = batch.map(event => ({
        EventBusName: this.eventBusName,
        Source: event.source,
        DetailType: event.type,
        Detail: JSON.stringify({
          ...event.data,
          metadata: {
            ...event.metadata,
            publishedAt: new Date().toISOString(),
            correlationId: event.metadata?.correlationId || crypto.randomUUID(),
          },
        }),
        Time: new Date(),
      }));

      const result = await eventBridge.send(new PutEventsCommand({
        Entries: entries,
      }));

      // Check for failures
      if (result.FailedEntryCount && result.FailedEntryCount > 0) {
        const failures = result.Entries?.filter(e => e.ErrorCode) || [];
        console.error('Failed to publish events:', failures);
        throw new Error(`Failed to publish ${result.FailedEntryCount} events`);
      }
    }
  }

  private chunk<T>(array: T[], size: number): T[][] {
    return Array.from({ length: Math.ceil(array.length / size) }, (_, i) =>
      array.slice(i * size, i * size + size)
    );
  }
}

// Usage in domain service
class OrderService {
  constructor(
    private readonly eventPublisher: EventPublisher,
    private readonly orderRepository: OrderRepository
  ) {}

  async createOrder(command: CreateOrderCommand): Promise<Order> {
    const order = Order.create(command);
    await this.orderRepository.save(order);

    // Publish domain event
    await this.eventPublisher.publish([{
      type: 'OrderCreated',
      source: 'com.company.orders',
      data: {
        orderId: order.id,
        customerId: order.customerId,
        items: order.items,
        total: order.total,
      },
      metadata: {
        correlationId: command.correlationId,
        userId: command.userId,
      },
    }]);

    return order;
  }
}

Anti-Patterns

❌ Not Using Dead Letter Queues

// WRONG - messages lost on failure
const queue = new sqs.Queue(this, 'Queue', {
  queueName: 'my-queue',
  // No DLQ configured
});

// ✅ CORRECT - always use DLQ
const queue = new sqs.Queue(this, 'Queue', {
  queueName: 'my-queue',
  deadLetterQueue: {
    queue: dlq,
    maxReceiveCount: 3,
  },
});

References