In the last post we started with a Hello World Lambda - one driven by S3 events that wrote back to S3. This time I wanted to use a Lambda for transformation, i.e. to convert my file data in S3 into a stream.
My use case is that whenever a file lands in my S3 bucket, a Lambda should be triggered that writes the data to a Kinesis stream. Kinesis is a very powerful streaming solution that lets you operate on fast, large data streams.
I probably wouldn't use a Lambda to write to Kinesis in a production environment - the biggest reason being the 15-minute Lambda runtime limit; processing large S3 files could exceed it.
I was more interested in quickly testing Kinesis behavior under KPL scaling, and Lambda was easy to set up and test. Running in Lambda also means I don't have to worry about network latencies from generating the data on my machine.
The Lambda function is as below:
```java
public class KinesisDataProducer implements RequestHandler<S3Event, Void> {

    private static final int METRIC_BATCH_SIZE = 25;
    private static final int KINESIS_BATCH_SIZE = 2000;
    private static final String KINESIS_STREAM = "KinesisDataProducer";

    private KinesisDataWriter kinesisDataWriter;
    private AmazonCloudWatch cloudWatchClient;

    public KinesisDataProducer() {
        AWSCredentials awsCredentials = new BasicAWSCredentials("AccessKey", "SecretKey");
        AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
        KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
        configuration.setCredentialsProvider(awsCredentialsProvider);
        configuration.setRegion("us-east-1");
        configuration.setRecordMaxBufferedTime(500); // 500 milliseconds
        kinesisDataWriter = new KinesisDataWriter(new KinesisProducer(configuration), KINESIS_STREAM);
        cloudWatchClient = AmazonCloudWatchClientBuilder.standard()
                .withCredentials(awsCredentialsProvider)
                .withClientConfiguration(new ClientConfiguration())
                .build();
    }

    @Override
    public Void handleRequest(S3Event s3event, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("In Handler: Executing " + context.getFunctionName());
        logger.log(s3event.toJson());
        try {
            // Get the details of the notification
            S3EventNotification.S3EventNotificationRecord record = s3event.getRecords().get(0);
            String srcBucket = record.getS3().getBucket().getName();
            String srcKey = record.getS3().getObject().getUrlDecodedKey();
            logger.log("Processing file " + srcKey);
            streamFileIntoKinesis(srcBucket, srcKey, logger);
            logger.log("File " + srcKey + " processing complete ");
        } catch (IOException | AmazonServiceException | ExecutionException | InterruptedException e) {
            logger.log("Error " + e.getMessage());
            throw new RuntimeException(e);
        } finally {
            logger.log("Completed Handler: Executing " + context.getFunctionName());
        }
        return null;
    }
}
```
I added a KinesisDataWriter class to hold the Kinesis interaction logic. The class pushes data and then waits for the Kinesis stream to acknowledge delivery:

```java
public class KinesisDataWriter {

    private KinesisProducer kinesisProducer;
    private String kinesisStream;

    public KinesisDataWriter(final KinesisProducer kinesisProducer, final String kinesisStream) {
        this.kinesisProducer = kinesisProducer;
        this.kinesisStream = kinesisStream;
    }

    public List<String> insertRecords(final List<String> inputRecords)
            throws ExecutionException, InterruptedException, UnsupportedEncodingException {
        // Put the records and save the Futures
        List<Future<UserRecordResult>> putFutures = new LinkedList<>();
        for (String inputRecord : inputRecords) {
            ByteBuffer data = ByteBuffer.wrap(inputRecord.getBytes("UTF-8"));
            // doesn't block
            putFutures.add(kinesisProducer.addUserRecord(kinesisStream, inputRecord.toLowerCase(), data));
        }
        List<String> failedRecs = new ArrayList<>();
        // Wait for puts to finish and check the results
        for (int i = 0; i < putFutures.size(); i++) {
            // Check the status of the ith record
            UserRecordResult result = putFutures.get(i).get(); // this blocks
            if (result.isSuccessful()) {
                // Successful, no worries
            } else {
                String failedMessage = inputRecords.get(i);
                failedRecs.add(failedMessage);
                for (Attempt attempt : result.getAttempts()) {
                    // Analyze and respond to the failure
                    System.out.println("Record failed : " + failedMessage
                            + " with error " + attempt.getErrorMessage());
                }
            }
        }
        return failedRecs;
    }
}
```
I used KPL here instead of the basic Kinesis API. To interact with Kinesis, I set up an AWSCredentialsProvider instance - this is how the various AWS clients authenticate to AWS services. All of this is passed to the KinesisProducer instance.
The key benefit of KPL is buffered writing, or Collection - records are collected at the client and then sent to Kinesis as one batch operation. KPL can also concatenate multiple user records into a single Kinesis record (Aggregation). Together, the two let KPL write to Kinesis far more efficiently.
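As a back-of-the-envelope illustration of why aggregation matters (the only hard numbers here are the published per-shard limits of 1,000 records/sec and 1 MB/sec; the record sizes are my own examples, and KPL's small framing overhead is ignored):

```java
public class AggregationMath {

    // Per-shard write limits published for Kinesis Data Streams
    static final int SHARD_RECORDS_PER_SEC = 1000;
    static final int SHARD_BYTES_PER_SEC = 1024 * 1024; // 1 MB

    /**
     * Rough upper bound on user records/sec one shard can absorb when
     * recordsPerAggregate user records of userRecordBytes each are
     * packed into a single Kinesis record.
     */
    static int maxUserRecordsPerSec(int userRecordBytes, int recordsPerAggregate) {
        int byBytes = SHARD_BYTES_PER_SEC / userRecordBytes;          // byte-throughput cap
        int byCount = SHARD_RECORDS_PER_SEC * recordsPerAggregate;    // record-count cap
        return Math.min(byBytes, byCount);
    }

    public static void main(String[] args) {
        // 100-byte records, no aggregation: the 1,000 records/sec cap binds
        System.out.println(maxUserRecordsPerSec(100, 1));   // 1000
        // 100-byte records, 100 per aggregate: the byte cap binds instead
        System.out.println(maxUserRecordsPerSec(100, 100)); // 10485
    }
}
```

In other words, for small records aggregation moves the bottleneck from the record-count limit to the byte limit, roughly a 10x gain in this example.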
Another benefit, which makes more sense for long-running applications, is that the KPL process runs separately from the application process (Link).
If the Lambda were processing one record per invocation, KPL would be redundant and we could use the PutRecord API directly.
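For that single-record case, a minimal sketch with the plain Kinesis client might look like this (assuming the AWS SDK for Java v1; the stream name and partition key are illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;

public class SingleRecordPut {
    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
                .withRegion("us-east-1")
                .build();

        PutRecordRequest request = new PutRecordRequest()
                .withStreamName("KinesisDataProducer")
                .withPartitionKey("some-partition-key")
                .withData(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));

        // Synchronous call - no client-side buffering, one record per request
        PutRecordResult result = kinesis.putRecord(request);
        System.out.println("Wrote to shard " + result.getShardId());
    }
}
```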
I also created an AmazonCloudWatch instance to log custom metrics about the publishing.
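The metric code itself isn't shown in the listing; a sketch of how such a failure count might be published (SDK v1 again - the namespace and metric name here are my own inventions) could be:

```java
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.StandardUnit;

public class MetricsSketch {
    /** Publishes the count of records that failed to reach Kinesis. */
    static void publishFailedCount(AmazonCloudWatch cloudWatchClient, int failedCount) {
        MetricDatum datum = new MetricDatum()
                .withMetricName("FailedKinesisRecords") // hypothetical name
                .withUnit(StandardUnit.Count)
                .withValue((double) failedCount);
        cloudWatchClient.putMetricData(new PutMetricDataRequest()
                .withNamespace("KinesisDataProducer")   // hypothetical namespace
                .withMetricData(datum));
    }
}
```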
With that discussion out of the way, let's look at what the Lambda code actually does:
The code reads a batch of records from the S3 file and waits for a successful publish to the Kinesis stream before writing the next batch. This blocking approach ensures no records are lost; alternatively, we could write to Kinesis in a fire-and-forget style.
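The streamFileIntoKinesis method isn't shown above, but the batching step it implies is straightforward. A hypothetical helper (the name chunkIntoBatches is mine) that carves the file's lines into KINESIS_BATCH_SIZE-sized batches could look like:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {

    /** Splits records into consecutive batches of at most batchSize each. */
    static List<List<String>> chunkIntoBatches(List<String> records, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int from = 0; from < records.size(); from += batchSize) {
            int to = Math.min(from + batchSize, records.size());
            batches.add(new ArrayList<>(records.subList(from, to)));
        }
        return batches;
    }
}
```

Each batch would then be handed to insertRecords, and the loop advances only once all the returned futures have resolved - that is the blocking behavior described above.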
I set up a Kinesis stream with one shard and added Kinesis permissions to my Lambda execution role.
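The execution-role permissions aren't listed in the post; a policy along these lines would cover the calls made here (the exact action set is my assumption - KPL also describes the stream and, by default, publishes its own CloudWatch metrics):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:*:stream/KinesisDataProducer"
    },
    {
      "Effect": "Allow",
      "Action": "cloudwatch:PutMetricData",
      "Resource": "*"
    }
  ]
}
```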
I looked at the data being written to Kinesis:
As seen, the number of UserRecords is far higher than the number of Kinesis records. This is KPL aggregation at work, writing several of my records as a single Kinesis record.
Also, as the file size increased, my Lambda timed out:
```
2020-04-13T22:03:36.210Z ac9afab8-1115-4d6e-a491-ded9d756055f Task timed out after 900.10 seconds
REPORT RequestId: ac9afab8-1115-4d6e-a491-ded9d756055f Duration: 900096.51 ms Billed Duration: 900000 ms Memory Size: 512 MB Max Memory Used: 218 MB
```
I then changed the aggregation settings to try to provoke throttling errors, adding this to the constructor:
```java
configuration.setAggregationEnabled(false);
```

Now Kinesis treats each of my records as one Kinesis record. Since my stream has only one shard, its maximum capacity is 1,000 record writes per second.
Notice how the graph changed. The number of user records put fell, since aggregation was no longer in use, while the number of Kinesis records added went up during this period (Collection is still in use, hence the numbers are not identical).
As my Lambda has no built-in throttling, it attempts to push as many records as possible. To prevent record loss, though, it processes the response and waits for a successful ack from Kinesis before attempting the next big push.
I also started observing throttle errors in Kinesis:
Interestingly, my custom metric - which indicates whether the API failed to push any record - was always zero in CloudWatch. Through retries, KPL ensured that all my records made it.
My Lambda was also able to download 1 GB files from S3 and run. Per the Lambda documentation, /tmp storage is limited to 512 MB (which is where I assumed the file was being downloaded), but that is evidently not the case.