How to Query Athena from a Spring Boot application?

In the last post, we saw how to query data from S3 using Amazon Athena in the AWS Console. But querying from the Console is very limited: we can’t do much with the results, and we can’t sit in front of the console all day running queries manually every time we want to analyse the data. We need to automate the process, and what better way to do that than writing a piece of code? So in this post, we’ll see how to use the AWS Java SDK in a Spring Boot application to query the same sample data set from the previous post. We’ll then log the results to the console to make sure we’re getting the right data.


The Dependencies

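Before we get to the code, let’s first get our dependencies right. I did the painstaking task of finding the right dependencies for this POC. All of them are available on Maven Central, so we don’t even have to deal with third-party repositories. One thing to note: the last entry, com.amazonaws:aws-java-sdk, is the complete version 1 SDK, while the snippets shown in this post only use classes from the version 2 artifacts (software.amazon.awssdk). The complete list of dependencies is here: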

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>auth</artifactId>
    <version>2.8.3</version>
</dependency>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>athena</artifactId>
    <version>2.8.3</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.627</version>
</dependency>

The Configuration

Next comes the configuration. There are a few things we need to set up before we can get started with the code.

First, in App.java, we have two constants that we need to change:

private static final String ATHENA_DATABASE = "athenatest";
private static final String ATHENA_OUTPUT_S3_FOLDER_PATH = "s3://athena-poc-contactsunny/";

Change these values so that you don’t use the hypothetical values I’ve given here. Next, in the AthenaClientFactory.java class, change the region to reflect your AWS setup. Otherwise, the integration will not work.

Next, we have to configure the credentials. In this example, I’m using the EnvironmentVariableCredentialsProvider credentials provider from the AWS SDK, which reads the credentials from environment variables. Set the following environment variables, but change the values, of course:

AWS_ACCESS_KEY_ID=abcdefg
AWS_SECRET_ACCESS_KEY=hijklmn

That’s all the configuration we need.
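If either variable is missing, the SDK only complains once the client tries to resolve credentials, so it can help to check up front. Here is a minimal sketch of such a check. The class CredentialsEnvCheck and its method are hypothetical helpers of mine, not part of the project or the SDK; only the two variable names come from the configuration above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of the original project or the AWS SDK). */
final class CredentialsEnvCheck {

    // The two variables set in the configuration step above.
    static final List<String> REQUIRED =
            Arrays.asList("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY");

    /** Returns the names of required variables that are absent or blank in the given environment. */
    static List<String> missingVariables(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Check the real process environment and report what is missing, if anything.
        List<String> missing = missingVariables(System.getenv());
        if (missing.isEmpty()) {
            System.out.println("AWS credentials found in the environment.");
        } else {
            System.err.println("Missing environment variables: " + missing);
        }
    }
}
```

Passing the environment in as a Map keeps the check easy to test; in the application you would call missingVariables(System.getenv()) before creating the client.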

The Code

The fun part begins now! First, let’s create the AthenaClientFactory class which will be responsible for providing us with an instance of the AthenaClient class. This object will be our SDK client. The code for this is fairly simple:

class AthenaClientFactory {

    private final AthenaClientBuilder builder = AthenaClient.builder()
            .region(Region.US_EAST_2)
            .credentialsProvider(EnvironmentVariableCredentialsProvider.create());

    AthenaClient createClient() {
        return builder.build();
    }
}

We’ll forget about this for now. Moving on, in our main() method, let’s create an AthenaClient using the factory we just created:

AthenaClientFactory factory = new AthenaClientFactory();
AthenaClient athenaClient = factory.createClient();

For this example, we’ll use a very simple select * query. We’ll use the AthenaClient we just created:

// Declared at class level: a local variable cannot be private or static.
private static final String SIMPLE_ATHENA_QUERY = "select * from sampledata limit 10;";

private static String submitAthenaQuery(AthenaClient athenaClient) {

    QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
            .database(ATHENA_DATABASE).build();

    ResultConfiguration resultConfiguration = ResultConfiguration.builder()
            .outputLocation(ATHENA_OUTPUT_S3_FOLDER_PATH).build();

    StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
            .queryString(SIMPLE_ATHENA_QUERY)
            .queryExecutionContext(queryExecutionContext)
            .resultConfiguration(resultConfiguration).build();

    StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest);

    return startQueryExecutionResponse.queryExecutionId();
}

As you can see from the code snippet, the method returns an execution ID, which we can use to track the progress of the query operation. The query operation can either succeed, or be cancelled, or fail. We have to monitor the operation until it reaches one of these states. We can do that using the method below:

private static void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId) throws InterruptedException {

    GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
            .queryExecutionId(queryExecutionId).build();

    GetQueryExecutionResponse getQueryExecutionResponse;

    boolean isQueryStillRunning = true;

    while (isQueryStillRunning) {
        getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
        String queryState = getQueryExecutionResponse.queryExecution().status().state().toString();

        if (queryState.equals(QueryExecutionState.FAILED.toString())) {
            throw new RuntimeException("Query Failed to run with Error Message: " + getQueryExecutionResponse
                    .queryExecution().status().stateChangeReason());
        } else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
            throw new RuntimeException("Query was cancelled.");
        } else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
            isQueryStillRunning = false;
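        } else {
            // SLEEP_AMOUNT_IN_MS is assumed to be a long constant defined on the
            // enclosing class, next to ATHENA_DATABASE (for example, 1000).
            Thread.sleep(SLEEP_AMOUNT_IN_MS);
        }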

        logger.info("Current Status is: " + queryState);
    }
}
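One thing to keep in mind is that the loop above polls for as long as the query is queued or running, so a stuck query keeps the thread waiting. Below is a rough sketch of the same loop with an upper bound on the number of polls. To keep it runnable without AWS access, the Athena call is replaced by a Supplier<String> that returns the state name; BoundedPoller, waitForSuccess and the maxAttempts parameter are hypothetical names of mine, not part of the SDK.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

/** Hypothetical stand-in for the polling loop; the Supplier replaces the getQueryExecution call. */
final class BoundedPoller {

    /**
     * Polls until the state is SUCCEEDED and returns the number of polls it took.
     * Throws on FAILED or CANCELLED, and gives up after maxAttempts polls.
     */
    static int waitForSuccess(Supplier<String> stateSupplier, int maxAttempts, long sleepMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String state = stateSupplier.get();
            switch (state) {
                case "SUCCEEDED":
                    return attempt;
                case "FAILED":
                case "CANCELLED":
                    throw new IllegalStateException("Query ended in state " + state);
                default:
                    // QUEUED or RUNNING: wait before asking again.
                    try {
                        Thread.sleep(sleepMs);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt flag
                        throw new IllegalStateException("Interrupted while waiting for the query", e);
                    }
            }
        }
        throw new IllegalStateException("Query did not finish after " + maxAttempts + " polls");
    }

    public static void main(String[] args) {
        // Simulated state sequence; a real caller would ask Athena for the state here.
        Iterator<String> states = Arrays.asList("QUEUED", "RUNNING", "SUCCEEDED").iterator();
        int polls = waitForSuccess(states::next, 10, 10L);
        System.out.println("Finished after " + polls + " polls");
    }
}
```

In the real method, the supplier would wrap the getQueryExecution call and return the state as a string, and the limit would be chosen based on how long your queries normally take.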

Hoping that our query succeeded and didn’t get cancelled or fail, we’ll move on to see how we can actually use the data returned by the query operation. As an example, we’ll just log each row we got to the console:

private static void processResultRows(AthenaClient athenaClient, String queryExecutionId) {

    GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
            .queryExecutionId(queryExecutionId).build();

    GetQueryResultsIterable queryResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);

    // The paginator fetches one page of results per iteration.
    for (GetQueryResultsResponse page : queryResults) {
        List<ColumnInfo> columnInfoList = page.resultSet().resultSetMetadata().columnInfo();

        List<Row> rows = page.resultSet().rows();
        logger.info("Result size: " + rows.size());

        processRow(rows, columnInfoList);
    }
}

private static void processRow(List<Row> rowList, List<ColumnInfo> columnInfoList) {

    List<String> columns = new ArrayList<>();

    for (ColumnInfo columnInfo : columnInfoList) {
        columns.add(columnInfo.name());
    }

    for (Row row: rowList) {
        int index = 0;

        for (Datum datum : row.data()) {
            logger.info(columns.get(index) + ": " + datum.varCharValue());
            index++;
        }

        logger.info("===================================");
    }
}
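Logging is fine for a POC, but for further processing you will probably want each row as a record keyed by column name. Also, for a select query, Athena typically returns the column headers as the first row of the first page, so the logging code above prints them as if they were data. Here is a small sketch of both ideas. It uses plain lists of strings as a stand-in for the SDK’s Row and Datum types so that it runs without AWS access; RowMapper, toRecords and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; List<String> stands in for a Row's list of Datum.varCharValue() values. */
final class RowMapper {

    /**
     * Turns rows into records keyed by column name.
     * When skipHeader is true, the first row is dropped (use this for the first page only).
     */
    static List<Map<String, String>> toRecords(
            List<String> columns, List<List<String>> rows, boolean skipHeader) {
        List<Map<String, String>> records = new ArrayList<>();
        int start = skipHeader ? 1 : 0;
        for (int r = start; r < rows.size(); r++) {
            List<String> row = rows.get(r);
            // LinkedHashMap keeps the column order and allows null values (SQL NULL).
            Map<String, String> record = new LinkedHashMap<>();
            for (int c = 0; c < columns.size() && c < row.size(); c++) {
                record.put(columns.get(c), row.get(c));
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        // Made-up sample data: a header row followed by two data rows.
        List<String> columns = Arrays.asList("id", "name");
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("id", "name"),
                Arrays.asList("1", "alice"),
                Arrays.asList("2", null));
        for (Map<String, String> record : toRecords(columns, rows, true)) {
            System.out.println(record);
        }
    }
}
```

With the real SDK types, the inner list would be built from row.data() by calling varCharValue() on each Datum, and skipHeader would be true only for the first page returned by the paginator.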

And that’s it. We can now query data stored in S3, through a table configured in Amazon Athena, from the Java program we just wrote. This could be used to run complex aggregation or filter queries and to fetch only a subset of the data, maybe for further processing in something like Apache Spark.

If you’re interested in looking at the complete Spring Boot project, head over to my GitHub repository.
