
Sunday, 31 May 2020

DAX - speeding/cheapening up Dynamo Db

Amazon DynamoDB comes with its own cache layer, DAX (DynamoDB Accelerator). In this post I am going to play around with this feature.
DAX, as the AWS docs say, provides three benefits:

  1. It's a cache - so responses from the cache will be faster than hitting the actual table.
  2. It's a cache - so the number of calls to the database is reduced, allowing us to save on costs.
  3. It's provided by DynamoDB - the API is like a wrapper that can be dropped into code. So minimal changes, and easier than setting up our own cache layer.
DAX, like ElastiCache, is set up within a VPC. The cache is essentially a wrapper:
At runtime, the DAX client directs all of your application's DynamoDB API requests
to the DAX cluster. If DAX can process one of these API requests directly, it 
does so. Otherwise, it passes the request through to DynamoDB. Finally, the 
DAX cluster returns the results to your application.

To test DAX, I need to set up a DynamoDB table first:

The below code creates 100K records in the Users table:
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;

public class UserTableLoader {

    public static void main(String[] args) {
        AWSCredentials awsCredentials = new BasicAWSCredentials(
                "AccessKey", "SecretKey");
        AWSCredentialsProvider awsCredentialsProvider = new
                AWSStaticCredentialsProvider(awsCredentials);

        AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
                .withRegion("us-east-1")
                .withCredentials(awsCredentialsProvider).build();
        DynamoDB dynamoDB = new DynamoDB(client);
        List<Item> userItems = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            Item item = new Item().withPrimaryKey("userId", Integer.valueOf(i))
                    .withString("name", "User_" + i)
                    .withNumber("age", (int) (Math.random() * 60));
            userItems.add(item);
            if (userItems.size() == 25) { // max entries allowed per batch write is 25
                TableWriteItems userTableWriteItems = new TableWriteItems("Users")
                        .withItemsToPut(userItems);
                System.out.println("Making the request... " + i);
                batchWriteItems(dynamoDB, userTableWriteItems);
                userItems.clear();
            }
        }
    }

    private static void batchWriteItems(DynamoDB dynamoDB,
            TableWriteItems userTableWriteItems) {
        BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(userTableWriteItems);
        // Unprocessed items can be returned if we exceed provisioned throughput
        int retryAttempt = 0;
        while (outcome.getUnprocessedItems().size() > 0) {
            retryAttempt++;
            System.out.println("Retrying " + outcome.getUnprocessedItems().size()
                    + " unprocessed items, attempt no " + retryAttempt);
            outcome = dynamoDB.batchWriteItemUnprocessed(outcome.getUnprocessedItems());
        }
    }
}
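One thing to note: the loop above only flushes when the buffer hits exactly 25 items, which works because 100,000 divides evenly by 25; with an arbitrary item count the trailing partial batch would be dropped. A minimal sketch of a generic chunker that also handles the remainder (the class and method names here are mine, not from the AWS SDK):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunker {
    // DynamoDB's BatchWriteItem accepts at most 25 put/delete requests per call
    static final int MAX_BATCH_SIZE = 25;

    // Splits a list into consecutive sublists of at most MAX_BATCH_SIZE elements,
    // including a final partial batch if the size is not a multiple of 25.
    static <T> List<List<T>> chunk(List<T> items) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += MAX_BATCH_SIZE) {
            batches.add(new ArrayList<>(
                    items.subList(i, Math.min(i + MAX_BATCH_SIZE, items.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 103; i++) {
            items.add(i);
        }
        List<List<Integer>> batches = chunk(items);
        System.out.println(batches.size());                       // 4 full batches + 1 partial
        System.out.println(batches.get(batches.size() - 1).size()); // the 3 leftover items
    }
}
```

Each sublist can then be handed to a TableWriteItems and sent with batchWriteItem as above.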

The code does not use DAX; it directly uses a DynamoDB client to write to the Users table. The next step is to set up a DAX cluster for use with our table.

Step 1: Allow DAX to interact with DynamoDB
As we read earlier, DAX intercepts all DynamoDB calls and executes the necessary ones on DynamoDB. To do this, DAX needs the requisite permissions.
Before you can create an Amazon DynamoDB Accelerator (DAX) cluster, you must 
create a service role for it. A service role is an AWS Identity and Access 
Management (IAM) role that authorizes an AWS service to act on your behalf.
The service role allows DAX to access your DynamoDB tables as if you were 
accessing those tables yourself.
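Concretely, the service role's trust relationship lets dax.amazonaws.com assume the role, and the attached permissions policy grants the DynamoDB actions on the table. A sketch of what such a permissions policy might look like (the region, account id and table ARN below are placeholders for your own values):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:BatchGetItem",
        "dynamodb:Query",
        "dynamodb:Scan",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:BatchWriteItem"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/Users"
    }
  ]
}
```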
Step 2: Configure the subnet to allow DAX (which exists within the VPC) to connect with DynamoDB.



The next step was to test the DAX cluster.

To perform my tests, I decided to setup a simple Lambda that uses a DAX client to access the table.

import java.text.DecimalFormat;

import com.amazon.dax.client.dynamodbv2.AmazonDaxClientBuilder;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.GetItemOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;

public class DAXTester implements RequestHandler<Void, Void> {

    private DynamoDB daxClient;
    private DynamoDB dbClient;

    public DAXTester() {
        AWSCredentials awsCredentials = new BasicAWSCredentials("AccessKey",
                "SecretKey");
        AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);

        String daxEndpointUrl = "users-dax-cluster.xr5tql.clustercfg.dax.use1.cache.amazonaws.com:8111";
        AmazonDaxClientBuilder daxClientBuilder = AmazonDaxClientBuilder.standard();
        daxClientBuilder.withRegion("us-east-1")
                .withCredentials(awsCredentialsProvider)
                .withEndpointConfiguration(daxEndpointUrl);
        daxClient = new DynamoDB(daxClientBuilder.build());

        dbClient = new DynamoDB(AmazonDynamoDBClientBuilder.standard()
                .withRegion("us-east-1")
                .withCredentials(awsCredentialsProvider).build());
    }

    @Override
    public Void handleRequest(Void input, Context context) {

        LambdaLogger lambdaLogger = context.getLogger();
        String usersTable = "Users";
        Table dbTable = dbClient.getTable(usersTable);

        //Test 1 - Get without DAX
        lambdaLogger.log("Tests for GetItem");
        lambdaLogger.log("Dynamo Tests");
        computeGetItemDuration(lambdaLogger, dbTable);
        //Test 2 - Get with DAX - no entry in cache
        Table daxTable = daxClient.getTable(usersTable);
        lambdaLogger.log("DAX Tests - not in Cache");
        computeGetItemDuration(lambdaLogger, daxTable);
        //Test 3 - Get with DAX - entry in cache
        lambdaLogger.log("DAX Tests - present in Cache");
        computeGetItemDuration(lambdaLogger, daxTable);


        return null;
    }

    private void computeGetItemDuration(LambdaLogger lambdaLogger, Table table) {
        double duration = 0;
        int noOfTests = 500;
        for (int i = 0; i < noOfTests; i++) {
            long startTime = System.nanoTime();
            GetItemOutcome outcome = table.getItemOutcome("userId", Integer.valueOf(i));
            outcome.getItem().get("name");
            duration += (System.nanoTime() - startTime);
        }
        DecimalFormat df = new DecimalFormat("#.##");
        String formattedNumber = df.format((duration / noOfTests));
        lambdaLogger.log("Average FetchTime = " + formattedNumber  + " nano seconds");
    }

}
The code is pretty straightforward. It creates a DAX client and a DynamoDB client, then measures the latency of a GetItem call across three scenarios:
  1. No Cache - fetch directly from Dynamo
  2. Cache Miss - use DAX, but the entry is not in the cache, so DAX has to call Dynamo
  3. Cache Hit - the entry is found in DAX, so Dynamo is not hit
I had some problems with the DAX Maven dependency for the project (dependency conflicts with the AWS and Dynamo packages). So I downloaded the amazon-dax-client jar and added that as a direct dependency. (I used the addjars-maven-plugin for this.)

Once the Lambda was set up, I ran into errors executing my function:
May 30, 2020 8:07:04 PM com.amazon.dax.client.SocketTubePool setLastError
WARNING: error connecting to /172.31.82.151:8111 java.net.SocketTimeoutException:
 connect timed out java.net.SocketTimeoutException: connect timed out

I faced the same problem that I hit with ElastiCache - my Lambda is unable to connect to DAX.
As DAX is set up in a VPC, a Lambda running outside this VPC cannot access it. To fix this we need to make the Lambda VPC-aware. (Similar to what we did here.)

The VPC configuration allows the Lambda to communicate with DAX. But now we have another problem: DynamoDB is outside this VPC, so our Lambda can no longer make calls to DynamoDB.
We faced a similar problem where a Lambda within a VPC could not communicate with CloudWatch (read here). There, we fixed it by setting up an Interface Endpoint that routed Lambda calls to CloudWatch.
With DynamoDB (and S3) we need to use what is known as a Gateway Endpoint.

A gateway endpoint is a gateway that you specify as a target for a route in your route table for traffic destined to a supported AWS service. The following AWS services are supported:
* Amazon S3
* DynamoDB
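Besides the console, the gateway endpoint can also be created with the AWS CLI along these lines (the VPC id and route table id below are placeholders for your own values):

```shell
# Create a gateway endpoint that routes DynamoDB-bound traffic
# from the VPC's route table to DynamoDB
aws ec2 create-vpc-endpoint \
    --vpc-id vpc-0123456789abcdef0 \
    --vpc-endpoint-type Gateway \
    --service-name com.amazonaws.us-east-1.dynamodb \
    --route-table-ids rtb-0123456789abcdef0
```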
Accordingly, I set up an endpoint that routes DynamoDB calls coming out of my VPC to the DynamoDB gateway.
Now the code is ready for execution.

START RequestId: 86a6c562-72a4-46a4-9c87-8899c65f1fc9 Version: $LATEST
Tests for GetItem
Dynamo Tests
Average FetchTime = 35719684.39 nano seconds
DAX Tests - not in Cache
Average FetchTime = 7340349.86 nano seconds
DAX Tests - present in Cache
Average FetchTime = 1400325.51 nano seconds
END RequestId: 86a6c562-72a4-46a4-9c87-8899c65f1fc9
REPORT RequestId: 86a6c562-72a4-46a4-9c87-8899c65f1fc9 Duration: 22365.35 ms Billed Duration: 22400 ms Memory Size: 256 MB Max Memory Used: 125 MB Init Duration: 1830.58 ms 
This is weird. Direct calls to Dynamo were slower than DAX cache misses, where DAX itself has to call Dynamo. (The outgoing calls from the VPC appear to add time.)

The DAX cache lookup, however, is faster than a Dynamo read.
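To make the gap easier to read, here is a quick sketch converting the logged nanosecond averages to milliseconds:

```java
import java.util.Locale;

public class NanosToMillis {
    public static void main(String[] args) {
        // Average fetch times logged by the Lambda, in nanoseconds
        double[] avgNanos = {35719684.39, 7340349.86, 1400325.51};
        String[] labels = {"Dynamo direct", "DAX cache miss", "DAX cache hit"};
        for (int i = 0; i < avgNanos.length; i++) {
            // 1 millisecond = 1,000,000 nanoseconds
            System.out.printf(Locale.US, "%s: %.2f ms%n",
                    labels[i], avgNanos[i] / 1_000_000.0);
        }
    }
}
```

So in this run, a DAX cache hit (about 1.4 ms) was roughly 25 times faster than the direct Dynamo read (about 35.7 ms).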

