In the previous post we set up a basic Lambda function that wrote to Redis. I have since updated the function to log some CloudWatch metrics.
The code in this post is a very simplified implementation of the token bucket algorithm.
I set up a CloudWatch Events rule to invoke the refresher Lambda every minute.
The consumer Lambda accepts one of three inputs (BLUE, RED, or YELLOW), which determines how many tokens it tries to acquire every second.
With both functions running every minute, we can watch the resulting metrics.
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class TokenBucketRefresherLambdaFn implements RequestHandler<String, Void> {

    private final Jedis jedis;
    private final AmazonCloudWatchAsync cloudWatchClient;

    public TokenBucketRefresherLambdaFn() {
        jedis = new Jedis(new HostAndPort(
                "redis-test-cache-001.vmaptv.0001.use1.cache.amazonaws.com", 6379));
        // "AccessKey" and "SecretKey" are placeholders, not real credentials
        AWSCredentials awsCredentials = new BasicAWSCredentials("AccessKey", "SecretKey");
        AWSCredentialsProvider awsCredentialsProvider =
                new AWSStaticCredentialsProvider(awsCredentials);
        cloudWatchClient = AmazonCloudWatchAsyncClientBuilder.standard()
                .withCredentials(awsCredentialsProvider)
                .withClientConfiguration(new ClientConfiguration())
                .build();
    }

    @Override
    public Void handleRequest(String input, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("In Handler: Executing " + context.getFunctionName() + ", "
                + context.getFunctionVersion());
        logger.log("Input received " + input); // This input is not really used now
        int totalRuns = 0;
        long startTime = System.currentTimeMillis();
        List<Integer> availCapacities = new ArrayList<>(60);
        // Run at most 60 iterations, and never for longer than one minute
        while (totalRuns < 60 && (System.currentTimeMillis() - startTime < 1000 * 60)) {
            long timestamp = System.currentTimeMillis();
            int availCapacity = updateTokenCapacity(logger);
            availCapacities.add(availCapacity);
            asyncLogMetricsToCloudWatch(availCapacity);
            long duration = System.currentTimeMillis() - timestamp;
            if (duration < 1000) {
                // Operation took less than a second, let the thread sleep before running again
                try {
                    Thread.sleep(1000 - duration);
                } catch (InterruptedException e) {
                    logger.log("Sleep failed " + e.getMessage());
                }
            }
            totalRuns++;
        }
        logger.log("Total Runs " + totalRuns + " and available capacities detected "
                + availCapacities);
        return null;
    }

    private int updateTokenCapacity(LambdaLogger logger) {
        String token = "TokenCount";
        int capacity = 60; // adds 60 tokens at every run. Bucket capacity is 100
        String value = jedis.get(token);
        int crtBucketValue = 0;
        if (value == null) {
            logger.log("No value detected for Key, adding");
        } else {
            crtBucketValue = Integer.parseInt(value);
        }
        // Note: GET followed by SET is not atomic across concurrent clients
        int newBucketCapacity = Math.min(100, crtBucketValue + capacity);
        jedis.set(token, String.valueOf(newBucketCapacity));
        return crtBucketValue; // capacity that was available before this refill
    }

    private void asyncLogMetricsToCloudWatch(int unusedCapacity) {
        Collection<MetricDatum> metricDatums = new ArrayList<>(1);
        metricDatums.add(new MetricDatum()
                .withMetricName("capacityAvailable")
                .withValue(Double.valueOf(unusedCapacity))
                .withUnit(StandardUnit.None));
        PutMetricDataRequest putMetricDataRequest = new PutMetricDataRequest();
        putMetricDataRequest.setMetricData(metricDatums);
        putMetricDataRequest.setNamespace("TokenBucket");
        // Asynchronous push - returns a future with the result
        cloudWatchClient.putMetricDataAsync(putMetricDataRequest);
    }
}
The Lambda runs every minute. Within each invocation, it tries to update the bucket once per second, adding 60 tokens on every cache update. The bucket size is capped at 100 tokens. The Lambda also logs the number of tokens it found available in the bucket.
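The refill rule (add 60, cap at 100) can be sketched in memory, without Redis. This is only an illustration under assumptions: `AtomicTokenBucket` is a hypothetical class that is not part of the post's code, and it uses an `AtomicInteger` so that each read-modify-write happens as one atomic step. The Lambda code in this post issues a separate `GET` and `SET`, so concurrent invocations could in principle overwrite each other's updates.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** In-memory stand-in for the Redis-backed bucket (hypothetical, for illustration only). */
final class AtomicTokenBucket {
    private final int capacity;          // maximum number of tokens the bucket can hold
    private final AtomicInteger tokens;  // current token count

    AtomicTokenBucket(int capacity, int initialTokens) {
        this.capacity = capacity;
        this.tokens = new AtomicInteger(Math.min(capacity, initialTokens));
    }

    /** Adds tokens up to the cap and returns the count that was available before the refill. */
    int refill(int amount) {
        return tokens.getAndUpdate(current -> Math.min(capacity, current + amount));
    }

    /** Takes up to {@code requested} tokens and returns how many were actually granted. */
    int acquire(int requested) {
        int before = tokens.getAndUpdate(current -> current - Math.min(current, requested));
        return Math.min(before, requested);
    }

    int available() {
        return tokens.get();
    }
}
```

With a capacity of 100, two consecutive calls to `refill(60)` leave the bucket at 100 rather than 120, and a following `acquire(90)` is granted in full and leaves 10 tokens behind.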
I am using CloudWatch Events to invoke my Lambda. Since a CloudWatch Events rule can be scheduled at most once per minute, I decided to run a one-minute loop inside the function, effectively achieving a one-second update interval for the token bucket.
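The one-minute loop can be isolated into a small helper. The sketch below is an assumption-laden variant, not the post's code: `TickLoop` and its `run` method are hypothetical names, and instead of sleeping for `1000 - duration` after each iteration it computes every deadline from the start time, so one slow tick does not delay all later ticks.

```java
import java.util.function.IntConsumer;

/** Hypothetical helper that runs a task a fixed number of times at a fixed period. */
final class TickLoop {
    private TickLoop() {}

    /**
     * Runs {@code task} once per {@code periodMillis}, {@code ticks} times in total,
     * and returns the number of completed runs. Stops early if the thread is interrupted.
     */
    static int run(int ticks, long periodMillis, IntConsumer task) {
        long start = System.nanoTime();
        int completed = 0;
        for (int i = 0; i < ticks; i++) {
            task.accept(i);
            completed++;
            // Deadline for the end of this tick, measured from the start of the loop
            long deadlineNanos = start + (i + 1) * periodMillis * 1_000_000L;
            long sleepMillis = (deadlineNanos - System.nanoTime()) / 1_000_000L;
            if (sleepMillis > 0) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    break;
                }
            }
        }
        return completed;
    }
}
```

In the Lambda this would be called as `TickLoop.run(60, 1000, i -> ...)`. Since such a loop runs for about a minute, the function's configured timeout has to be longer than that.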
When I ran it, this code kept failing with a timeout exception.
Logging Cloud Watch metric 100
Unable to execute HTTP request: Connect to monitoring.us-east-1.amazonaws.com:443 [monitoring.us-east-1.amazonaws.com/52.94.238.171] failed: connect timed out: com.amazonaws.SdkClientException
com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to monitoring.us-east-1.amazonaws.com:443 [monitoring.us-east-1.amazonaws.com/52.94.238.171] failed: connect timed out
A little reading revealed that my Lambda was unable to reach the AWS CloudWatch endpoints.
By default, Lambda runs your functions in a secure VPC with access to AWS
services and the internet. The VPC is owned by Lambda and does not connect to
your account's default VPC. When you connect a function to a VPC in your account,
it does not have access to the internet unless your VPC provides access.
The ElastiCache instance lives inside a VPC. For the Lambda function to reach the cache, we provided the Lambda with VPC, subnet, and security group details. However, that means the Lambda function can no longer connect to the CloudWatch API, because a function attached to a VPC has no internet access unless the VPC provides it. To resolve this, we need to use VPC endpoints.
A VPC endpoint enables you to privately connect your VPC to supported AWS
services and VPC endpoint services powered by AWS PrivateLink without requiring
an internet gateway, NAT device, VPN connection, or AWS Direct Connect
connection. Instances in your VPC do not require public IP addresses to
communicate with resources in the service. Traffic between your VPC and the other
service does not leave the Amazon network.
CloudWatch is one of the supported services.
If you use Amazon Virtual Private Cloud (Amazon VPC) to host your AWS resources,
you can establish a private connection between your VPC, CloudWatch, and
CloudWatch Synthetics. You can use these connections to enable CloudWatch and
CloudWatch Synthetics to communicate with your resources on your VPC without
going through the public internet.
I created an interface VPC endpoint to talk to CloudWatch, as suggested here.
The endpoint's subnets and security group are the same ones used in my VPC. I edited the endpoint policy so that it only allows CloudWatch.
Rerunning the Lambda worked.
In Handler: Executing TokenBucketRefresher, $LATEST
Input received 2020-05-21T16:40:54Z
Total Runs 60 and available capacities detected [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 60, 100, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
END RequestId: fecd5efb-5d00-4b84-ba40-da144007cc9a
REPORT RequestId: fecd5efb-5d00-4b84-ba40-da144007cc9a Duration: 60165.64 ms Billed Duration: 60200 ms Memory Size: 512 MB Max Memory Used: 143 MB Init Duration: 1993.44 ms
Similarly, I created a Lambda function that consumes tokens from the bucket.
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TokenBucketUserLambdaFn implements RequestHandler<String, Void> {

    // Tokens each consumer type tries to acquire per second
    private static final Map<String, Integer> NEEDED_CAPACITY_MAPPING = new HashMap<>();

    static {
        NEEDED_CAPACITY_MAPPING.put("BLUE", 60);
        NEEDED_CAPACITY_MAPPING.put("RED", 90);
        NEEDED_CAPACITY_MAPPING.put("YELLOW", 10);
    }

    private final Jedis jedis;
    private final AmazonCloudWatchAsync cloudWatchClient;

    public TokenBucketUserLambdaFn() {
        jedis = new Jedis(new HostAndPort(
                "redis-test-cache-001.vmaptv.0001.use1.cache.amazonaws.com", 6379));
        // "AccessKey" and "SecretKey" are placeholders, not real credentials
        AWSCredentials awsCredentials = new BasicAWSCredentials("AccessKey", "SecretKey");
        AWSCredentialsProvider awsCredentialsProvider =
                new AWSStaticCredentialsProvider(awsCredentials);
        cloudWatchClient = AmazonCloudWatchAsyncClientBuilder.standard()
                .withCredentials(awsCredentialsProvider)
                .withClientConfiguration(new ClientConfiguration())
                .build();
    }

    @Override
    public Void handleRequest(String input, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("In Handler: Executing " + context.getFunctionName() + ", "
                + context.getFunctionVersion());
        logger.log("Input received " + input);
        if (!NEEDED_CAPACITY_MAPPING.containsKey(input)) {
            throw new IllegalArgumentException("Input is invalid - " + input);
        }
        int totalRuns = 0;
        long startTime = System.currentTimeMillis();
        List<Integer> availCapacities = new ArrayList<>(60);
        // Run at most 60 iterations, and never for longer than one minute
        while (totalRuns < 60 && (System.currentTimeMillis() - startTime < 1000 * 60)) {
            long timestamp = System.currentTimeMillis();
            int availCapacity = getTokens(logger, NEEDED_CAPACITY_MAPPING.get(input));
            availCapacities.add(availCapacity);
            asyncLogMetricsToCloudWatch(availCapacity, input);
            long duration = System.currentTimeMillis() - timestamp;
            if (duration < 1000) {
                // Operation took less than a second, let the thread sleep before running again
                try {
                    Thread.sleep(1000 - duration);
                } catch (InterruptedException e) {
                    logger.log("Sleep failed " + e.getMessage());
                }
            }
            totalRuns++;
        }
        logger.log("Total Runs " + totalRuns + " for " + input
                + " and available capacities detected " + availCapacities);
        return null;
    }

    private int getTokens(LambdaLogger logger, int requestedCapacity) {
        String token = "TokenCount";
        String value = jedis.get(token); // Get available tokens
        if (value == null) {
            logger.log("No value detected for Key");
            return 0;
        }
        int availCapacity = Integer.parseInt(value);
        // Take what was requested, or whatever is left if the bucket holds less
        int tokensUsed = Math.min(availCapacity, requestedCapacity);
        // Note: GET followed by SET is not atomic across concurrent clients
        jedis.set(token, String.valueOf(availCapacity - tokensUsed));
        return tokensUsed; // used capacity
    }

    private void asyncLogMetricsToCloudWatch(int usedCapacity, String input) {
        Collection<MetricDatum> metricDatums = new ArrayList<>(1);
        metricDatums.add(new MetricDatum()
                .withMetricName(input + "CapacityUsed")
                .withValue(Double.valueOf(usedCapacity))
                .withUnit(StandardUnit.None));
        PutMetricDataRequest putMetricDataRequest = new PutMetricDataRequest();
        putMetricDataRequest.setMetricData(metricDatums);
        putMetricDataRequest.setNamespace("TokenBucket");
        // Asynchronous push - returns a future with the result
        cloudWatchClient.putMetricDataAsync(putMetricDataRequest);
    }
}
The Lambda again executes in a loop 60 times. To run the Lambda for the three different inputs, I set up another CloudWatch rule.
START RequestId: 960f7fbe-cd6d-4301-97d5-057913abea22 Version: $LATEST
In Handler: Executing TokenBucketUser, $LATEST
Input received RED
Total Runs 60 for RED and available capacities detected [50, 50, 50, 50, 50, 50,
50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50,
50, 50, 50, 50, 50, 50, 90, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50,
50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50]
END RequestId: 960f7fbe-cd6d-4301-97d5-057913abea22
REPORT RequestId: 960f7fbe-cd6d-4301-97d5-057913abea22 Duration: 60009.49 ms Billed Duration: 60100 ms Memory Size: 512 MB Max Memory Used: 170 MB
START RequestId: b801600b-724d-4909-999a-029350e49388 Version: $LATEST
As seen here, although the consumers would like to execute at higher rates, they can only work with the TPS available.
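The same effect can be reproduced without AWS using a small deterministic simulation. This is a sketch under stated assumptions, not a model of the real deployment: `TokenBucketSimulation` is a hypothetical class, the bucket lives in a local variable instead of Redis, and the three consumers take turns going first in each simulated second, whereas the real Lambdas interleave in whatever order their Redis calls arrive. The refill amount, the cap, and the per-consumer demands are taken from the code in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory simulation of one refresher and three consumers. */
final class TokenBucketSimulation {
    static final int BUCKET_CAPACITY = 100;  // bucket cap used in the post
    static final int REFILL_PER_TICK = 60;   // tokens added every second in the post
    static final String[] CONSUMERS = {"BLUE", "RED", "YELLOW"};
    static final int[] DEMAND_PER_TICK = {60, 90, 10};

    private TokenBucketSimulation() {}

    /** Simulates {@code ticks} seconds and returns the total tokens granted to each consumer. */
    static Map<String, Integer> simulate(int ticks) {
        Map<String, Integer> granted = new LinkedHashMap<>();
        for (String name : CONSUMERS) {
            granted.put(name, 0);
        }
        int bucket = 0;
        for (int tick = 0; tick < ticks; tick++) {
            // Refresher: add tokens, never exceeding the cap
            bucket = Math.min(BUCKET_CAPACITY, bucket + REFILL_PER_TICK);
            // Consumers: rotate who goes first so no single consumer always wins (an assumption)
            for (int k = 0; k < CONSUMERS.length; k++) {
                int idx = (tick + k) % CONSUMERS.length;
                int used = Math.min(bucket, DEMAND_PER_TICK[idx]);
                bucket -= used;
                granted.merge(CONSUMERS[idx], used, Integer::sum);
            }
        }
        return granted;
    }
}
```

Over 60 simulated seconds the consumers together ask for 160 × 60 = 9600 tokens, but `simulate(60)` grants only 3600 in total, which is exactly the refill rate of 60 tokens per second.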