In the last post, we set up a database and tables for our S3 data in Athena and queried them through the AWS console. In this post, I will attempt the same thing through the Java SDK.
The post follows the Amazon sample code given in the AWS docs here.
To connect to Athena from Java, we use the AthenaClient:
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.AthenaClientBuilder;
// The request/response model classes used by the methods below come from
// software.amazon.awssdk.services.athena.model; the paginator comes from
// software.amazon.awssdk.services.athena.paginators.

private static AthenaClient athenaClient;

public static void main(String[] args) throws InterruptedException {
    // Build the client for the region, with credentials supplied explicitly.
    final AthenaClientBuilder builder = AthenaClient.builder()
            .region(Region.US_EAST_1)
            .credentialsProvider(
                    StaticCredentialsProvider.create(
                            AwsBasicCredentials.create(
                                    "AccessKey", "SecretKey")));
    athenaClient = builder.build();

    String queryExecutionId = startQueryExecution();
    completeQueryExecution(queryExecutionId);
    processQueryResult(queryExecutionId);
}
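A quick note on the credentials: hardcoding an access key and secret key as above is only for the demo. A minimal sketch of the same client wired to the SDK's default credential provider chain instead (region and client setup unchanged):

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.athena.AthenaClient;

// Resolve credentials from environment variables, Java system properties,
// the ~/.aws/credentials file, or an instance/container profile.
AthenaClient athenaClient = AthenaClient.builder()
        .region(Region.US_EAST_1)
        .credentialsProvider(DefaultCredentialsProvider.create())
        .build();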
The main method shows the three steps to be performed:
- Start a query execution on Athena.
- Wait for the execution to complete.
- Process the results of the query.
private static String startQueryExecution() {
    // The database and data catalog context in which the query execution occurs.
    QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
            .database("users_db")
            .build();

    // The result configuration specifies where the results of the query
    // should go in S3, and the encryption options.
    ResultConfiguration resultConfiguration = ResultConfiguration.builder()
            // You can provide encryption options for the output that is written.
            // .encryptionConfiguration(encryptionConfiguration)
            .outputLocation("s3://athena-query-output-locn")
            .build();

    // Create the StartQueryExecutionRequest to send to Athena, which will start the query.
    StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
            .queryString("SELECT * from users_partitioned LIMIT 5;")
            .queryExecutionContext(queryExecutionContext)
            .resultConfiguration(resultConfiguration)
            .build();

    // Start executing the query.
    StartQueryExecutionResponse startQueryExecutionResponse =
            athenaClient.startQueryExecution(startQueryExecutionRequest);
    return startQueryExecutionResponse.queryExecutionId();
}
The first step is to set up the QueryExecutionContext, which includes the name of the database I set up previously. Next comes the ResultConfiguration, which indicates where in S3 the output of the query should be stored; Athena writes the query results as CSV files to that location.
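The commented-out encryption line in the sample can be filled in with the SDK's EncryptionConfiguration model. A minimal sketch, assuming S3-managed keys (SSE_S3) are acceptable for the output, reusing the same output location as above:

import software.amazon.awssdk.services.athena.model.EncryptionConfiguration;
import software.amazon.awssdk.services.athena.model.EncryptionOption;
import software.amazon.awssdk.services.athena.model.ResultConfiguration;

// Ask Athena to encrypt the query output it writes to S3 using SSE-S3.
EncryptionConfiguration encryptionConfiguration = EncryptionConfiguration.builder()
        .encryptionOption(EncryptionOption.SSE_S3)
        .build();

ResultConfiguration resultConfiguration = ResultConfiguration.builder()
        .encryptionConfiguration(encryptionConfiguration)
        .outputLocation("s3://athena-query-output-locn")
        .build();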
Finally, we create a StartQueryExecutionRequest instance and execute the query. AthenaClient returns a query execution id that can be used to track the query execution.
Now we wait for the query to complete execution.
private static void completeQueryExecution(String queryExecutionId)
        throws InterruptedException {
    GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
            .queryExecutionId(queryExecutionId)
            .build();
    GetQueryExecutionResponse getQueryExecutionResponse;
    boolean isQueryStillRunning = true;
    int i = 0;
    while (isQueryStillRunning) {
        System.out.println("Checking if query is running. Check No " + i);
        getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
        String queryState = getQueryExecutionResponse.queryExecution()
                .status().state().toString();
        if (queryState.equals(QueryExecutionState.FAILED.toString())) {
            throw new RuntimeException("Query Failed to run with Error Message: "
                    + getQueryExecutionResponse.queryExecution().status().stateChangeReason());
        } else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
            throw new RuntimeException("Query was cancelled.");
        } else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
            isQueryStillRunning = false;
        } else {
            // Sleep an amount of time before retrying again.
            Thread.sleep(1000);
        }
        i++;
        System.out.println("Current Status is: " + queryState);
    }
}

The code uses the query execution id returned in the StartQueryExecutionResponse to check the status of the query execution, and simply sleeps until it receives a FAILED, CANCELLED, or SUCCEEDED status.
The last step is to process the response from the query execution:
private static void processQueryResult(String queryExecutionId) {
    GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
            // Max results can be set, but if it is not set,
            // Athena will choose the maximum page size.
            // As of the writing of this code, the maximum value is 1000.
            // .maxResults(1000)
            .queryExecutionId(queryExecutionId)
            .build();
    GetQueryResultsIterable getQueryResultsResults = athenaClient
            .getQueryResultsPaginator(getQueryResultsRequest);

    for (GetQueryResultsResponse result : getQueryResultsResults) {
        List<ColumnInfo> columnInfoList = result.resultSet()
                .resultSetMetadata().columnInfo();
        System.out.println("ColumnInfo received is " + columnInfoList);
        List<Row> results = result.resultSet().rows();
        processRow(results);
    }
}

private static void processRow(List<Row> rows) {
    // Write out the data.
    for (Row myRow : rows) {
        List<Datum> allData = myRow.data();
        System.out.println();
        for (Datum data : allData) {
            System.out.print(" " + data.varCharValue());
        }
    }
}

The code creates an instance of GetQueryResultsRequest with the queryExecutionId from the StartQueryExecutionResponse. The response is a GetQueryResultsIterable.
This class is an iterable of GetQueryResultsResponse that can be used to iterate through all the response pages of the operation. When the operation is called, an instance of this class is returned; at that point no service calls have been made yet, so there is no guarantee that the request is valid. As you iterate through the iterable, the SDK lazily loads response pages by making service calls until there are no pages left or your iteration stops. If there are errors in your request, you will see the failures only once you start iterating.
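Because the pages load lazily, the iterable also composes nicely with Java streams. A small sketch, assuming athenaClient and queryExecutionId are in scope, that flattens every page into a single list of rows:

import java.util.List;
import java.util.stream.Collectors;
import software.amazon.awssdk.services.athena.model.GetQueryResultsRequest;
import software.amazon.awssdk.services.athena.model.Row;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;

// No service calls are made until the stream below is consumed.
GetQueryResultsIterable pages = athenaClient.getQueryResultsPaginator(
        GetQueryResultsRequest.builder().queryExecutionId(queryExecutionId).build());

List<Row> allRows = pages.stream()
        .flatMap(page -> page.resultSet().rows().stream())
        .collect(Collectors.toList());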
Athena places the result of the query in the configured S3 location. Each page of the result set includes a list of ColumnInfo and a list of Row: ColumnInfo holds information about the columns in a query execution result, while Row represents a single record of the result.
I executed my code and the output is shown below. Note that Athena returns the column headers as the first row of the result set, which is why they appear before the data rows:
Checking if query is running. Check No 0
Current Status is: QUEUED
Checking if query is running. Check No 1
Current Status is: RUNNING
Checking if query is running. Check No 2
Current Status is: RUNNING
Checking if query is running. Check No 3
Current Status is: SUCCEEDED
ColumnInfo received is [
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=user_id, Label=user_id, Type=integer, Precision=10, Scale=0, Nullable=UNKNOWN, CaseSensitive=false),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=name, Label=name, Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN, CaseSensitive=true),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=phone_no, Label=phone_no, Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN, CaseSensitive=true),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=age, Label=age, Type=integer, Precision=10, Scale=0, Nullable=UNKNOWN, CaseSensitive=false),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=hobbies, Label=hobbies, Type=array, Precision=0, Scale=0, Nullable=UNKNOWN, CaseSensitive=false),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=state, Label=state, Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN, CaseSensitive=true),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=country, Label=country, Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN, CaseSensitive=true),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=dataset_date, Label=dataset_date, Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN, CaseSensitive=true)]

 user_id name phone_no age hobbies state country dataset_date
 4358786 UserName_4358786 phNo4358786 48 [] OR US 2020-05-28
 4358787 UserName_4358787 phNo4358787 54 [] TE US 2020-05-28
 4358788 UserName_4358788 phNo4358788 27 [piano, singing, painting, video games] CA US 2020-05-28
 4358789 UserName_4358789 phNo4358789 34 [] OK US 2020-05-28
 4358790 UserName_4358790 phNo4358790 25 [singing] NM US 2020-05-28

Process finished with exit code 0

The S3 bucket had two files: a CSV file with the 5 rows, and a metadata file with information about the columns.