It is a standard decoupling pattern to have publishers send their messages to an SNS topic that they own. Consumers set up SQS queues that subscribe to the topic and receive the messages. However, consider the case where there are many consumers, each interested in a different subset of the messages sent through the topic.
Consider an SNS topic that publishes information about items available for sale: consumables, furniture, and outdoor equipment. I have three consumers, each interested in one of these item types. Rather than have their SQS queues filled with all kinds of items, we can configure each subscription so that its queue only receives the items the consumer is interested in.
Step 1: Set up an SNS topic in AWS
I'll use this opportunity to take a closer look at the options available during setup.
The topic has a name and an optional display name. All messages published to the topic are stored there before SNS attempts to deliver them to the recipients.
Amazon SNS provides durable storage of all messages that it receives. When Amazon SNS receives your Publish request, it stores multiple copies of your message to disk. Before Amazon SNS confirms to you that it received your request, it stores the message in multiple isolated locations known as Availability Zones. The message is stored in Availability Zones that are located within your chosen AWS Region, such as the US East (N. Virginia) Region. Although rare, should a failure occur in one Availability Zone, Amazon SNS remains operational, and the durability of your messages persists.
The access policy lets us define who can publish and subscribe to the topic. The topic owner is the AWS account in which we create the SNS topic.
'Everyone' is an interesting setting - it effectively makes the topic publicly usable. We can also restrict access to specific AWS accounts.
Last is logging. In this case we have enabled CloudWatch logging for deliveries to SQS. (This means we are paying for CloudWatch.)
I had to create an IAM role to allow SNS to write to CloudWatch.
This completes the creation of SNS.
Next Step: A Lambda that publishes items to SNS every minute.
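import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;

import java.util.HashSet;
import java.util.Set;

public class SellerLambda implements RequestHandler<ScheduledEvent, Void> {

    private String topicArn = "arn:aws:sns:us-east-1:---------:ItemsToSellSNS";
    private AmazonSNS snsClient;

    public SellerLambda() {
        AmazonSNSClientBuilder snsClientBuilder = AmazonSNSClientBuilder.standard();
        snsClientBuilder.setClientConfiguration(new ClientConfiguration());
        // Placeholder credentials; a deployed Lambda would normally rely on
        // its execution role instead of static keys.
        snsClientBuilder.setCredentials(new AWSStaticCredentialsProvider(
                new BasicAWSCredentials("AccessKey", "SecretKey")));
        snsClientBuilder.setRegion(Regions.US_EAST_1.getName());
        snsClient = snsClientBuilder.build();
    }

    @Override
    public Void handleRequest(ScheduledEvent input, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("In Handler: Executing " + context.getFunctionName() + ", "
                + context.getFunctionVersion() + ", event triggered at " + input.getTime());
        logger.log(input.toString());

        Set<String> items = new HashSet<>();
        items.add("Furniture");
        items.add("Outdoors");
        items.add("Food");

        // Publish one message per item type, tagging each with an ITEM_TYPE attribute
        for (String item : items) {
            final PublishRequest publishRequest = new PublishRequest(topicArn, item + Math.random());
            MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
            messageAttributeValue.setDataType("String");
            messageAttributeValue.setStringValue(item);
            publishRequest.addMessageAttributesEntry("ITEM_TYPE", messageAttributeValue);

            PublishResult publishResult = snsClient.publish(publishRequest);
            logger.log("Result of publishing message for item " + item + " is " + publishResult);
        }
        return null;
    }
}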
The SNS client also exposes a batch method for publishing messages, but in that case the inputs are plain String messages.
The method we used here publishes one message at a time. Each message includes message attributes. The publish method returns a result that can be used to verify that the publish succeeded.
I set up a simple CloudWatch rule that triggers the Lambda every minute.
The CloudWatch logs of one successful run are shown below:
START RequestId: 14c925ed-c066-4d9d-ba91-36fc466d21a6 Version: $LATEST
In Handler: Executing SellerNotificationsFunction, $LATEST, event triggered at 2020-08-08T18:47:14.000Z
{account: XXXXXXXXXXX,region: us-east-1,detail: {},detailType: Scheduled Event,source: aws.events,id: 6a450b80-fa20-04cc-4f68-f3f793330be7,time: 2020-08-08T18:47:14.000Z,resources: [arn:aws:events:us-east-1:XXXXXXXXXXX:rule/SellerNotificationScheduledTrigger]}
Result of publishing message for item Outdoors is {MessageId: 1944ddbe-94ac-566e-9a47-8a166174abbf}
Result of publishing message for item Furniture is {MessageId: bcab8e23-0329-5b49-bef2-d98bf6f09698}
Result of publishing message for item Food is {MessageId: 23b23ebe-ab06-5f59-a3eb-69165edf9288}
END RequestId: 14c925ed-c066-4d9d-ba91-36fc466d21a6
REPORT RequestId: 14c925ed-c066-4d9d-ba91-36fc466d21a6 Duration: 3592.67 ms Billed Duration: 3600 ms Memory Size: 512 MB Max Memory Used: 135 MB Init Duration: 2082.80 ms
I set up an SQS queue without any filtering, so it is subscribed to all messages from the topic. However, my CloudWatch logs indicated a delivery failure:
{
  "notification": {
    "messageMD5Sum": "52d7dff0bcb7e0301b55896a038a417e",
    "messageId": "be9a9c5a-6d07-52a8-bff5-cbf36d9f8360",
    "topicArn": "arn:aws:sns:us-east-1:XXXXXX:ItemsToSellSNS",
    "timestamp": "2020-08-08 19:34:48.021"
  },
  "delivery": {
    "deliveryId": "c85f2b94-3d32-51ce-9945-7c3d3daa6426",
    "destination": "arn:aws:sqs:us-east-1:XXXXXX:FurnitureQueue",
    "providerResponse": "{\"ErrorCode\":\"AccessDenied\",\"ErrorMessage\":\"Access to the resource https://sqs.us-east-1.amazonaws.com/XXXXXX/FurnitureQueue is denied.\",\"sqsRequestId\":\"Unrecoverable\"}",
    "dwellTimeMs": 34,
    "attempts": 1,
    "statusCode": 403
  },
  "status": "FAILURE"
}
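The post does not show how this error was resolved. An AccessDenied response from SQS commonly means that the queue's access policy does not allow the topic to send messages to it. A minimal sketch of such a policy, built as a string in plain Java, is below. The class and method names are hypothetical, and the account ID 123456789012 in the ARNs is a placeholder, not taken from the setup above.

```java
class QueuePolicySketch {

    // Builds an SQS access policy document that allows a single SNS topic
    // to send messages to a single queue. Both ARNs come from the caller.
    public static String allowTopicToSend(String queueArn, String topicArn) {
        return String.join("\n",
            "{",
            "  \"Version\": \"2012-10-17\",",
            "  \"Statement\": [{",
            "    \"Effect\": \"Allow\",",
            "    \"Principal\": {\"Service\": \"sns.amazonaws.com\"},",
            "    \"Action\": \"sqs:SendMessage\",",
            "    \"Resource\": \"" + queueArn + "\",",
            "    \"Condition\": {\"ArnEquals\": {\"aws:SourceArn\": \"" + topicArn + "\"}}",
            "  }]",
            "}");
    }

    public static void main(String[] args) {
        // Placeholder ARNs for illustration only
        String policy = allowTopicToSend(
            "arn:aws:sqs:us-east-1:123456789012:FurnitureQueue",
            "arn:aws:sns:us-east-1:123456789012:ItemsToSellSNS");
        System.out.println(policy);
    }
}
```

The resulting document would be set as the access policy of the queue. The condition on aws:SourceArn limits the permission to this one topic rather than to SNS in general.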
The next step is to add an attribute filter to the subscription.
The subscription filter policy here means a message will be sent to the SQS queue only if the message has an attribute "ITEM_TYPE" with the value "Furniture".
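For this subscription the filter policy is a small JSON document, {"ITEM_TYPE": ["Furniture"]}. As a rough illustration of the matching rule for string attributes, the sketch below models it in plain Java. It is a simplified stand-in and not how SNS implements filtering: the class and method names are hypothetical, and only exact string matching is covered.

```java
import java.util.List;
import java.util.Map;

class FilterPolicySketch {

    // Returns true when, for every key in the policy, the message carries that
    // attribute and its value equals one of the values listed for the key.
    public static boolean matches(Map<String, List<String>> policy,
                                  Map<String, String> messageAttributes) {
        for (Map.Entry<String, List<String>> rule : policy.entrySet()) {
            String actual = messageAttributes.get(rule.getKey());
            if (actual == null || !rule.getValue().contains(actual)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Java equivalent of the JSON filter policy {"ITEM_TYPE": ["Furniture"]}
        Map<String, List<String>> furnitureOnly =
            Map.of("ITEM_TYPE", List.of("Furniture"));

        System.out.println(matches(furnitureOnly, Map.of("ITEM_TYPE", "Furniture"))); // true
        System.out.println(matches(furnitureOnly, Map.of("ITEM_TYPE", "Food")));      // false
        System.out.println(matches(furnitureOnly, Map.of()));                         // false
    }
}
```

In this model an empty policy matches every message, which corresponds to a subscription with no filter.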
I verified this from the SNS logs:
At this point I have two queues:
- AllItemsQueue - receives all messages from the ItemsToSellSNS topic
- FurnitureQueue - receives only messages matching the filter policy from the ItemsToSellSNS topic
From the Lambda logs:
Result of publishing message for item Furniture is {MessageId: 06810711-2867-5e8b-a528-50b820abc0b4}
Result of publishing message for item Outdoors is {MessageId: c5f1d357-491a-5e0b-81ae-84fe41d68034}
I checked the SNS delivery logs, and they indicate that the Furniture message was sent to two queues and the Outdoors message was sent to a single queue:
{
  "notification": {
    "messageMD5Sum": "314f159e953ed9c955d3c65504f679c6",
    "messageId": "06810711-2867-5e8b-a528-50b820abc0b4",
    "topicArn": "arn:aws:sns:us-east-1:686673491323:ItemsToSellSNS",
    "timestamp": "2020-08-08 20:09:48.365"
  },
  "delivery": {
    "deliveryId": "e5551c8f-511f-5092-919d-4b2659d39cac",
    "destination": "arn:aws:sqs:us-east-1:686673491323:FurnitureQueue",
    "providerResponse": "{\"sqsRequestId\":\"81390137-859b-5f33-9596-569e8d573181\",\"sqsMessageId\":\"1cb34c87-fe5e-4c04-a162-88a88d0a0615\"}",
    "dwellTimeMs": 39,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}
{
  "notification": {
    "messageMD5Sum": "376323bb15b81d0b29736a68d46a6b84",
    "messageId": "c5f1d357-491a-5e0b-81ae-84fe41d68034",
    "topicArn": "arn:aws:sns:us-east-1:686673491323:ItemsToSellSNS",
    "timestamp": "2020-08-08 20:09:48.344"
  },
  "delivery": {
    "deliveryId": "e4dcdc25-c3c0-5d00-96bf-8b8ccb0c1f97",
    "destination": "arn:aws:sqs:us-east-1:686673491323:AllItemsQueue",
    "providerResponse": "{\"sqsRequestId\":\"8a1a41c1-8d06-5f9a-ad6b-166885caeb05\",\"sqsMessageId\":\"a19ec52f-32c8-4797-87e2-400bd5664b5d\"}",
    "dwellTimeMs": 52,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}
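One way to confirm the fan-out from delivery log entries like these is to group the destination queues by message ID. The sketch below does that in plain Java, assuming each log entry has already been reduced to a message ID and a queue name. The class names and the sample entries are hypothetical; the entries only mirror the outcome described in this post and are not real log data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DeliveryTallySketch {

    // One delivery log entry reduced to the two fields needed here.
    public static final class Delivery {
        final String messageId;
        final String destinationQueue;

        public Delivery(String messageId, String destinationQueue) {
            this.messageId = messageId;
            this.destinationQueue = destinationQueue;
        }
    }

    // Groups queue names by message ID, keeping the order in which entries appear.
    public static Map<String, List<String>> queuesByMessage(List<Delivery> deliveries) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Delivery d : deliveries) {
            result.computeIfAbsent(d.messageId, k -> new ArrayList<>())
                  .add(d.destinationQueue);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical entries: one Furniture message, one Outdoors message
        List<Delivery> log = List.of(
            new Delivery("furniture-msg", "FurnitureQueue"),
            new Delivery("furniture-msg", "AllItemsQueue"),
            new Delivery("outdoors-msg", "AllItemsQueue"));

        System.out.println(queuesByMessage(log));
    }
}
```

A message that shows two destinations passed the filter on FurnitureQueue and was also delivered to the unfiltered AllItemsQueue, while a message with one destination reached only the unfiltered queue.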