
Tuesday 14 April 2020

Serverless - Lambda to publish to a stream

In the last post we started with a Hello World Lambda - one driven by S3 events that wrote back to S3. This time I wanted to use a Lambda for transformation, i.e. convert my file data in S3 into a stream.
My use case: whenever a file lands in my S3 bucket, a Lambda should be triggered that writes the data to a Kinesis stream. Kinesis is a powerful streaming solution that lets you operate on fast, high-volume data streams.
I probably wouldn't use a Lambda to write to Kinesis in a production environment - the biggest reason being the 15-minute Lambda runtime limit; processing large S3 files could easily exceed that time.
I was more interested in quickly testing Kinesis behavior and KPL scaling, and a Lambda was easy to set up and test. Running in Lambda also means I do not have to worry about network latency when generating the data from my machine.
The Lambda function is shown below:

public class KinesisDataProducer implements RequestHandler<S3Event, Void> {

    private static final int METRIC_BATCH_SIZE = 25;
    private static final int KINESIS_BATCH_SIZE = 2000;
    private static final String KINESIS_STREAM = "KinesisDataProducer";

    private KinesisDataWriter kinesisDataWriter;
    private AmazonCloudWatch cloudWatchClient;

    public KinesisDataProducer() {
        AWSCredentials awsCredentials = new BasicAWSCredentials("AccessKey",
                "SecretKey");
        AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);

        KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
        configuration.setCredentialsProvider(awsCredentialsProvider);
        configuration.setRegion("us-east-1");
        configuration.setRecordMaxBufferedTime(500);//500 milliseconds

        kinesisDataWriter = new KinesisDataWriter(new KinesisProducer(configuration), KINESIS_STREAM);
        cloudWatchClient = AmazonCloudWatchClientBuilder.standard().withCredentials(awsCredentialsProvider)
                .withClientConfiguration(new ClientConfiguration()).build();
    }

    @Override
    public Void handleRequest(S3Event s3event, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("In Handler: Executing " + context.getFunctionName());
        logger.log(s3event.toJson());
        try {
            //Get the details of the notification
            S3EventNotification.S3EventNotificationRecord record = s3event.getRecords().get(0);
            String srcBucket = record.getS3().getBucket().getName();
            String srcKey = record.getS3().getObject().getUrlDecodedKey();

            logger.log("Processing file " + srcKey);
            streamFileIntoKinesis(srcBucket, srcKey, logger);
            logger.log("File " + srcKey + " processing complete ");
        } catch (IOException | AmazonServiceException | ExecutionException | InterruptedException e) {
            logger.log("Error " + e.getMessage());
            throw new RuntimeException(e);
        } finally {
            logger.log("Completed Handler: Executing " + context.getFunctionName());
        }
        return null;
    }
}
I added a KinesisDataWriter class to hold the Kinesis interaction logic:

public class KinesisDataWriter {

    private KinesisProducer kinesisProducer;
    private String kinesisStream;

    public KinesisDataWriter(final KinesisProducer kinesisProducer, final String kinesisStream) {
        this.kinesisProducer = kinesisProducer;
        this.kinesisStream = kinesisStream;
    }

    public List<String> insertRecords(final List<String> inputRecords) throws ExecutionException,
            InterruptedException, UnsupportedEncodingException {
        // Put some records and save the Futures
        List<Future<UserRecordResult>> putFutures = new LinkedList<>();
        for (String inputRecord : inputRecords) {
            ByteBuffer data = ByteBuffer.wrap(inputRecord.getBytes("UTF-8"));
            // doesn't block
            putFutures.add(kinesisProducer.addUserRecord(kinesisStream, inputRecord.toLowerCase(), data));
        }

        List<String> failedRecs = new ArrayList<>();
        // Wait for puts to finish and check the results
        for (int i = 0; i < putFutures.size(); i++) {
            //Check the status of the ith record
            UserRecordResult result = putFutures.get(i).get();// this blocks
            if (result.isSuccessful()) {
                //Successful, no worries
            } else {
                String failedMessage = inputRecords.get(i);
                failedRecs.add(failedMessage);
                for (Attempt attempt : result.getAttempts()) {
                    // Analyze and respond to the failure
                    System.out.println("Record failed : " + failedMessage + " with error " + attempt.getErrorMessage());
                }
            }
        }
        return failedRecs;
    }

}
The class pushes the data and then waits for the Kinesis stream to acknowledge delivery.
I used the KPL (Kinesis Producer Library) here instead of the basic Kinesis API. To interact with Kinesis, I set up an AWSCredentialsProvider instance - this is how the various AWS clients authenticate with AWS services. All of this configuration is passed to the KinesisProducer instance.
The key benefit of the KPL is buffered writing, or Collection - records are collected at the client and then sent to Kinesis as a single batch operation. The KPL can also concatenate multiple user records into one Kinesis record (Aggregation). Together, these two features let the KPL write to Kinesis much more efficiently.
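Both behaviors can be tuned on the KinesisProducerConfiguration. As a rough sketch, using the same setter style as the constructor above (the limits below are illustrative values, not the ones I used):

// Illustrative KPL tuning - example values only
KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
configuration.setRegion("us-east-1");
configuration.setRecordMaxBufferedTime(500);  // hold records up to 500 ms before flushing
configuration.setAggregationEnabled(true);    // pack multiple user records into one Kinesis record
configuration.setAggregationMaxCount(100);    // illustrative: at most 100 user records per aggregated record
configuration.setCollectionMaxCount(500);     // illustrative: at most 500 Kinesis records per PutRecords call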
Other benefits, which matter more for long-running applications, come from the fact that the KPL runs as a process separate from the application process (Link).
If the Lambda were processing one record per invocation, then the KPL would be redundant and we could use the PutRecord API directly instead.
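For completeness, a one-record-per-invocation version would look roughly like the sketch below, which uses the plain Kinesis client instead of the KPL and reuses the credentials provider and stream name from the handler above:

// Sketch: publishing a single record with the low-level Kinesis API instead of the KPL
AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard()
        .withCredentials(awsCredentialsProvider)
        .withRegion("us-east-1")
        .build();

PutRecordRequest request = new PutRecordRequest()
        .withStreamName(KINESIS_STREAM)
        .withPartitionKey(inputRecord.toLowerCase())
        .withData(ByteBuffer.wrap(inputRecord.getBytes(StandardCharsets.UTF_8)));

kinesisClient.putRecord(request); // blocks until Kinesis acknowledges the write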
I also created an AmazonCloudWatch client instance, to log custom metrics about the publishing.
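The metric-publishing code itself isn't reproduced in this post, but at its core it is just a putMetricData call, roughly along these lines (the namespace and metric name below are placeholders, not necessarily what I used):

// Rough sketch of pushing a custom metric to CloudWatch
MetricDatum datum = new MetricDatum()
        .withMetricName("FailedRecords")      // placeholder metric name
        .withUnit(StandardUnit.Count)
        .withValue((double) failedRecs.size());

PutMetricDataRequest metricRequest = new PutMetricDataRequest()
        .withNamespace("KinesisDataProducer") // placeholder namespace
        .withMetricData(datum);

cloudWatchClient.putMetricData(metricRequest);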

With that discussion out of the way, let's look at the actual code inside the Lambda.
The code reads a batch of records from the S3 file and waits for a successful publish to the Kinesis stream before writing the next batch. This blocking approach is used to ensure no records are lost; we could also write to Kinesis in a fire-and-forget style.
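The streamFileIntoKinesis method isn't reproduced in full here, but it is essentially the following sketch - the S3 client creation and the exact batching details are my assumptions about its shape:

// Sketch of streamFileIntoKinesis: stream the S3 object line by line and push it in batches
private void streamFileIntoKinesis(String srcBucket, String srcKey, LambdaLogger logger)
        throws IOException, ExecutionException, InterruptedException {
    AmazonS3 s3Client = AmazonS3ClientBuilder.standard().build(); // assumes execution-role credentials
    S3Object s3Object = s3Client.getObject(srcBucket, srcKey);

    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8))) {
        List<String> batch = new ArrayList<>(KINESIS_BATCH_SIZE);
        String line;
        while ((line = reader.readLine()) != null) {
            batch.add(line);
            if (batch.size() == KINESIS_BATCH_SIZE) {
                // Blocks until the KPL has acknowledged every record in the batch
                List<String> failedRecs = kinesisDataWriter.insertRecords(batch);
                logger.log("Batch written, failures: " + failedRecs.size());
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            kinesisDataWriter.insertRecords(batch); // flush the trailing partial batch
        }
    }
}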
I set up a Kinesis stream with 1 shard and added Kinesis permissions to my Lambda execution role.
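For reference, a stream like this can also be created programmatically (a sketch; the post does not depend on how the stream was actually created):

// Sketch: create a 1-shard stream with the Kinesis client
kinesisClient.createStream(new CreateStreamRequest()
        .withStreamName("KinesisDataProducer")
        .withShardCount(1));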
I looked at the data being written to Kinesis:
As the graph shows, the number of user records is far higher than the number of Kinesis records. This is because the KPL aggregates, writing several of my records as one Kinesis record.
Also, as the file size increased, my Lambda timed out:

2020-04-13T22:03:36.210Z ac9afab8-1115-4d6e-a491-ded9d756055f Task timed out after 900.10 seconds
REPORT RequestId: ac9afab8-1115-4d6e-a491-ded9d756055f Duration: 900096.51 ms Billed Duration: 900000 ms Memory Size: 512 MB Max Memory Used: 218 MB

To try and trigger throttle errors, I changed the aggregation behavior by adding this line to the constructor:

       configuration.setAggregationEnabled(false);

Now Kinesis treats each of my user records as an individual Kinesis record. Since my stream has only 1 shard, its maximum write capacity is 1,000 records/sec (and 1 MB/sec).

Notice how the graph has changed. The number of user records put fell, since I was no longer using aggregation, while the number of Kinesis records written went up during this period (collection is still in use, which is why the two numbers are not identical).
Since my Lambda has no built-in throttling, it attempts to push as many records as possible. However, to prevent record loss, it processes the responses and waits for a successful acknowledgement from Kinesis before attempting the next batch.
I also started observing throttle errors in Kinesis:

Interestingly, my custom CloudWatch metric, which tracks records the API failed to push, stayed at zero throughout. Through its retries, the KPL ensured that all my records made it.
My Lambda was also able to handle 1 GB files from S3. Per the Lambda documentation, /tmp storage is limited to 512 MB (which is where I assumed the file was being downloaded), but that is not the case - presumably because the object content is streamed from S3 rather than staged on local disk.
