Saturday 9 May 2020

Step Functions - AWS Service Integrations

With all this serverless chatter around Lambda, the next thing was to try and orchestrate these Lambdas with Step Functions. Essentially a Step Function allows you to define the steps to be executed across systems in a visual UI. The interactions, retries, failure conditions and other orchestration behavior (or the flow management) can be done by Step Functions.

Example: Consider the use case of building a cross-team solution.
  1. Team A executes a scheduled complex activity.
  2. It then inserts the result information into a DynamoDB table.
  3. Team A sends a message to an SQS queue owned by Team B (next-stage processing).
  4. Team B returns a notification when it has finished processing.
  5. At this point the Step Function waits for 5 seconds (for no reason).
  6. Team A triggers a workflow owned by Team C.
  7. Team A then sends a message to an SNS topic.
  8. Team A updates the record in DynamoDB as processed.
This is a pretty convoluted workflow designed solely for me to try stuff, although something like it could be happening out there.
Team A could build an application that runs on a schedule and performs the whole sequence of operations. However, in that case we would have to:
  1. Pay for 24/7 application uptime. If the activity is executed, say, once daily, this is wasted compute cost.
  2. Manage the scaling. Say that during peak, Team A executes 1000 times every alternate day. Our application would have to be scaled up every alternate day to process the data quickly and then scaled back down. Step Functions can do that automatically.
  3. Hand-code the flow control. Step Functions allow for configuring retries, error handling, alternate flows etc., all of which is rendered visually. Imagine the complex code we write for all this being displayed as a graph. The execution history can also be seen as a graph, showing exactly what failed. Life frustration reduced!!
So with this background, I set off to create my workflow. The workflow we set up in Step Functions is called a state machine (just as the code package we execute in Lambda is called a function).
The state machine is now responsible for all the orchestration that Team A was responsible for.
Step 1: Perform the complex activity
For this example we will pretend to do this using a Pass state.
Step 2: Insert into DynamoDB.
Step 3: Push details of the file to SQS.
Step 4: Wait for 5 seconds.
Step 5: Trigger the workflow.
Step 6: Push to SNS.
Step 7: Update the inserted DynamoDB entry.
All of these will be done inside the state machine. These AWS service interactions can be done directly through Step Functions (Details here).
My first step is to do the complex activity, for which I would create a Lambda or an AWS Batch job. To use it in a Step Function, I set up a Task state. From the AWS docs:

A Task state ("Type": "Task") represents a single unit of work 
performed by a state machine. 
All work in your state machine is done by tasks. A task performs 
work by using an activity or an AWS Lambda function, or by 
passing parameters to the API actions of other services.
For now I will simply use a Pass State instead.

So how does the state machine execute? From the AWS docs:

When this state machine is launched, the interpreter begins execution by identifying the 
Start State. It executes that state, and then checks to see if the state is marked as an 
End State. If it is, the machine terminates and returns a result. If the state is not an 
End State, the interpreter looks for a “Next” field to determine what state to run next; 
it repeats this process until it reaches a Terminal State (Succeed, Fail, or an End State)
 or a runtime error occurs.
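
The loop described in that quote can be sketched in a few lines of Python. This is only a toy model for intuition, not how the Step Functions service is implemented: the function name run_state_machine and the handlers mapping are made up for this illustration, and it ignores Choice, Parallel, Catch and everything else beyond StartAt, Next and End.

```python
def run_state_machine(definition, handlers, state_input):
    """Walk a state machine definition from StartAt until a terminal state.

    `handlers` (a hypothetical helper for this sketch) maps a state name to a
    plain Python callable standing in for the real work. States without a
    handler simply pass their input through, like a Pass state.
    """
    visited = []
    name = definition["StartAt"]
    data = state_input
    while True:
        state = definition["States"][name]
        visited.append(name)
        handler = handlers.get(name)
        if handler is not None:
            data = handler(data)
        # Succeed and Fail are terminal, and so is any state with "End": true.
        if state["Type"] in ("Succeed", "Fail") or state.get("End"):
            return visited, data
        # Otherwise follow the "Next" field to the following state.
        name = state["Next"]


definition = {
    "StartAt": "ComplexTask",
    "States": {
        "ComplexTask": {"Type": "Pass", "Next": "InsertRecord"},
        "InsertRecord": {"Type": "Task", "Next": "WaitSometime"},
        "WaitSometime": {"Type": "Wait", "Seconds": 10, "End": True},
    },
}

visited, output = run_state_machine(
    definition,
    {"InsertRecord": lambda data: {**data, "inserted": True}},
    {"messageId": "Message3"},
)
print(visited)
print(output)
```

The three-state definition here is a cut-down stand-in, used only to show the walk from the start state to the end state.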
The remaining tasks are AWS integrations. To achieve them I first need to create the AWS resources:
  1. Set up a DynamoDB table.
  2. Set up SQS for use with the state machine.
  3. Set up SNS for use with the state machine.
  4. Set up a CloudWatch Events rule to trigger the state machine on a schedule.
My State Machine is as below:
For the complex task step, I am simply using a dummy step for now.
The Pass State (identified by "Type":"Pass") simply passes its input to its output, 
performing no work. Pass States are useful when constructing and debugging state machines.
The rest of the steps were AWS service integrations. Sharing the state machine definition (in JSON) here:
{
  "Comment": "My Convoluted StepFunction",
  "StartAt": "ComplexTask",
  "States": {
    "ComplexTask": {
      "Type": "Pass",
      "Next": "InsertRecord"
    },
    "InsertRecord": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Parameters": {
        "TableName": "Message",
        "Item": {
          "messageId": {
            "S.$": "$.messageId"
          },
          "messageText": {
            "S.$": "$.messageText"
          },
          "status": {
            "S": "RECORD_ADDED"
          }
        }
      },
      "ResultPath": "$.DynamoDB",
      "Next": "InformOtherQueue",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GracefulFail"
        }
      ]
    },
    "InformOtherQueue": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage",
      "Parameters": {
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/466170491455/TeamCSqs",
        "MessageBody": {
          "Input.$":  "$.messageId"
        }
      },
      "ResultPath": "$.SQS",
      "Next": "WaitSometime",
      "TimeoutSeconds": 10,
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GracefulFail"
        }
      ]
    },
    "WaitSometime": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "SendSNS"
    },
    "SendSNS": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message": {
          "Input.$":  "$.messageId"
        },
        "TopicArn": "arn:aws:sns:us-east-1:466170491455:TeamAsns"
      },
      "ResultPath": "$.SNS",
      "Next": "UpdateTableEntry",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GracefulFail"
        }
      ]
    },
    "UpdateTableEntry": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:updateItem",
      "Parameters": {
        "TableName": "Message",
        "Key": {
          "messageId": {
            "S.$": "$.messageId"
          }
        },
        "UpdateExpression": "SET messageStatus = :myStatus",
        "ExpressionAttributeValues": {
          ":myStatus": {
            "S": "RECORD_PROCESSED"
          }
        }
      },
      "ResultPath": "$.TblUpdate",
      "End": true,
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GracefulFail"
        }
      ]
    },
    "GracefulFail": {
      "Type": "Pass",
      "Result": "World",
      "End": true
    }
  }
}
Sample Input I used to test the step function:

{
  "messageId": "Message3",
  "messageText":"Hello",
  "timer_seconds":3
}
Diving into the state machine
Each state names its next state in the "Next" field, which acts as a link between nodes in this directed graph.
The first activity was the DynamoDB putItem step.
It includes the table name and the item as a DynamoDB JSON object, which is what gets saved in the table.
I initially made a dev error of not suffixing the keys of my dynamic attributes in the state machine with ".$".

"Item": {
          "messageId": {
            "S.$": "$.messageId"
          },
          "messageText": {
            "S.$": "$.messageText"
          },
          "status": {
            "S": "RECORD_ADDED"
          }
        }
If these ".$" suffixes are missed, then the values are not evaluated by the state. So the record created held the literal text "$.messageId" and "$.messageText" rather than the values from the input.
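
To make the effect of the suffix concrete, here is a rough Python sketch of how a Parameters block is resolved against the state input. The helper names get_path and resolve_parameters are invented for this illustration, and the path handling only covers simple dotted paths such as "$.messageId"; the real service supports far more.

```python
def get_path(data, path):
    """Resolve a simple reference path such as "$.a.b" against a dict."""
    if path == "$":
        return data
    if not path.startswith("$."):
        raise ValueError(f"not a reference path: {path!r}")
    value = data
    for key in path[2:].split("."):
        value = value[key]
    return value


def resolve_parameters(params, state_input):
    """Evaluate keys ending in ".$" against the input; copy the rest as-is."""
    if isinstance(params, dict):
        resolved = {}
        for key, value in params.items():
            if key.endswith(".$"):
                # Dynamic field: drop the suffix and look the value up.
                resolved[key[:-2]] = get_path(state_input, value)
            else:
                # Static field: keep the value, recursing into nested objects.
                resolved[key] = resolve_parameters(value, state_input)
        return resolved
    return params


state_input = {"messageId": "Message3", "messageText": "Hello"}

with_suffix = resolve_parameters(
    {"messageId": {"S.$": "$.messageId"}}, state_input
)
without_suffix = resolve_parameters(
    {"messageId": {"S": "$.messageId"}}, state_input
)
print(with_suffix)     # {'messageId': {'S': 'Message3'}}
print(without_suffix)  # {'messageId': {'S': '$.messageId'}}
```

The second call shows the mistake: without the suffix, the path is treated as an ordinary string and stored unchanged.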
The next step is to push to an SQS queue.
I tried creating a dynamic message using fields from input:
"MessageBody": {
          "Input.$":  "This is my $.messageId"
        }
However, this is not supported. Saving the state machine failed with an error saying the dynamic input must be a path only. The task will not do any computation; it only reads the value from the input and pushes that as the message. Remove the ".$" suffix and you can push any hardcoded message you like.
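
A check along those lines could be approximated as below. This is a guess at the rule based on the error I saw, not the service's actual validation, and the function name find_invalid_dynamic_fields is hypothetical: it simply flags any ".$" key whose value is not a string starting with "$".

```python
def find_invalid_dynamic_fields(params, prefix=""):
    """Return dotted names of ".$" keys whose value does not look like a path."""
    problems = []
    for key, value in params.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            # Nested object: keep looking inside it.
            problems.extend(find_invalid_dynamic_fields(value, name + "."))
        elif key.endswith(".$") and not (
            isinstance(value, str) and value.startswith("$")
        ):
            problems.append(name)
    return problems


rejected = find_invalid_dynamic_fields(
    {"MessageBody": {"Input.$": "This is my $.messageId"}}
)
accepted = find_invalid_dynamic_fields(
    {"MessageBody": {"Input.$": "$.messageId"}}
)
hardcoded = find_invalid_dynamic_fields(
    {"MessageBody": {"Input": "This is my message"}}
)
print(rejected)   # ['MessageBody.Input.$']
print(accepted)   # []
print(hardcoded)  # []
```

Running something like this over a definition before saving it would catch the mixed text-and-path mistake early, under the assumption stated above.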
In my previous step and this step I also included the ResultPath attribute. Not adding it is a problem:
The AWS service integrations are not Pass states; they generate output, and that output is passed as input to the next state. So without the ResultPath attribute, the input to my SQS step was the output returned by the DynamoDB putItem task.
In other words, the SQS step received the DynamoDB response instead of my original input. The ResultPath attribute ensures that the DynamoDB response is placed in a field of the input (here "$.DynamoDB"), leaving the original fields intact.
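
The difference can be modelled with a small Python function. This is a simplified sketch under my own assumptions: apply_result_path is a made-up name, only simple dotted paths are handled, and the sample result is trimmed down from the real DynamoDB response.

```python
import copy


def apply_result_path(state_input, result, result_path="$"):
    """Combine a task result with the state's input, ResultPath-style.

    With the default path "$" the result replaces the input entirely. With a
    path such as "$.DynamoDB" the result is stored under that field and the
    original input fields are preserved.
    """
    if result_path == "$":
        return result
    if not result_path.startswith("$."):
        raise ValueError(f"unsupported path in this sketch: {result_path!r}")
    output = copy.deepcopy(state_input)  # leave the caller's input untouched
    target = output
    keys = result_path[2:].split(".")
    for key in keys[:-1]:
        target = target.setdefault(key, {})
    target[keys[-1]] = result
    return output


state_input = {"messageId": "Message3", "messageText": "Hello"}
result = {"SdkHttpMetadata": {"HttpStatusCode": 200}}

without_result_path = apply_result_path(state_input, result)
with_result_path = apply_result_path(state_input, result, "$.DynamoDB")
print(without_result_path)
print(with_result_path)
```

Without a ResultPath the next state no longer sees messageId at all, which is why the SQS step could not read it.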

Next is the SNS publish step. Similar to the SQS step, it publishes to SNS and returns. The last step is the update DynamoDB record step.
"UpdateTableEntry": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:updateItem",
      "Parameters": {
        "TableName": "Message",
        "Key": {
          "messageId": {
            "S.$": "$.messageId"
          }
        },
        "UpdateExpression": "SET messageStatus = :myStatus",
        "ExpressionAttributeValues": {
          ":myStatus": {
            "S": "RECORD_PROCESSED"
          }
        }
      },
      "ResultPath": "$.TblUpdate",
      "End": true,
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GracefulFail"
        }
      ]
    },
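The update expression is executed on the record. It writes to "messageStatus" because "status" is a DynamoDB reserved word, which cannot be used directly in an update expression. The final output object with all the step responses looks as below: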
{
  "output": {
    "messageId": "Message3",
    "messageText": "Hello",
    "timer_seconds": 3,
    "DynamoDB": {
      "SdkHttpMetadata": {
        "HttpHeaders": {
          "Connection": "keep-alive",
          "Content-Length": "2",
          "Content-Type": "application/x-amz-json-1.0",
          "Date": "Fri, 08 May 2020 17:35:22 GMT",
          "Server": "Server",
          "x-amz-crc32": "2745614147",
          "x-amzn-RequestId": "42474c12-4dba-47a7-82a7-c745f8e8a59a"
        },
        "HttpStatusCode": 200
      },
      "SdkResponseMetadata": {
        "RequestId": "42474c12-4dba-47a7-82a7-c745f8e8a59a"
      }
    },
    "SQS": {
      "MD5OfMessageBody": "21ee0cdd2aec4288d4b9c0e48c1b9c83",
      "MessageId": "6e8c8c00-348a-463c-bef7-52b867743d6e",
      "SdkHttpMetadata": {
        "HttpHeaders": {
          "Content-Length": "378",
          "Content-Type": "text/xml",
          "Date": "Fri, 08 May 2020 17:35:22 GMT",
          "x-amzn-RequestId": "fc5f0f1d-47c8-4458-ba0d-8edc2a92d809"
        },
        "HttpStatusCode": 200
      },
      "SdkResponseMetadata": {
        "RequestId": "fc5f0f1d-47c8-4458-ba0d-8edc2a92d809"
      }
    },
    "SNS": {
      "MessageId": "c88b34fa-e831-502d-97b1-63d57294788e",
      "SdkHttpMetadata": {
        "HttpHeaders": {
          "Content-Length": "294",
          "Content-Type": "text/xml",
          "Date": "Fri, 08 May 2020 17:35:32 GMT",
          "x-amzn-RequestId": "0fe6b1a3-92ba-512b-a9f1-e68c081fb988"
        },
        "HttpStatusCode": 200
      },
      "SdkResponseMetadata": {
        "RequestId": "0fe6b1a3-92ba-512b-a9f1-e68c081fb988"
      }
    },
    "TblUpdate": {
      "SdkHttpMetadata": {
        "HttpHeaders": {
          "Connection": "keep-alive",
          "Content-Length": "2",
          "Content-Type": "application/x-amz-json-1.0",
          "Date": "Fri, 08 May 2020 17:35:33 GMT",
          "Server": "Server",
          "x-amz-crc32": "2745614147",
          "x-amzn-RequestId": "b018cc33-c681-403d-b0ca-cf37f50add58"
        },
        "HttpStatusCode": 200
      },
      "SdkResponseMetadata": {
        "RequestId": "b018cc33-c681-403d-b0ca-cf37f50add58"
      }
    }
  }
}
There are some more learnings, but I will continue with those in an upcoming post.