
Thursday, 21 May 2020

Playing with ElastiCache - 2

In the previous post we set up a basic Lambda function that wrote to Redis. I have now updated the function to publish some CloudWatch metrics.

import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TokenBucketRefresherLambdaFn implements RequestHandler<String, Void> {

    private static final String TOKEN_KEY = "TokenCount";
    private static final int TOKENS_PER_REFILL = 60; // tokens added on every run
    private static final int BUCKET_CAPACITY = 100;  // maximum tokens the bucket can hold
    private static final int RUNS_PER_INVOCATION = 60;
    private static final long TICK_MILLIS = 1000;

    private final Jedis jedis;
    private final AmazonCloudWatchAsync cloudWatchClient;

    public TokenBucketRefresherLambdaFn() {
        jedis = new Jedis(new HostAndPort(
                "redis-test-cache-001.vmaptv.0001.use1.cache.amazonaws.com", 6379));
        // Uses the default credential chain (the Lambda execution role)
        // instead of hard-coding an access key and secret key in the source.
        cloudWatchClient = AmazonCloudWatchAsyncClientBuilder.defaultClient();
    }

    @Override
    public Void handleRequest(String input, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("In Handler: Executing " + context.getFunctionName() + ", "
                + context.getFunctionVersion());
        logger.log("Input received " + input); // This input is not really used now
        int totalRuns = 0;
        long startTime = System.currentTimeMillis();
        List<Integer> availCapacities = new ArrayList<>(RUNS_PER_INVOCATION);
        while (totalRuns < RUNS_PER_INVOCATION
                && (System.currentTimeMillis() - startTime < RUNS_PER_INVOCATION * TICK_MILLIS)) {
            long timestamp = System.currentTimeMillis();
            int availCapacity = updateTokenCapacity(logger);
            availCapacities.add(availCapacity);
            asyncLogMetricsToCloudWatch(availCapacity);
            totalRuns++;
            long duration = System.currentTimeMillis() - timestamp;
            if (duration < TICK_MILLIS) {
                // Operation took less than a second, let the thread sleep before running again
                try {
                    Thread.sleep(TICK_MILLIS - duration);
                } catch (InterruptedException e) {
                    logger.log("Sleep failed " + e.getMessage());
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    break;
                }
            }
        }
        logger.log("Total Runs " + totalRuns + " and available capacities detected " + availCapacities);
        return null;
    }

    // Adds TOKENS_PER_REFILL tokens (capped at BUCKET_CAPACITY) and returns the number
    // of tokens that were still available before this refill.
    // Note: GET followed by SET is not atomic; a consumer can change the key in between.
    private int updateTokenCapacity(LambdaLogger logger) {
        String value = jedis.get(TOKEN_KEY);
        int crtBucketValue = 0;
        if (value == null) {
            logger.log("No value detected for Key, adding");
        } else {
            crtBucketValue = Integer.parseInt(value);
        }
        int newBucketCapacity = Math.min(BUCKET_CAPACITY, crtBucketValue + TOKENS_PER_REFILL);
        jedis.set(TOKEN_KEY, String.valueOf(newBucketCapacity));
        return crtBucketValue; // capacity that was available for use
    }

    private void asyncLogMetricsToCloudWatch(int unusedCapacity) {
        MetricDatum datum = new MetricDatum()
                .withMetricName("capacityAvailable")
                .withValue((double) unusedCapacity)
                .withUnit(StandardUnit.None);
        PutMetricDataRequest putMetricDataRequest = new PutMetricDataRequest()
                .withNamespace("TokenBucket")
                .withMetricData(Collections.singletonList(datum));
        // Asynchronous push: returns a Future that is not awaited here, so a call still in
        // flight when the handler returns may not finish before Lambda freezes the environment
        cloudWatchClient.putMetricDataAsync(putMetricDataRequest);
    }
}
This code is a very simplified implementation of the Token Bucket algorithm.
The Lambda runs every minute. Within each invocation it updates the bucket once per second, adding 60 tokens on every cache update. The bucket size is capped at 100 tokens. The Lambda also logs the number of tokens it found available in the bucket just before each refill.
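To see the refill rule in isolation, here is a minimal in-memory sketch that applies the same arithmetic as updateTokenCapacity without Redis. The class name RefillSketch is hypothetical and a plain int field stands in for the "TokenCount" key; it only illustrates how the cap of 100 interacts with a refill of 60.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the Redis key "TokenCount"; illustration only.
class RefillSketch {
    static final int TOKENS_PER_REFILL = 60; // tokens added on every run
    static final int BUCKET_CAPACITY = 100;  // the bucket never holds more than this

    private int tokens = 0;

    // Mirrors updateTokenCapacity(): add tokens, cap at the capacity, and return
    // the number of tokens that were still available before this refill.
    int refill() {
        int previous = tokens;
        tokens = Math.min(BUCKET_CAPACITY, tokens + TOKENS_PER_REFILL);
        return previous;
    }

    int available() {
        return tokens;
    }

    public static void main(String[] args) {
        RefillSketch bucket = new RefillSketch();
        List<Integer> seenBeforeRefill = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            seenBeforeRefill.add(bucket.refill());
        }
        // With no consumers the bucket is full after two refills and then stays at its cap.
        System.out.println(seenBeforeRefill + " -> " + bucket.available()); // prints [0, 60, 100, 100] -> 100
    }
}
```

With consumers draining the key between refills, the value seen before each refill stays low, which is why the refresher's log further down is mostly zeros.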
I am using CloudWatch Events to invoke my Lambda. As a CloudWatch Events schedule can fire at most once per minute, I decided to run a 1 minute loop within my function, effectively achieving a 1 second update for my Token Bucket.
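The loop above sleeps for (1 second minus the work duration), so any oversleep by Thread.sleep is not compensated and can accumulate across the 60 runs. A possible alternative, sketched below under the assumption that you want ticks anchored to the start of the invocation, is to compute each tick's target time as startMillis + i * tickMillis. FixedRateTicker and its methods are hypothetical names, not part of the original functions.

```java
// Hypothetical helper: schedules tick i at startMillis + i * tickMillis rather than
// sleeping (tickMillis - workDuration), so small overshoots do not accumulate.
final class FixedRateTicker {
    private FixedRateTicker() {}

    // How long to wait before starting tick 'tickIndex'; 0 if that tick is already due.
    static long sleepBeforeTick(long startMillis, int tickIndex, long tickMillis, long nowMillis) {
        long target = startMillis + tickIndex * tickMillis;
        return Math.max(0L, target - nowMillis);
    }

    // Runs 'work' up to 'ticks' times, roughly tickMillis apart, and returns the number of runs.
    // Stops early (restoring the interrupt flag) if the thread is interrupted.
    static int runTicks(int ticks, long tickMillis, Runnable work) {
        long start = System.currentTimeMillis();
        int runs = 0;
        for (int i = 0; i < ticks; i++) {
            long sleep = sleepBeforeTick(start, i, tickMillis, System.currentTimeMillis());
            if (sleep > 0) {
                try {
                    Thread.sleep(sleep);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            work.run();
            runs++;
        }
        return runs;
    }
}
```

java.util.concurrent.ScheduledExecutorService.scheduleAtFixedRate provides similar fixed-rate semantics, although inside a Lambda handler you would still need to block until the 60 runs have completed.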

On running, this code kept failing with a timeout exception:
Logging Cloud Watch metric 100
Unable to execute HTTP request: Connect to monitoring.us-east-1.amazonaws.com:443
 [monitoring.us-east-1.amazonaws.com/52.94.238.171] failed: connect timed out: 
com.amazonaws.SdkClientException
com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to 
monitoring.us-east-1.amazonaws.com:443 [monitoring.us-east-1.amazonaws.com/
52.94.238.171] failed: connect timed out

A little reading revealed that my Lambda was unable to reach the AWS CloudWatch endpoints. As the AWS Lambda documentation explains:
By default, Lambda runs your functions in a secure VPC with access to AWS 
services and the internet. The VPC is owned by Lambda and does not connect to 
your account's default VPC. When you connect a function to a VPC in your account,
it does not have access to the internet unless your VPC provides access.

The ElastiCache instance exists within a VPC. For the Lambda function to be able to access the cache, we configured it with the VPC, subnet and security group details. However, that means the function can no longer connect to the public CloudWatch API. To resolve this we can use a VPC endpoint (routing outbound traffic through a NAT gateway is the other common option). From the Amazon VPC documentation:
A VPC endpoint enables you to privately connect your VPC to supported AWS 
services and VPC endpoint services powered by AWS PrivateLink without requiring 
an internet gateway, NAT device, VPN connection, or AWS Direct Connect 
connection. Instances in your VPC do not require public IP addresses to
communicate with resources in the service. Traffic between your VPC and the other
service does not leave the Amazon network.

CloudWatch is one of the supported services. From the CloudWatch documentation:
If you use Amazon Virtual Private Cloud (Amazon VPC) to host your AWS resources,
 you can establish a private connection between your VPC, CloudWatch, and 
CloudWatch Synthetics. You can use these connections to enable CloudWatch and 
CloudWatch Synthetics to communicate with your resources on your VPC without 
going through the public internet.

I created an Interface VPC Endpoint to talk to AWS CloudWatch, as suggested in the CloudWatch documentation.

The endpoint uses the same subnets and security group that I configured for the Lambda function in my VPC. The endpoint policy is edited to allow only CloudWatch.
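For reference, the two names involved can be derived from the region, as in the small sketch below. The helper class is hypothetical; the service name format com.amazonaws.&lt;region&gt;.monitoring and the hostname from the timeout error are the standard ones for CloudWatch metrics. The sketch assumes private DNS is enabled on the endpoint (which also requires DNS support and DNS hostnames on the VPC), and that the endpoint's security group allows inbound HTTPS (port 443) from the function.

```java
// Hypothetical helper: names involved in reaching CloudWatch metrics from inside a VPC.
final class CloudWatchEndpointNames {
    private CloudWatchEndpointNames() {}

    // Service name to pick when creating the interface VPC endpoint for CloudWatch metrics.
    static String endpointServiceName(String region) {
        return "com.amazonaws." + region + ".monitoring";
    }

    // Hostname the SDK calls (the one in the timeout error). With private DNS enabled on the
    // endpoint, this same name resolves to the endpoint's private IPs inside the VPC,
    // so the Lambda code does not need to change.
    static String defaultApiHostname(String region) {
        return "monitoring." + region + ".amazonaws.com";
    }
}
```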

Rerunning the Lambda worked.

In Handler: Executing TokenBucketRefresher, $LATEST
Input received 2020-05-21T16:40:54Z
Total Runs 60 and available capacities detected [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 60, 100, 0, 0, 0, 0, 0, 0, 0, 0,
 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
END RequestId: fecd5efb-5d00-4b84-ba40-da144007cc9a
REPORT RequestId: fecd5efb-5d00-4b84-ba40-da144007cc9a Duration: 60165.64 ms 
Billed Duration: 60200 ms Memory Size: 512 MB Max Memory Used: 143 MB 
Init Duration: 1993.44 ms 
I set up a CloudWatch Events rule to invoke my Lambda every minute.
Similarly, I created a Lambda function that consumes tokens from the bucket:

import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TokenBucketUserLambdaFn implements RequestHandler<String, Void> {

    private static final String TOKEN_KEY = "TokenCount";
    private static final int RUNS_PER_INVOCATION = 60;
    private static final long TICK_MILLIS = 1000;

    // Number of tokens each input tries to acquire every second
    private static final Map<String, Integer> NEEDED_CAPACITY_MAPPING = new HashMap<>();

    static {
        NEEDED_CAPACITY_MAPPING.put("BLUE", 60);
        NEEDED_CAPACITY_MAPPING.put("RED", 90);
        NEEDED_CAPACITY_MAPPING.put("YELLOW", 10);
    }

    private final Jedis jedis;
    private final AmazonCloudWatchAsync cloudWatchClient;

    public TokenBucketUserLambdaFn() {
        jedis = new Jedis(new HostAndPort(
                "redis-test-cache-001.vmaptv.0001.use1.cache.amazonaws.com", 6379));
        // Default credential chain (the Lambda execution role) instead of hard-coded keys
        cloudWatchClient = AmazonCloudWatchAsyncClientBuilder.defaultClient();
    }

    @Override
    public Void handleRequest(String input, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("In Handler: Executing " + context.getFunctionName() + ", "
                + context.getFunctionVersion());
        logger.log("Input received " + input);
        Integer requestedCapacity = NEEDED_CAPACITY_MAPPING.get(input);
        if (requestedCapacity == null) {
            throw new IllegalArgumentException("Input is invalid - " + input);
        }
        int totalRuns = 0;
        long startTime = System.currentTimeMillis();
        List<Integer> usedCapacities = new ArrayList<>(RUNS_PER_INVOCATION);
        while (totalRuns < RUNS_PER_INVOCATION
                && (System.currentTimeMillis() - startTime < RUNS_PER_INVOCATION * TICK_MILLIS)) {
            long timestamp = System.currentTimeMillis();
            int usedCapacity = getTokens(logger, requestedCapacity);
            usedCapacities.add(usedCapacity);
            asyncLogMetricsToCloudWatch(usedCapacity, input);
            totalRuns++;
            long duration = System.currentTimeMillis() - timestamp;
            if (duration < TICK_MILLIS) {
                // Operation took less than a second, let the thread sleep before running again
                try {
                    Thread.sleep(TICK_MILLIS - duration);
                } catch (InterruptedException e) {
                    logger.log("Sleep failed " + e.getMessage());
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    break;
                }
            }
        }
        // usedCapacities holds the number of tokens acquired on each run
        logger.log("Total Runs " + totalRuns + " for " + input
                + " and available capacities detected " + usedCapacities);
        return null;
    }

    // Takes up to requestedCapacity tokens from the bucket and returns how many were taken.
    // Note: GET followed by SET is not atomic; other consumers or the refresher
    // can modify the key between the two calls.
    private int getTokens(LambdaLogger logger, int requestedCapacity) {
        String value = jedis.get(TOKEN_KEY); // Get available tokens
        if (value == null) {
            logger.log("No value detected for Key");
            return 0;
        }
        int availCapacity = Integer.parseInt(value);
        int tokensUsed = Math.min(availCapacity, requestedCapacity);
        jedis.set(TOKEN_KEY, String.valueOf(availCapacity - tokensUsed));
        return tokensUsed; // used capacity
    }

    private void asyncLogMetricsToCloudWatch(int usedCapacity, String input) {
        MetricDatum datum = new MetricDatum()
                .withMetricName(input + "CapacityUsed")
                .withValue((double) usedCapacity)
                .withUnit(StandardUnit.None);
        PutMetricDataRequest putMetricDataRequest = new PutMetricDataRequest()
                .withNamespace("TokenBucket")
                .withMetricData(Collections.singletonList(datum));
        // Asynchronous push: returns a Future that is not awaited here
        cloudWatchClient.putMetricDataAsync(putMetricDataRequest);
    }
}
The Lambda can receive one of three inputs (BLUE/RED/YELLOW), which determines how many tokens it tries to acquire every second (60, 90 and 10 respectively).
The Lambda again executes a loop 60 times. To run the Lambda with the three different inputs, I set up another CloudWatch rule.
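Both functions read the key with GET and write it back with SET, so three consumers plus the refresher can interleave between those two calls and overwrite each other's updates. The sketch below is not the author's code: it swaps Redis for a java.util.concurrent.atomic.AtomicInteger to show what an atomic read-modify-write of the bucket looks like (a compare-and-set retry loop). In Redis itself the equivalent would typically be a Lua script run with EVAL or a WATCH/MULTI/EXEC transaction. AtomicTokenBucket is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory analogue of the shared "TokenCount" key with atomic updates.
final class AtomicTokenBucket {
    private final int capacity;
    private final AtomicInteger tokens = new AtomicInteger(0);

    AtomicTokenBucket(int capacity) {
        this.capacity = capacity;
    }

    // Refill: add tokens, capped at the capacity. Returns the count seen before the refill.
    int refill(int amount) {
        return tokens.getAndUpdate(current -> Math.min(capacity, current + amount));
    }

    // Consume: take up to 'requested' tokens and return how many were actually taken.
    int take(int requested) {
        while (true) {
            int current = tokens.get();
            int taken = Math.min(current, requested);
            // Succeeds only if no one changed the count since we read it; otherwise retry.
            if (tokens.compareAndSet(current, current - taken)) {
                return taken;
            }
        }
    }

    int available() {
        return tokens.get();
    }
}
```

Because every update either applies in full or retries, the tokens handed out can never exceed what was added, even when many threads call take at once.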

START RequestId: 960f7fbe-cd6d-4301-97d5-057913abea22 Version: $LATEST
In Handler: Executing TokenBucketUser, $LATEST
Input received RED
Total Runs 60 for RED and available capacities detected [50, 50, 50, 50, 50, 50,
 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50,
 50, 50, 50, 50, 50, 50, 90, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50,
 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50]
END RequestId: 960f7fbe-cd6d-4301-97d5-057913abea22
REPORT RequestId: 960f7fbe-cd6d-4301-97d5-057913abea22 Duration: 60009.49 ms Billed Duration: 60100 ms Memory Size: 512 MB Max Memory Used: 170 MB 
START RequestId: b801600b-724d-4909-999a-029350e49388 Version: $LATEST
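Now to watch the metrics as the functions execute every minute.
As the metrics show, although the consumers would like to execute at higher rates, they can only work with the TPS available: together they ask for 160 tokens per second (60 + 90 + 10), while the bucket is refilled with only 60 tokens per second.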
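The arithmetic can be checked with a simplified, single-threaded simulation. It assumes one refill per tick followed by each consumer asking for its per-second demand in a fixed order; the real Lambdas interleave unpredictably, so only the totals carry over, not the split between colours. DemandSimulation is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation: refill once per tick, then consumers take tokens in map order.
final class DemandSimulation {
    static Map<String, Integer> simulate(int ticks, int refill, int capacity, Map<String, Integer> demand) {
        Map<String, Integer> received = new LinkedHashMap<>();
        for (String name : demand.keySet()) {
            received.put(name, 0);
        }
        int tokens = 0;
        for (int t = 0; t < ticks; t++) {
            tokens = Math.min(capacity, tokens + refill); // refresher run
            for (Map.Entry<String, Integer> e : demand.entrySet()) {
                int taken = Math.min(tokens, e.getValue()); // consumer run
                tokens -= taken;
                received.merge(e.getKey(), taken, Integer::sum);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, Integer> demand = new LinkedHashMap<>();
        demand.put("YELLOW", 10);
        demand.put("BLUE", 60);
        demand.put("RED", 90);
        Map<String, Integer> received = simulate(60, 60, 100, demand);
        int total = received.values().stream().mapToInt(Integer::intValue).sum();
        // prints {YELLOW=600, BLUE=3000, RED=0} total=3600
        System.out.println(received + " total=" + total);
    }
}
```

Over one minute the consumers ask for 9600 tokens but only 3600 (60 x 60) are handed out; whoever runs first after a refill gets served first.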
