Sunday 9 August 2020

An SNS primer - also Auto Routing SNS messages to different SQS

It is a standard decoupling pattern to have publishers send their messages to an SNS topic that they own. Consumers set up SQS queues that subscribe to the topic and receive the messages. However, consider the case where there are many consumers, each interested in a different subset of the messages published to the topic.

Consider an SNS topic that publishes information about items available for sale: consumables, furniture, and outdoor equipment. I have three consumers, each interested in one of these item types. Rather than have each SQS queue fill up with all kinds of items, we can configure each queue's subscription so that the queue only receives the items its consumer is interested in.
Step 1: Set up an SNS topic in AWS
I'll use this opportunity to look closely at the options available during setup.
The topic has a name and an optional display name. All messages published to the topic are stored there before SNS attempts to deliver them to the subscribers.
Amazon SNS provides durable storage of all messages that it receives. When Amazon SNS
receives your Publish request, it stores multiple copies of your message to disk. 
Before Amazon SNS confirms to you that it received your request, it stores the message
in multiple isolated locations known as Availability Zones. The message is stored in
Availability Zones that are located within your chosen AWS Region, such as the US East
(N. Virginia) Region. Although rare, should a failure occur in one Availability Zone,
Amazon SNS remains operational, and the durability of your messages persists.
The access policy defines who can publish and subscribe to the topic. The topic owner is the AWS account in which we create the SNS topic.
'Everyone' is an interesting setting: it effectively makes the topic publicly consumable. We can also restrict access to specific AWS accounts.
The next options configure the delivery retry policy, that is, how SNS retries failed deliveries to HTTP/S endpoints.
Last is logging. In this case we have enabled CloudWatch logs for deliveries to SQS. (This means we are paying for CloudWatch.)
I had to create an IAM role to allow SNS to write to CloudWatch.

This completes the creation of SNS.

Next Step: A Lambda that publishes items to SNS every minute.
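import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;

import java.util.HashSet;
import java.util.Set;

public class SellerLambda implements RequestHandler<ScheduledEvent, Void> {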

    private String topicArn = "arn:aws:sns:us-east-1:---------:ItemsToSellSNS";
    private AmazonSNS snsClient;

    public SellerLambda() {
        AmazonSNSClientBuilder snsClientBuilder = AmazonSNSClientBuilder.standard();
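        snsClientBuilder.setClientConfiguration(new ClientConfiguration());
        // Placeholder keys for illustration only; never hardcode real credentials.
        // Inside Lambda, omitting setCredentials lets the SDK's default provider
        // chain pick up the function's execution role instead.
        snsClientBuilder.setCredentials(new AWSStaticCredentialsProvider(
                new BasicAWSCredentials("AccessKey", "SecretKey")));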
        snsClientBuilder.setRegion(Regions.US_EAST_1.getName());
        snsClient = snsClientBuilder.build();
    }

    @Override
    public Void handleRequest(ScheduledEvent input, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("In Handler: Executing " + context.getFunctionName() + ", " + context.getFunctionVersion()
                + ", event triggered at " + input.getTime());
        logger.log(input.toString());

        Set<String> items = new HashSet<>();
        items.add("Furniture");
        items.add("Outdoors");
        items.add("Food");
        for (String item : items) {
            // The body is the item name plus a random suffix.
            final PublishRequest publishRequest = new PublishRequest(topicArn, item + Math.random());
            // The ITEM_TYPE attribute is what subscription filter policies match on.
            MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
            messageAttributeValue.setDataType("String");
            messageAttributeValue.setStringValue(item);
            publishRequest.addMessageAttributesEntry("ITEM_TYPE", messageAttributeValue);
            
            PublishResult publishResult = snsClient.publish(publishRequest);
            logger.log("Result of publishing message for item " + item + " is " + publishResult);
        }
        return null;
    }
}
The SNS client is pretty straightforward: it can work with all topics under the account.
It also exposes simpler publish methods that take the message as a plain String, but in that case there is no way to attach message attributes.
The method we used here publishes one message at a time. Each message includes message attributes. The publish method returns a result containing the message ID, which can be used to verify that the publish succeeded.
I set up a simple CloudWatch rule that triggers the Lambda every minute.
The CloudWatch logs of one successful run are below:

START RequestId: 14c925ed-c066-4d9d-ba91-36fc466d21a6 Version: $LATEST
In Handler: Executing SellerNotificationsFunction, $LATEST, event triggered at 2020-08-08T18:47:14.000Z
{account: XXXXXXXXXXX,region: us-east-1,detail: {},detailType: Scheduled Event,source: aws.events,id: 6a450b80-fa20-04cc-4f68-f3f793330be7,time: 2020-08-08T18:47:14.000Z,resources: [arn:aws:events:us-east-1:XXXXXXXXXXX:rule/SellerNotificationScheduledTrigger]}
Result of publishing message for item Outdoors is {MessageId: 1944ddbe-94ac-566e-9a47-8a166174abbf}
Result of publishing message for item Furniture is {MessageId: bcab8e23-0329-5b49-bef2-d98bf6f09698}
Result of publishing message for item Food is {MessageId: 23b23ebe-ab06-5f59-a3eb-69165edf9288}
END RequestId: 14c925ed-c066-4d9d-ba91-36fc466d21a6
REPORT RequestId: 14c925ed-c066-4d9d-ba91-36fc466d21a6	Duration: 3592.67 ms	Billed Duration: 3600 ms	Memory Size: 512 MB	Max Memory Used: 135 MB	Init Duration: 2082.80 ms	
The next step is to set up SQS queues and configure them to receive messages from the SNS topic.
I set up an SQS queue without any filtering, so it subscribes to all messages from the topic. However, my CloudWatch logs indicated a delivery failure:
{
    "notification": {
        "messageMD5Sum": "52d7dff0bcb7e0301b55896a038a417e",
        "messageId": "be9a9c5a-6d07-52a8-bff5-cbf36d9f8360",
        "topicArn": "arn:aws:sns:us-east-1:XXXXXX:ItemsToSellSNS",
        "timestamp": "2020-08-08 19:34:48.021"
    },
    "delivery": {
        "deliveryId": "c85f2b94-3d32-51ce-9945-7c3d3daa6426",
        "destination": "arn:aws:sqs:us-east-1:XXXXXX:FurnitureQueue",
        "providerResponse": "{\"ErrorCode\":\"AccessDenied\",\"ErrorMessage\":\"Access to the resource https://sqs.us-east-1.amazonaws.com/XXXXXX/FurnitureQueue is denied.\",\"sqsRequestId\":\"Unrecoverable\"}",
        "dwellTimeMs": 34,
        "attempts": 1,
        "statusCode": 403
    },
    "status": "FAILURE"
}
This indicates a permission issue. I updated the SQS access policy to allow everyone to send messages to the queue, which enabled SNS to deliver messages to it successfully.
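Allowing everyone works for a quick experiment, but a narrower queue policy is usually preferable. The sketch below is not what was used in this post; it only builds the kind of policy document that lets the SNS service send to one queue on behalf of one topic. The class name, method name and the masked ARNs are illustrative assumptions, and the resulting JSON would still have to be pasted into the queue's access policy by hand.

```java
// Sketch: build an SQS access policy that lets only one SNS topic send to one queue.
// The ARNs used in main are masked placeholders, not real resources.
class QueuePolicySketch {

    /** Returns a policy JSON document allowing topicArn to call sqs:SendMessage on queueArn. */
    static String buildPolicy(String queueArn, String topicArn) {
        return String.join("\n",
            "{",
            "  \"Version\": \"2012-10-17\",",
            "  \"Statement\": [{",
            "    \"Effect\": \"Allow\",",
            "    \"Principal\": {\"Service\": \"sns.amazonaws.com\"},",
            "    \"Action\": \"sqs:SendMessage\",",
            "    \"Resource\": \"" + queueArn + "\",",
            "    \"Condition\": {\"ArnEquals\": {\"aws:SourceArn\": \"" + topicArn + "\"}}",
            "  }]",
            "}");
    }

    public static void main(String[] args) {
        String queueArn = "arn:aws:sqs:us-east-1:XXXXXX:FurnitureQueue";
        String topicArn = "arn:aws:sns:us-east-1:XXXXXX:ItemsToSellSNS";
        // Print the document so it can be copied into the queue's access policy.
        System.out.println(buildPolicy(queueArn, topicArn));
    }
}
```

The condition on aws:SourceArn is what ties the permission to the one topic, so other topics and other principals remain unable to send to the queue.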

The next step is to add an attribute filter to the subscription.
The subscription filter policy here means a message will be sent to the SQS queue only if the message has an attribute "ITEM_TYPE" with value "Furniture".
I verified this from the SNS delivery logs.
At this point I have two queues:
  • AllItemsQueue - The queue receives all messages from the ItemsToSellSNS topic
  • FurnitureQueue - The queue receives only messages matching the filter policy from the ItemsToSellSNS topic
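To make the routing concrete, here is a small local simulation of exact-match attribute filtering. It does not call SNS, and the class and method names are invented for illustration. It assumes the filter policy on the FurnitureQueue subscription has the usual form {"ITEM_TYPE": ["Furniture"]}, and treats a subscription without a policy as accepting everything.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local simulation only: mimics exact-match attribute filtering, it does not call SNS.
class FilterRoutingSketch {

    /** True when every key in the policy is present in the attributes with an allowed value. */
    static boolean matches(Map<String, List<String>> policy, Map<String, String> attributes) {
        for (Map.Entry<String, List<String>> rule : policy.entrySet()) {
            String actual = attributes.get(rule.getKey());
            if (actual == null || !rule.getValue().contains(actual)) {
                return false;
            }
        }
        return true; // an empty policy accepts every message
    }

    /** Returns the names of the queues whose subscription policy accepts the message. */
    static List<String> route(Map<String, Map<String, List<String>>> subscriptions,
                              Map<String, String> attributes) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Map<String, List<String>>> sub : subscriptions.entrySet()) {
            if (matches(sub.getValue(), attributes)) {
                targets.add(sub.getKey());
            }
        }
        return targets;
    }

    /** The two subscriptions from this post: one unfiltered, one filtered on ITEM_TYPE. */
    static Map<String, Map<String, List<String>>> subscriptions() {
        Map<String, Map<String, List<String>>> subs = new LinkedHashMap<>();
        subs.put("AllItemsQueue", Map.of());
        subs.put("FurnitureQueue", Map.of("ITEM_TYPE", List.of("Furniture")));
        return subs;
    }

    public static void main(String[] args) {
        for (String item : List.of("Furniture", "Outdoors", "Food")) {
            System.out.println(item + " -> " + route(subscriptions(), Map.of("ITEM_TYPE", item)));
        }
    }
}
```

Under these assumptions, only a message whose ITEM_TYPE is Furniture is routed to both queues; every other message reaches AllItemsQueue alone.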
- From Lambda logs:
Result of publishing message for item Furniture is 
{MessageId: 06810711-2867-5e8b-a528-50b820abc0b4}
Result of publishing message for item Outdoors is 
{MessageId: c5f1d357-491a-5e0b-81ae-84fe41d68034}
I checked the SNS delivery logs, and they indicate that the Furniture message was sent to two queues while the Outdoors message was sent to a single queue:
{
    "notification": {
        "messageMD5Sum": "314f159e953ed9c955d3c65504f679c6",
        "messageId": "06810711-2867-5e8b-a528-50b820abc0b4",
        "topicArn": "arn:aws:sns:us-east-1:686673491323:ItemsToSellSNS",
        "timestamp": "2020-08-08 20:09:48.365"
    },
    "delivery": {
        "deliveryId": "e5551c8f-511f-5092-919d-4b2659d39cac",
        "destination": "arn:aws:sqs:us-east-1:686673491323:FurnitureQueue",
        "providerResponse": "{\"sqsRequestId\":\"81390137-859b-5f33-9596-569e8d573181\",\"sqsMessageId\":\"1cb34c87-fe5e-4c04-a162-88a88d0a0615\"}",
        "dwellTimeMs": 39,
        "attempts": 1,
        "statusCode": 200
    },
    "status": "SUCCESS"
}
{
    "notification": {
        "messageMD5Sum": "376323bb15b81d0b29736a68d46a6b84",
        "messageId": "c5f1d357-491a-5e0b-81ae-84fe41d68034",
        "topicArn": "arn:aws:sns:us-east-1:686673491323:ItemsToSellSNS",
        "timestamp": "2020-08-08 20:09:48.344"
    },
    "delivery": {
        "deliveryId": "e4dcdc25-c3c0-5d00-96bf-8b8ccb0c1f97",
        "destination": "arn:aws:sqs:us-east-1:686673491323:AllItemsQueue",
        "providerResponse": "{\"sqsRequestId\":\"8a1a41c1-8d06-5f9a-ad6b-166885caeb05\",\"sqsMessageId\":\"a19ec52f-32c8-4797-87e2-400bd5664b5d\"}",
        "dwellTimeMs": 52,
        "attempts": 1,
        "statusCode": 200
    },
    "status": "SUCCESS"
}
The first message went to both queues (only its FurnitureQueue delivery is shown above), while the second message was delivered only to AllItemsQueue.
