With all this serverless chatter around Lambda, the next thing to try was orchestrating these Lambdas with Step Functions. Essentially, Step Functions lets you define the steps to be executed across systems and see them in a visual UI. The interactions, retries, failure conditions and other orchestration behavior (the flow management) are handled by Step Functions.
Example: consider the use case of building a cross-team solution.
- Team A executes a scheduled, complex activity.
- It then inserts the result information into a DynamoDB table.
- Team A sends a message to an SQS queue owned by Team B (next-stage processing).
- Team B returns a notification when it has finished processing.
- At this point the Step Function waits for 5 seconds (for no reason).
- Team A triggers a workflow owned by Team C.
- Team A then sends a message to an SNS topic.
- Team A updates the record in DynamoDB as processed.
This is a pretty convoluted workflow, designed solely so that I could try things out, although something like it could well be happening out there.
Team A could build an application that runs at a set cadence and performs the whole sequence of operations. However, in that case we would have to:
- Pay for 24/7 application uptime. If the activity runs, say, once a day, most of that is wasted compute cost.
- Manage the scaling. Say that during peak, Team A executes 1000 times every other day. Our application would have to be scaled up every other day to process the data quickly and then scaled back down. Step Functions could do that automatically.
- Step Functions lets you configure retries, error handling, alternate flows, etc., all of which is rendered visually. Imagine the complex code we write for all this being displayed as a graph. The execution history can also be seen as a graph, showing exactly what failed. Life frustration reduced!!
The state machine now takes over all the orchestration that Team A was responsible for.
Step 1: Perform the complex activity
For this example we will pretend to do this using a Pass state.
Step 2: Insert a record into DynamoDB
Step 3: Push the details of the file to SQS
Step 4: Wait for 5 seconds
Step 5: Trigger the workflow
Step 6: Publish to SNS
Step 7: Update the inserted DynamoDB entry
All of these will be done inside the state machine. These AWS service interactions can be made directly from Step Functions (details here).
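The seven steps form a simple linear chain. Each state names the one after it, and the last state ends the execution. Here is a rough sketch of that wiring in plain Python (standard library only). The hypothetical helper `build_chain` links an ordered list of states into an Amazon States Language skeleton. Every step is a placeholder Pass state. `TriggerWorkflow` is an invented name for Step 5 and does not come from the real definition.

```python
from __future__ import annotations

import json


def build_chain(steps: list[tuple[str, dict]], comment: str = "") -> dict:
    """Wire ordered (name, state) pairs together with Next; the last one gets End."""
    if not steps:
        raise ValueError("a state machine needs at least one state")
    states: dict[str, dict] = {}
    for index, (name, body) in enumerate(steps):
        state = dict(body)  # shallow copy so the caller's dicts are not mutated
        if index + 1 < len(steps):
            state["Next"] = steps[index + 1][0]  # link to the following step
        else:
            state["End"] = True  # the final step ends the execution
        states[name] = state
    definition: dict = {"StartAt": steps[0][0], "States": states}
    if comment:
        definition["Comment"] = comment
    return definition


# Hypothetical placeholder names; every step is a Pass state in this skeleton.
names = ["ComplexTask", "InsertRecord", "InformOtherQueue", "WaitSometime",
         "TriggerWorkflow", "SendSNS", "UpdateTableEntry"]
skeleton = build_chain([(n, {"Type": "Pass"}) for n in names], "Skeleton only")
print(json.dumps(skeleton, indent=2))
```

You could swap a placeholder body for a real Task state (Resource, Parameters, ResultPath, Catch) without changing the linking logic.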
My first step is the complex activity, for which I would create a Lambda function or an AWS Batch job. To use it in a Step Function, I set up a Task state. From the AWS docs:
A Task state ("Type": "Task") represents a single unit of work performed by a state machine. All work in your state machine is done by tasks. A task performs work by using an activity or an AWS Lambda function, or by passing parameters to the API actions of other services.
For now, I will simply use a Pass state instead.
So how does a Step Function execute? From the AWS docs:
When this state machine is launched, the interpreter begins execution by identifying the Start State. It executes that state, and then checks to see if the state is marked as an End State. If it is, the machine terminates and returns a result. If the state is not an End State, the interpreter looks for a “Next” field to determine what state to run next; it repeats this process until it reaches a Terminal State (Succeed, Fail, or an End State) or a runtime error occurs.
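That loop is small enough to model. The toy Python sketch below follows the quoted steps: start at StartAt, run the state, stop on Succeed or End, and otherwise jump to Next. `run_state_machine` is a hypothetical name, and the sketch is nowhere near the real engine. It only models Pass, Succeed and Fail states. It treats a Pass state's Result as replacing its input, which is the default ResultPath behavior. That is why the demo, which is shaped like the GracefulFail fallback, ends with "World".

```python
from __future__ import annotations

from typing import Any


def run_state_machine(definition: dict, payload: Any) -> tuple[Any, list[str]]:
    """Toy interpreter: follow Next links from StartAt until a terminal state."""
    states = definition["States"]
    current = definition["StartAt"]  # 1. identify the Start State
    visited: list[str] = []
    while True:
        state = states[current]
        visited.append(current)
        kind = state["Type"]
        if kind == "Pass":
            # A Pass state does no work; a fixed Result (if any) replaces the input.
            payload = state.get("Result", payload)
        elif kind == "Fail":
            raise RuntimeError(f"execution failed in state {current!r}")
        elif kind != "Succeed":
            raise NotImplementedError(f"this toy does not model {kind} states")
        # 2. Terminal check: a Succeed state, or any state marked "End": true.
        if kind == "Succeed" or state.get("End", False):
            return payload, visited
        current = state["Next"]  # 3. otherwise, move to the state named in Next


demo = {
    "StartAt": "ComplexTask",
    "States": {
        "ComplexTask": {"Type": "Pass", "Next": "GracefulFail"},
        "GracefulFail": {"Type": "Pass", "Result": "World", "End": True},
    },
}
print(run_state_machine(demo, {"messageId": "Message3"}))
# ('World', ['ComplexTask', 'GracefulFail'])
```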
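The remaining tasks are AWS service integrations. To use them, I first need to create the AWS resources:
- Set up a DynamoDB table.
- Set up an SQS queue for use with the state machine.
- Set up an SNS topic for use with the state machine.
- Set up a CloudWatch Events rule to trigger the state machine on a schedule.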
For the complex task step, I am simply using a dummy step for now.
The Pass State (identified by "Type":"Pass") simply passes its input to its output, performing no work. Pass States are useful when constructing and debugging state machines.
The rest of the steps are AWS service integrations. Here is the state machine JSON (Amazon States Language):
{
  "Comment": "My Convoluted StepFunction",
  "StartAt": "ComplexTask",
  "States": {
    "ComplexTask": {
      "Type": "Pass",
      "Next": "InsertRecord"
    },
    "InsertRecord": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Parameters": {
        "TableName": "Message",
        "Item": {
          "messageId": { "S.$": "$.messageId" },
          "messageText": { "S.$": "$.messageText" },
          "status": { "S": "RECORD_ADDED" }
        }
      },
      "ResultPath": "$.DynamoDB",
      "Next": "InformOtherQueue",
      "Catch": [
        { "ErrorEquals": [ "States.ALL" ], "Next": "GracefulFail" }
      ]
    },
    "InformOtherQueue": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage",
      "Parameters": {
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/466170491455/TeamCSqs",
        "MessageBody": { "Input.$": "$.messageId" }
      },
      "ResultPath": "$.SQS",
      "Next": "WaitSometime",
      "TimeoutSeconds": 10,
      "Catch": [
        { "ErrorEquals": [ "States.ALL" ], "Next": "GracefulFail" }
      ]
    },
    "WaitSometime": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "SendSNS"
    },
    "SendSNS": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message": { "Input.$": "$.messageId" },
        "TopicArn": "arn:aws:sns:us-east-1:466170491455:TeamAsns"
      },
      "ResultPath": "$.SNS",
      "Next": "UpdateTableEntry",
      "Catch": [
        { "ErrorEquals": [ "States.ALL" ], "Next": "GracefulFail" }
      ]
    },
    "UpdateTableEntry": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:updateItem",
      "Parameters": {
        "TableName": "Message",
        "Key": { "messageId": { "S.$": "$.messageId" } },
        "UpdateExpression": "SET #st = :myStatus",
        "ExpressionAttributeNames": { "#st": "status" },
        "ExpressionAttributeValues": { ":myStatus": { "S": "RECORD_PROCESSED" } }
      },
      "ResultPath": "$.TblUpdate",
      "End": true,
      "Catch": [
        { "ErrorEquals": [ "States.ALL" ], "Next": "GracefulFail" }
      ]
    },
    "GracefulFail": {
      "Type": "Pass",
      "Result": "World",
      "End": true
    }
  }
}
Sample input I used to test the Step Function:
{ "messageId": "Message3", "messageText": "Hello", "timer_seconds": 3 }
Diving into the state machine
Each state names its next state, which acts as the link between nodes in this directed graph.
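Because the states form a directed graph, you can sanity-check a definition before saving it. Below is a minimal Python sketch that reports dangling targets, states with no way out, and states unreachable from StartAt. It assumes the definition uses only Next, End and Catch links. `check_graph` is a hypothetical helper, and it does not understand Choice states or Parallel branches. The outline reuses the state names and links from the definition above, with the service parameters stripped out.

```python
from __future__ import annotations


def check_graph(definition: dict) -> list[str]:
    """List dangling Next/Catch targets, states with no way out, and unreachable states."""
    states = definition["States"]
    problems: list[str] = []
    edges: dict[str, list[str]] = {}
    for name, state in states.items():
        targets = [state["Next"]] if "Next" in state else []
        targets += [c["Next"] for c in state.get("Catch", [])]  # error-handling edges
        edges[name] = targets
        for target in targets:
            if target not in states:
                problems.append(f"{name} -> {target}: no such state")
        terminal = state.get("End", False) or state["Type"] in ("Succeed", "Fail")
        if "Next" not in state and not terminal:
            problems.append(f"{name}: neither Next nor End")
    # Depth-first walk from StartAt over Next and Catch edges.
    seen: set[str] = set()
    stack = [definition["StartAt"]]
    while stack:
        name = stack.pop()
        if name in seen or name not in states:
            continue
        seen.add(name)
        stack.extend(edges[name])
    problems += [f"{name}: unreachable" for name in states if name not in seen]
    return problems


catch = [{"ErrorEquals": ["States.ALL"], "Next": "GracefulFail"}]
outline = {
    "StartAt": "ComplexTask",
    "States": {
        "ComplexTask": {"Type": "Pass", "Next": "InsertRecord"},
        "InsertRecord": {"Type": "Task", "Next": "InformOtherQueue", "Catch": catch},
        "InformOtherQueue": {"Type": "Task", "Next": "WaitSometime", "Catch": catch},
        "WaitSometime": {"Type": "Wait", "Next": "SendSNS"},
        "SendSNS": {"Type": "Task", "Next": "UpdateTableEntry", "Catch": catch},
        "UpdateTableEntry": {"Type": "Task", "End": True, "Catch": catch},
        "GracefulFail": {"Type": "Pass", "End": True},
    },
}
print(check_graph(outline))  # []
```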
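The first activity is the DynamoDB putItem step.
It includes the table name and the item as a DynamoDB JSON object. With the sample input, the saved record in the table has messageId "Message3", messageText "Hello" and status "RECORD_ADDED".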
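Roughly, the rule for Parameters works like this. If a key ends in ".$", its value is treated as a path into the state input and the suffix is dropped from the key. Any other key is passed through as a literal. The Python sketch below is a simplified model of that rule. It assumes only plain dotted paths such as "$.messageId", with no array indexing, filters or intrinsic functions. `read_path` and `resolve_parameters` are hypothetical names, not AWS APIs. Real Step Functions rejects a non-path ".$" value when the definition is saved; this sketch raises ValueError when it tries to resolve one.

```python
from __future__ import annotations

from typing import Any


def read_path(data: Any, path: str) -> Any:
    """Follow a simple dotted path such as '$.messageId' through nested dicts."""
    if path == "$":
        return data
    if not path.startswith("$."):
        # Step Functions rejects this at save time; the sketch rejects it here.
        raise ValueError(f"not a path: {path!r}")
    value = data
    for key in path[2:].split("."):
        value = value[key]
    return value


def resolve_parameters(template: Any, state_input: Any) -> Any:
    """Build effective parameters: '.$' keys are read from the input, others are literal."""
    if isinstance(template, dict):
        resolved = {}
        for key, value in template.items():
            if key.endswith(".$"):
                resolved[key[:-2]] = read_path(state_input, value)  # dynamic value
            else:
                resolved[key] = resolve_parameters(value, state_input)  # literal value
        return resolved
    if isinstance(template, list):
        return [resolve_parameters(item, state_input) for item in template]
    return template


sample = {"messageId": "Message3", "messageText": "Hello", "timer_seconds": 3}
print(resolve_parameters({"messageId": {"S.$": "$.messageId"}}, sample))
# {'messageId': {'S': 'Message3'}}
print(resolve_parameters({"messageId": {"S": "$.messageId"}}, sample))
# {'messageId': {'S': '$.messageId'}}
```

The second call shows what happens when the suffix is forgotten: the path string itself becomes the stored value.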
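At first I made a dev error: I did not suffix my dynamic attribute keys in the state machine with ".$". The correct form is:
"Item": { "messageId": { "S.$": "$.messageId" }, "messageText": { "S.$": "$.messageText" }, "status": { "S": "RECORD_ADDED" } }
If these ".$" suffixes are missing, the state does not evaluate the values as paths, so the record was created with the literal strings "$.messageId" and "$.messageText" as its values.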
The next step is to push a message to SQS.
I tried creating a dynamic message using fields from the input:
"MessageBody": { "Input.$": "This is my $.messageId" }
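However, this is not supported. Saving the state machine failed with an error saying the dynamic input must be a path only. The task does not do any computation; it only reads the value from the input and pushes that message. Remove the ".$" suffix from the key and you can push any hard-coded message you like.
In the previous step and in this one, I also included the ResultPath attribute. Leaving it out causes a problem. AWS service integrations are not Pass states: they generate output, and that output is passed as the input to the next state. So without the ResultPath attribute, the input to my SQS step was the output returned by the DynamoDB putItem task.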
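The SQS step received the DynamoDB response instead of my original input. The ResultPath attribute makes sure the DynamoDB response is placed in a field of its own ("ResultPath": "$.DynamoDB") instead of replacing the input.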
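Here is a simplified Python model of how ResultPath merges a task's result into its input. It assumes only the forms that matter here:
- omitted or "$": the result replaces the input, which is what bit the SQS step;
- a simple "$.Field" path: the result is added under that field;
- null (None): the result is discarded and the input passes through.

`apply_result_path` is a hypothetical name.

```python
from __future__ import annotations

import copy
from typing import Any


def apply_result_path(state_input: dict, result: Any, result_path: str | None = "$") -> Any:
    """Combine a task result with the state input, ResultPath-style (simplified)."""
    if result_path is None:  # "ResultPath": null -> drop the result, keep the input
        return state_input
    if result_path == "$":  # default -> the result replaces the whole input
        return result
    if not result_path.startswith("$."):
        raise ValueError(f"unsupported ResultPath: {result_path!r}")
    output = copy.deepcopy(state_input)  # never mutate the caller's input
    target = output
    *parents, leaf = result_path[2:].split(".")
    for key in parents:
        target = target.setdefault(key, {})  # create intermediate objects as needed
    target[leaf] = result
    return output


sample = {"messageId": "Message3", "messageText": "Hello"}
put_item_result = {"SdkHttpMetadata": {"HttpStatusCode": 200}}

print(apply_result_path(sample, put_item_result))  # input replaced by the result
print(apply_result_path(sample, put_item_result, "$.DynamoDB"))  # input kept
```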
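Next is the SNS step. Like the SQS step, it publishes to SNS and returns. The last step updates the DynamoDB record. Note that status is a DynamoDB reserved word, so an UpdateExpression cannot name it directly. You either alias it through ExpressionAttributeNames or use a different attribute name instead, such as messageStatus below: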
"UpdateTableEntry": {
  "Type": "Task",
  "Resource": "arn:aws:states:::dynamodb:updateItem",
  "Parameters": {
    "TableName": "Message",
    "Key": { "messageId": { "S.$": "$.messageId" } },
    "UpdateExpression": "SET messageStatus = :myStatus",
    "ExpressionAttributeValues": { ":myStatus": { "S": "RECORD_PROCESSED" } }
  },
  "ResultPath": "$.TblUpdate",
  "End": true,
  "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "GracefulFail" } ]
},
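Aliasing every attribute name avoids reserved-word clashes such as status entirely. Below is a hypothetical Python helper that produces the UpdateExpression, ExpressionAttributeNames and ExpressionAttributeValues fields for a simple SET of string attributes. `build_set_update` and its #n0/:v0 placeholder scheme are my own invention.

```python
from __future__ import annotations


def build_set_update(changes: dict[str, str]) -> dict:
    """Build updateItem fields that SET string attributes through name/value aliases."""
    if not changes:
        raise ValueError("nothing to update")
    clauses: list[str] = []
    names: dict[str, str] = {}
    values: dict[str, dict] = {}
    for i, (attribute, new_value) in enumerate(changes.items()):
        name_key, value_key = f"#n{i}", f":v{i}"
        names[name_key] = attribute  # the alias stands in for the real (maybe reserved) name
        values[value_key] = {"S": new_value}  # DynamoDB JSON string value
        clauses.append(f"{name_key} = {value_key}")
    return {
        "UpdateExpression": "SET " + ", ".join(clauses),
        "ExpressionAttributeNames": names,
        "ExpressionAttributeValues": values,
    }


print(build_set_update({"status": "RECORD_PROCESSED"}))
```

The returned fields would sit in the updateItem Parameters next to TableName and Key.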
The update query is executed on the record.
The final input object with all the step responses looks like this:
{
  "output": {
    "messageId": "Message3",
    "messageText": "Hello",
    "timer_seconds": 3,
    "DynamoDB": {
      "SdkHttpMetadata": {
        "HttpHeaders": { "Connection": "keep-alive", "Content-Length": "2", "Content-Type": "application/x-amz-json-1.0", "Date": "Fri, 08 May 2020 17:35:22 GMT", "Server": "Server", "x-amz-crc32": "2745614147", "x-amzn-RequestId": "42474c12-4dba-47a7-82a7-c745f8e8a59a" },
        "HttpStatusCode": 200
      },
      "SdkResponseMetadata": { "RequestId": "42474c12-4dba-47a7-82a7-c745f8e8a59a" }
    },
    "SQS": {
      "MD5OfMessageBody": "21ee0cdd2aec4288d4b9c0e48c1b9c83",
      "MessageId": "6e8c8c00-348a-463c-bef7-52b867743d6e",
      "SdkHttpMetadata": {
        "HttpHeaders": { "Content-Length": "378", "Content-Type": "text/xml", "Date": "Fri, 08 May 2020 17:35:22 GMT", "x-amzn-RequestId": "fc5f0f1d-47c8-4458-ba0d-8edc2a92d809" },
        "HttpStatusCode": 200
      },
      "SdkResponseMetadata": { "RequestId": "fc5f0f1d-47c8-4458-ba0d-8edc2a92d809" }
    },
    "SNS": {
      "MessageId": "c88b34fa-e831-502d-97b1-63d57294788e",
      "SdkHttpMetadata": {
        "HttpHeaders": { "Content-Length": "294", "Content-Type": "text/xml", "Date": "Fri, 08 May 2020 17:35:32 GMT", "x-amzn-RequestId": "0fe6b1a3-92ba-512b-a9f1-e68c081fb988" },
        "HttpStatusCode": 200
      },
      "SdkResponseMetadata": { "RequestId": "0fe6b1a3-92ba-512b-a9f1-e68c081fb988" }
    },
    "TblUpdate": {
      "SdkHttpMetadata": {
        "HttpHeaders": { "Connection": "keep-alive", "Content-Length": "2", "Content-Type": "application/x-amz-json-1.0", "Date": "Fri, 08 May 2020 17:35:33 GMT", "Server": "Server", "x-amz-crc32": "2745614147", "x-amzn-RequestId": "b018cc33-c681-403d-b0ca-cf37f50add58" },
        "HttpStatusCode": 200
      },
      "SdkResponseMetadata": { "RequestId": "b018cc33-c681-403d-b0ca-cf37f50add58" }
    }
  }
}
There are some more learnings, but I will continue with those in an upcoming post.