In the previous post we successfully executed dynamo db, sqs and sns interactions directly from step functions. I wanted to try some tweaks to my state machine.
Consider the SQS call from state machine
I changed my resource to "arn:aws:states:::sqs:sendMessage.sync" and reran the state machine. I expected no difference but it failed:
Long story short, "sync" attribute is only supported by specific resources - AWS Batch and ECS. Which makes sense if we read the explanation:
In case of SQS, the state machine gets a success response and then moves forward. There is no asynchronous behavior involved. But AWS batch jobs could be long running and waiting on their completion might not always be desired. So when we need the wait for compete behavior we can use the "sync" setting.
The other capability that AWS Integrations exposes is callbacks. Step Functions can pause workflow waiting for some information to be returned from a resource. For example in this case the state machine could wait after sending SQS message for a reply from dependent team.
I changed my SQS step to be as below
In this case the input generated in the SQS State is as below:
The task token is now a part of the SQS Message. The state machine will wait for 10 seconds and then timeout if it does not receive a callback from the Queue consumer
I will setup a Lambda here that can consume from the SQS and return the callback information needed here.
The Lambda reads the message, if it detects a task token it responds back with TaskSuccess Message.
The Step Function executed successfully. The states results can be seen here.
Consider the SQS call from state machine
"InformOtherQueue": { "Type": "Task", "Resource": "arn:aws:states:::sqs:sendMessage", "Parameters": { "QueueUrl": "https://sqs.us-east-1.amazonaws.com/466170491455/TeamCSqs", "MessageBody": { "Input.$": "$.messageId" } }, "ResultPath": "$.SQS", "Next": "WaitSometime", "TimeoutSeconds": 10, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "GracefulFail" } ] }
{ "error": "States.Runtime", "cause": "An error occurred while scheduling the state 'InformOtherQueue'.
The provided ARN 'arn:aws:states:::sqs:sendMessage.sync' is not recognized."
}
For integrated services such as AWS Batch and Amazon ECS, Step Functions can wait
for a request to complete before progressing to the next state. To have Step
Functions wait, specify the "Resource" field in your task state definition with
the .sync suffix appended after the resource URI.
The other capability that AWS Integrations exposes is callbacks. Step Functions can pause workflow waiting for some information to be returned from a resource. For example in this case the state machine could wait after sending SQS message for a reply from dependent team.
I changed my SQS step to be as below
"InformOtherQueue": { "Type": "Task", "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", "Parameters": { "QueueUrl": "https://sqs.us-east-1.amazonaws.com/466170491455/TeamCSqs", "MessageBody": { "Input.$": "$.messageId", "TaskToken.$": "$$.Task.Token" } }, "ResultPath": "$.SQS", "Next": "WaitSometime", "TimeoutSeconds": 10, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "GracefulFail" } ] }
The task token is now a part of the SQS Message. The state machine will wait for 10 seconds and then timeout if it does not receive a callback from the Queue consumer
When it's complete, the external service calls SendTaskSuccess or SendTaskFailure
with the taskToken included. Only then does the workflow continue to the next state.
public class NotificationHandler implements RequestHandler<SQSEvent, Void> { private ObjectMapper mapper = new ObjectMapper(); private AWSStepFunctions awsStepFunctionsClient; public NotificationHandler() { AWSCredentials awsCredentials = new BasicAWSCredentials("AccessKey", "SecretKey"); AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setClientExecutionTimeout(2 * 60 * 1000);//2 mins awsStepFunctionsClient = AWSStepFunctionsClient.builder() .withCredentials(awsCredentialsProvider) .withRegion(Regions.US_EAST_1) .withClientConfiguration(clientConfiguration) .build(); } public Void handleRequest(SQSEvent request, Context context) { LambdaLogger logger = context.getLogger(); logger.log("In Handler: Executing " + context.getFunctionName() + ", " + context.getFunctionVersion()); logger.log(request.toString()); for (SQSEvent.SQSMessage snsRecord : request.getRecords()) { String message = snsRecord.getBody(); Map<String, String> map = getMap(message, logger); if (map.containsKey("TaskToken")) { //Respond back to Step Fn that work is completed SendTaskSuccessResult successResult = callbackStepFunction(map.get("TaskToken"), map.get("Input")); logger.log(successResult.toString()); } System.out.println(message); } return null; } private Map getMap(String message, LambdaLogger logger) { try { return mapper.readValue(message, Map.class); } catch (JsonProcessingException e) { logger.log(e.getMessage() + " while processing " + message); } return new HashMap<>(); } private SendTaskSuccessResult callbackStepFunction(String taskToken, String message) { SendTaskSuccessRequest sendTaskSuccessRequest = new SendTaskSuccessRequest(); sendTaskSuccessRequest.setTaskToken(taskToken); sendTaskSuccessRequest.setOutput("{\"Output\": \"message\",\"ack\":\"success\"}"); return awsStepFunctionsClient.sendTaskSuccess(sendTaskSuccessRequest); } }
Very valuable information! Redspider - web design abu dhabi is about to tell you that I was looking for something like that and found it here, Congratulations!
ReplyDelete