Saturday, 9 May 2020

Step Functions - AWS Service Integrations 2

In the previous post we successfully executed DynamoDB, SQS and SNS interactions directly from Step Functions. This time I wanted to try some tweaks to my state machine.

Consider the SQS call from the state machine:
"InformOtherQueue": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage",
      "Parameters": {
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/466170491455/TeamCSqs",
        "MessageBody": {
          "Input.$":  "$.messageId"
        }
      },
      "ResultPath": "$.SQS",
      "Next": "WaitSometime",
      "TimeoutSeconds": 10,
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GracefulFail"
        }
      ]
    }
I changed the resource to "arn:aws:states:::sqs:sendMessage.sync" and reran the state machine. I expected no difference, but the execution failed:
{
  "error": "States.Runtime",
  "cause": "An error occurred while scheduling the state 'InformOtherQueue'. The provided ARN 'arn:aws:states:::sqs:sendMessage.sync' is not recognized."
}
Long story short, the ".sync" suffix is only supported by specific integrations, such as AWS Batch and Amazon ECS, and SQS is not one of them. This makes sense given the explanation in the documentation:
For integrated services such as AWS Batch and Amazon ECS, Step Functions can wait
for a request to complete before progressing to the next state. To have Step 
Functions wait, specify the "Resource" field in your task state definition with 
the .sync suffix appended after the resource URI.
In the case of SQS, the state machine gets a success response and moves on; there is nothing asynchronous to wait for. AWS Batch jobs, on the other hand, can be long running, and waiting for their completion is not always desired. So when we do need the wait-for-completion behavior, we opt in with the ".sync" suffix.
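The resource forms seen so far differ only in the suffix appended after the service and API action: none for request/response, ".sync" to wait for a job, and ".waitForTaskToken" for callbacks (covered next). Below is a minimal sketch of a helper that builds these strings. `IntegrationArn` and its `Pattern` enum are hypothetical names for illustration, not part of any AWS SDK.

```java
// Hypothetical helper: builds the "Resource" string for a Step Functions service integration.
public final class IntegrationArn {

    // The integration patterns differ only by the suffix on the resource ARN.
    public enum Pattern {
        REQUEST_RESPONSE(""),                    // call the API, move on as soon as it responds
        RUN_JOB(".sync"),                        // wait for the job to finish (only some services, e.g. Batch, ECS)
        WAIT_FOR_CALLBACK(".waitForTaskToken");  // pause until SendTaskSuccess / SendTaskFailure

        private final String suffix;

        Pattern(String suffix) {
            this.suffix = suffix;
        }

        public String suffix() {
            return suffix;
        }
    }

    private IntegrationArn() {
    }

    // Builds "arn:aws:states:::<service>:<action><suffix>". No validation is done here.
    public static String forTask(String service, String action, Pattern pattern) {
        return "arn:aws:states:::" + service + ":" + action + pattern.suffix();
    }

    public static void main(String[] args) {
        System.out.println(forTask("sqs", "sendMessage", Pattern.REQUEST_RESPONSE));
        System.out.println(forTask("batch", "submitJob", Pattern.RUN_JOB));
        System.out.println(forTask("sqs", "sendMessage", Pattern.WAIT_FOR_CALLBACK));
    }
}
```

The helper only assembles strings. A well-formed value such as "arn:aws:states:::sqs:sendMessage.sync" is still rejected when the state is scheduled, as in the error above, because SQS does not offer the ".sync" pattern.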

The other capability that the AWS service integrations expose is callbacks. Step Functions can pause a workflow until some information is returned from a resource. In this case, for example, the state machine could wait after sending the SQS message until the dependent team replies.
I changed my SQS step as below:
"InformOtherQueue": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/466170491455/TeamCSqs",
        "MessageBody": {
          "Input.$":  "$.messageId",
          "TaskToken.$": "$$.Task.Token"
        }
      },
      "ResultPath": "$.SQS",
      "Next": "WaitSometime",
      "TimeoutSeconds": 10,
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GracefulFail"
        }
      ]
    }
In this case, the input generated in the SQS state is as below:

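The task token is now part of the SQS message. The state machine will wait for 10 seconds and then time out if it does not receive a callback from the queue consumer. The resulting States.Timeout error is caught by the "States.ALL" Catch and routed to GracefulFail. From the documentation: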
When it's complete, the external service calls SendTaskSuccess or SendTaskFailure
with the taskToken included. Only then does the workflow continue to the next state.
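The pause-until-callback behavior can be modelled locally. This is a small in-memory sketch, not the AWS service or SDK. `CallbackSimulation`, `issueToken` and `awaitCallback` are hypothetical names. A `CompletableFuture` stands in for the paused task, and the timeout mirrors `TimeoutSeconds`, scaled down to milliseconds.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// In-memory model of the wait-for-callback pattern. NOT the AWS service; all names are hypothetical.
public class CallbackSimulation {

    // One pending future per task token.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // The Task state starts: a fresh token is issued (it would go into the SQS message body).
    public String issueToken() {
        String token = UUID.randomUUID().toString();
        pending.put(token, new CompletableFuture<>());
        return token;
    }

    // Consumer side, like SendTaskSuccess: false if the token is unknown or already closed.
    public boolean sendTaskSuccess(String token, String output) {
        CompletableFuture<String> f = pending.get(token);
        return f != null && f.complete(output);
    }

    // Consumer side, like SendTaskFailure.
    public boolean sendTaskFailure(String token, String error) {
        CompletableFuture<String> f = pending.get(token);
        return f != null && f.completeExceptionally(new RuntimeException(error));
    }

    // The paused Task state: wait up to timeoutMillis, then return the name of the next state.
    public String awaitCallback(String token, long timeoutMillis, String next, String catchTarget) {
        CompletableFuture<String> f = pending.get(token);
        if (f == null) {
            throw new IllegalArgumentException("Unknown task token");
        }
        try {
            f.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return next;                  // callback arrived in time
        } catch (TimeoutException | ExecutionException e) {
            return catchTarget;           // timeout or reported failure goes to the Catch target
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return catchTarget;
        } finally {
            pending.remove(token);        // later callbacks for this token are rejected
        }
    }

    public static void main(String[] args) {
        CallbackSimulation sim = new CallbackSimulation();

        // Consumer replies in time: the workflow moves on to "WaitSometime".
        String replied = sim.issueToken();
        new Thread(() -> sim.sendTaskSuccess(replied, "{\"ack\":\"success\"}")).start();
        System.out.println(sim.awaitCallback(replied, 1_000, "WaitSometime", "GracefulFail"));

        // Nobody replies: the wait times out and the Catch routes to "GracefulFail".
        String ignored = sim.issueToken();
        System.out.println(sim.awaitCallback(ignored, 100, "WaitSometime", "GracefulFail"));
        System.out.println(sim.sendTaskSuccess(ignored, "{}")); // too late
    }
}
```

Running `main` should print WaitSometime, then GracefulFail, then false. The last line is the model's version of a late callback being refused once the task has timed out.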
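I set up a Lambda function that consumes the messages from the SQS queue and sends the callback to Step Functions:
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.AWSStepFunctionsClient;
import com.amazonaws.services.stepfunctions.model.SendTaskSuccessRequest;
import com.amazonaws.services.stepfunctions.model.SendTaskSuccessResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class NotificationHandler implements RequestHandler<SQSEvent, Void> {

    private final ObjectMapper mapper = new ObjectMapper();
    private final AWSStepFunctions awsStepFunctionsClient;

    public NotificationHandler() {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setClientExecutionTimeout(2 * 60 * 1000); // 2 mins
        // No hard-coded keys: the default credentials chain picks up the Lambda execution role,
        // which needs the states:SendTaskSuccess permission.
        awsStepFunctionsClient = AWSStepFunctionsClient.builder()
                .withRegion(Regions.US_EAST_1)
                .withClientConfiguration(clientConfiguration)
                .build();
    }

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("In Handler: Executing " + context.getFunctionName() + ", " + context.getFunctionVersion());

        for (SQSEvent.SQSMessage sqsRecord : request.getRecords()) {
            String message = sqsRecord.getBody();
            logger.log(message);

            // The body is the JSON MessageBody from the state definition: {"Input": ..., "TaskToken": ...}
            Map<String, Object> map = getMap(message, logger);
            if (map.containsKey("TaskToken")) {
                // Respond back to Step Functions that the work is completed
                SendTaskSuccessResult successResult = callbackStepFunction(
                        String.valueOf(map.get("TaskToken")), String.valueOf(map.get("Input")));
                logger.log(successResult.toString());
            }
        }
        return null;
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getMap(String message, LambdaLogger logger) {
        try {
            return mapper.readValue(message, Map.class);
        } catch (IOException e) {
            logger.log(e.getMessage() + " while processing " + message);
        }
        return new HashMap<>();
    }

    private SendTaskSuccessResult callbackStepFunction(String taskToken, String input) {
        // Echo the message's Input back to the state machine; the output must be valid JSON
        Map<String, String> output = new HashMap<>();
        output.put("Output", input);
        output.put("ack", "success");
        try {
            SendTaskSuccessRequest sendTaskSuccessRequest = new SendTaskSuccessRequest()
                    .withTaskToken(taskToken)
                    .withOutput(mapper.writeValueAsString(output));
            return awsStepFunctionsClient.sendTaskSuccess(sendTaskSuccessRequest);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Could not serialize task output", e);
        }
    }
}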
The Lambda reads each message and, if it finds a task token, calls SendTaskSuccess. With the callback in place, the Step Functions execution completed successfully. The state results can be seen here.
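The handler above only ever reports success. A consumer can also report failure with SendTaskFailure. The execution then fails right away instead of waiting for TimeoutSeconds to expire, and with the "States.ALL" Catch it would go to GracefulFail. Below is a minimal sketch of that decision in plain Java. It assumes a hypothetical `TaskCallback` interface that a real implementation would back with the SDK client's `sendTaskSuccess`/`sendTaskFailure` calls. The error name `ProcessingFailed` is also made up.

```java
import java.util.function.UnaryOperator;

// Hypothetical sketch: choose between the success and failure callbacks for one queue message.
public class CallbackDispatcher {

    // Hypothetical seam over the two Step Functions callback APIs (would wrap the SDK client).
    public interface TaskCallback {
        void success(String taskToken, String outputJson);

        void failure(String taskToken, String error, String cause);
    }

    // Returns true if a callback was sent. Messages without a token are ignored,
    // because no execution is waiting on them.
    public static boolean dispatch(String taskToken, String input,
                                   UnaryOperator<String> work, TaskCallback callback) {
        if (taskToken == null || taskToken.isEmpty()) {
            return false;
        }
        String output;
        try {
            output = work.apply(input);
        } catch (RuntimeException e) {
            // Report the failure so the execution does not sit idle until it times out.
            callback.failure(taskToken, "ProcessingFailed", String.valueOf(e.getMessage()));
            return true;
        }
        callback.success(taskToken, output);
        return true;
    }
}
```

Keeping the SDK behind an interface lets this branching be tested without AWS credentials. The work step runs outside the success call, so an exception thrown while sending the success callback is not misreported as a processing failure.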
