
Thursday, 4 June 2020

Athena through API

In the last post, we set up a database and tables for our S3 data in Athena and queried them through the AWS console. In this post, I will attempt the same thing through the Java SDK.
The post follows the Amazon sample code given in the AWS docs here.
To connect to Athena from Java, we use the AthenaClient.
    private static AthenaClient athenaClient;

    public static void main(String[] args) throws InterruptedException {
        final AthenaClientBuilder builder = AthenaClient.builder()
                .region(Region.US_EAST_1)
                .credentialsProvider(
                        StaticCredentialsProvider.create(
                                AwsBasicCredentials.create(
                                        "AccessKey",
                                        "SecretKey")));

        athenaClient = builder.build();
        String queryExecutionId = startQueryExecution();
        completeQueryExecution(queryExecutionId);
        processQueryResult(queryExecutionId);
    }
}
The main method shows the three steps to be performed:
  1. Start a query execution on Athena.
  2. Wait for the execution to complete.
  3. Process the results of the query.
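A quick side note on the credentials used when building the client above: hard-coding the access and secret key is fine for a quick test, but the SDK can also resolve credentials on its own. A minimal alternative sketch, assuming credentials are available to the default provider chain (environment variables, ~/.aws/credentials or an instance profile) and that DefaultCredentialsProvider from software.amazon.awssdk.auth.credentials is on the classpath:
    // Sketch: let the SDK resolve credentials from the default provider chain
    // instead of hard-coding an access key and secret key.
    athenaClient = AthenaClient.builder()
            .region(Region.US_EAST_1)
            .credentialsProvider(DefaultCredentialsProvider.create())
            .build();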
The first part was to execute the query. I am using the same dataset from the previous post:
    private static String startQueryExecution() {
        //The database and data catalog context in which the query execution occurs.
        QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
                .database("users_db")
                .build();

        // The result configuration specifies where the results of the query
        // should go in S3 and encryption options
        ResultConfiguration resultConfiguration = ResultConfiguration.builder()
                // You can provide encryption options for the output that is written.
                // .withEncryptionConfiguration(encryptionConfiguration)
                .outputLocation("s3://athena-query-output-locn")
                .build();

        // Create the StartQueryExecutionRequest to send to Athena which will start the query.
        StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
                .queryString("SELECT * from users_partitioned LIMIT 5;")
                .queryExecutionContext(queryExecutionContext)
                .resultConfiguration(resultConfiguration)
                .build();

        //Start executing the query
        StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest);
        return startQueryExecutionResponse.queryExecutionId();
    }
The first step was to set up the QueryExecutionContext, which includes the name of the database I set up previously.
Next was creating the ResultConfiguration, which indicates where the output of the query is to be stored in S3. Athena writes the query result as CSV files to that location.
The last step was to create the StartQueryExecutionRequest instance and execute the query. The AthenaClient returns a query execution ID that can be used to track the query execution.

Now we wait for the query to complete execution.
    private static void completeQueryExecution(String queryExecutionId) 
              throws InterruptedException {
        GetQueryExecutionRequest getQueryExecutionRequest = 
                      GetQueryExecutionRequest.builder()
                          .queryExecutionId(queryExecutionId).build();
        GetQueryExecutionResponse getQueryExecutionResponse;
        boolean isQueryStillRunning = true;
        int i = 0;
        while (isQueryStillRunning) {
            System.out.println("Checking if query is running. Check No " + i);
            getQueryExecutionResponse = athenaClient.getQueryExecution(
                                getQueryExecutionRequest);
            String queryState = getQueryExecutionResponse.queryExecution()
                         .status().state().toString();
            if (queryState.equals(QueryExecutionState.FAILED.toString())) {
                throw new RuntimeException("Query Failed to run with Error " + 
                                 "Message: " + getQueryExecutionResponse
                        .queryExecution().status().stateChangeReason());
            } else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
                throw new RuntimeException("Query was cancelled.");
            } else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
                isQueryStillRunning = false;
            } else {
                // Sleep an amount of time before retrying again.
                Thread.sleep(1000);
            }
            i++;
            System.out.println("Current Status is: " + queryState);
        }
    }
The code uses the query execution ID returned in the StartQueryExecutionResponse to check the status of the query execution. It simply polls, sleeping between checks, until it receives one of the FAILED, CANCELLED or SUCCEEDED states.
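Since state() already returns a QueryExecutionState enum, the same loop can compare enums directly instead of strings. An equivalent sketch of the polling logic (same fixed 1-second sleep as above; a production version would likely back off between checks):
    private static void completeQueryExecution(String queryExecutionId)
            throws InterruptedException {
        GetQueryExecutionRequest request = GetQueryExecutionRequest.builder()
                .queryExecutionId(queryExecutionId)
                .build();
        while (true) {
            // state() returns the QueryExecutionState enum directly
            QueryExecutionState state = athenaClient.getQueryExecution(request)
                    .queryExecution().status().state();
            System.out.println("Current Status is: " + state);
            if (state == QueryExecutionState.SUCCEEDED) {
                return;
            }
            if (state == QueryExecutionState.FAILED
                    || state == QueryExecutionState.CANCELLED) {
                throw new RuntimeException("Query ended in state " + state);
            }
            Thread.sleep(1000); // still QUEUED or RUNNING, poll again
        }
    }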
The last step is to process the response from query execution:
    private static void processQueryResult(String queryExecutionId) {
        GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
                // Max results can be set, but if it's not set,
                // it will choose the maximum page size.
                // As of the writing of this code, the maximum value is 1000.
                // .withMaxResults(1000)
                .queryExecutionId(queryExecutionId).build();

        GetQueryResultsIterable getQueryResultsResults = athenaClient
                .getQueryResultsPaginator(getQueryResultsRequest);

        for (GetQueryResultsResponse result : getQueryResultsResults) {
            List<ColumnInfo> columnInfoList = result.resultSet()
                                 .resultSetMetadata().columnInfo();
            System.out.println("ColumnInfo received is " + columnInfoList);
            List<Row> results = result.resultSet().rows();
            processRow(results);
        }
    }

    private static void processRow(List<Row> row) {

        //Write out the data
        for (Row myRow : row) {
            List<Datum> allData = myRow.data();
            System.out.println();
            for (Datum data : allData) {
                System.out.print("  " + data.varCharValue());
            }
        }
    }
The code creates an instance of GetQueryResultsRequest with the queryExecutionId from the StartQueryExecutionResponse. The response is a GetQueryResultsIterable.
This class is an iterable of GetQueryResultsResponse that can be used to iterate through all the response pages of the operation. When the operation is called, an instance of this class is returned. At this point, no service calls are made yet, so there is no guarantee that the request is valid. As you iterate through the iterable, the SDK lazily loads response pages by making service calls until there are no pages left or your iteration stops. If there are errors in your request, you will see the failures only after you start iterating through the iterable.
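Because the pages are fetched lazily, the paginator also works well with the Java Stream API (the SDK v2 paginators implement SdkIterable, which exposes stream()). A small sketch, reusing the getQueryResultsRequest built above, that flattens all pages into rows:
    // Sketch: lazily stream every page of results and print each row's raw data.
    athenaClient.getQueryResultsPaginator(getQueryResultsRequest)
            .stream()
            .flatMap(page -> page.resultSet().rows().stream())
            .forEach(row -> System.out.println(row.data()));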
Athena also places the result of the query in the S3 output location. Each page of the resultSet includes a list of ColumnInfo and a list of Row.
ColumnInfo holds information about the columns in a query execution result, while each Row represents one record from the result.
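One detail worth noting in the output below: for a SELECT query, the first Row of the first page contains the column labels themselves, which is why a header line is printed before the data. A hypothetical helper (not part of the post's code) that pairs each Datum with its ColumnInfo name and skips that header row could look like this:
    private static void printNamedRows(GetQueryResultsResponse page, boolean isFirstPage) {
        List<ColumnInfo> columns = page.resultSet().resultSetMetadata().columnInfo();
        List<Row> rows = page.resultSet().rows();
        // On the first page of a SELECT result, row 0 holds the column labels.
        int start = isFirstPage ? 1 : 0;
        for (int r = start; r < rows.size(); r++) {
            List<Datum> data = rows.get(r).data();
            for (int c = 0; c < columns.size(); c++) {
                System.out.print(columns.get(c).name() + "=" + data.get(c).varCharValue() + "  ");
            }
            System.out.println();
        }
    }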

I executed my code and the output is as below:
Checking if query is running. Check No 0
Current Status is: QUEUED
Checking if query is running. Check No 1
Current Status is: RUNNING
Checking if query is running. Check No 2
Current Status is: RUNNING
Checking if query is running. Check No 3
Current Status is: SUCCEEDED
ColumnInfo received is [
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, 
Name=user_id, Label=user_id, Type=integer, Precision=10, Scale=0, 
Nullable=UNKNOWN, CaseSensitive=false), 
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=name, Label=name, 
Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN, CaseSensitive=true),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=phone_no, 
Label=phone_no, Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN, 
CaseSensitive=true), 
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, 
Name=age, Label=age, Type=integer, Precision=10, Scale=0, Nullable=UNKNOWN, 
CaseSensitive=false), 
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=hobbies, Label=hobbies,
 Type=array, Precision=0, Scale=0, Nullable=UNKNOWN, CaseSensitive=false), 
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=state, Label=state, 
Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN, CaseSensitive=true),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=country, Label=country,
Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN, CaseSensitive=true),
ColumnInfo(CatalogName=hive, SchemaName=, TableName=, Name=dataset_date, 
Label=dataset_date, Type=varchar, Precision=2147483647, Scale=0, Nullable=UNKNOWN,
CaseSensitive=true)]

  user_id  name  phone_no  age  hobbies  state  country  dataset_date
  4358786  UserName_4358786  phNo4358786  48  []  OR  US  2020-05-28
  4358787  UserName_4358787  phNo4358787  54  []  TE  US  2020-05-28
  4358788  UserName_4358788  phNo4358788  27  [piano, singing, painting, video games]  CA  US  2020-05-28
  4358789  UserName_4358789  phNo4358789  34  []  OK  US  2020-05-28
  4358790  UserName_4358790  phNo4358790  25  [singing]  NM  US  2020-05-28
Process finished with exit code 0
The S3 bucket had two files: a CSV with the 5 rows, and a metadata file with information about the columns.
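Since the result also lives in S3, another option is to skip GetQueryResults entirely and read that CSV directly. Athena names the result file after the query execution ID under the configured output location, so a rough sketch with the S3 client (S3Client and GetObjectRequest from the S3 module of the SDK) could look like the following; the bucket name is taken from the ResultConfiguration above, and the key format is an assumption to verify against your own output location:
    // Sketch: read the query result CSV straight from the S3 output location.
    S3Client s3 = S3Client.builder().region(Region.US_EAST_1).build();
    String csv = s3.getObjectAsBytes(GetObjectRequest.builder()
                    .bucket("athena-query-output-locn")   // bucket used in the ResultConfiguration
                    .key(queryExecutionId + ".csv")       // result file is named after the execution ID
                    .build())
            .asUtf8String();
    System.out.println(csv);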
