How to Query Athena from a Spring Boot Application

Data Science / Tech · by Sunny Srinidhi · September 25, 2019 (updated March 3, 2020)

In the last post, we saw how to query data from S3 using Amazon Athena in the AWS Console. But querying from the Console itself is very limited. We can’t really do much with the data, and any time we want to analyse it, we can’t sit in front of the console the whole day running queries manually. We need to automate the process, and what better way to do that than writing a piece of code? So in this post, we’ll see how we can use the AWS Java SDK in a Spring Boot application to query the same sample data set from the previous post. We’ll then log the results to the console to make sure we’re getting the right data.

The Dependencies

Before we get to the code, let’s first get our dependencies right. I did the painstaking task of finding the right dependencies for this POC. All of them are available on Maven Central, so we don’t even have to deal with third-party repositories. The complete list of dependencies is here:

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>auth</artifactId>
    <version>2.8.3</version>
</dependency>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>athena</artifactId>
    <version>2.8.3</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.627</version>
</dependency>
```

The Configuration

The obvious next section is the configuration.
There are a few pieces of configuration that we need to take care of before we can get started with the code.

First, in App.java, there are two constants that we need to change:

```java
private static final String ATHENA_DATABASE = "athenatest";
private static final String ATHENA_OUTPUT_S3_FOLDER_PATH = "s3://athena-poc-contactsunny/";
```

Change these values so that you don’t use the hypothetical values I’ve given here. Next, in the AthenaClientFactory.java class, change the region to reflect your AWS setup; otherwise, the integration will not work.

Next, we have to configure the credentials. In this example, I’m using the EnvironmentVariableCredentialsProvider credentials provider from the AWS SDK, so we have to save our credentials in the environment. Set the following environment variables, changing the values, of course:

```
AWS_ACCESS_KEY_ID=abcdefg
AWS_SECRET_ACCESS_KEY=hijklmn
```

That’s all the configuration we need.

The Code

The fun part begins now! First, let’s create the AthenaClientFactory class, which will be responsible for providing us with an instance of the AthenaClient class. This object will be our SDK client. The code for this is fairly simple:

```java
class AthenaClientFactory {

    private final AthenaClientBuilder builder = AthenaClient.builder()
            .region(Region.US_EAST_2)
            .credentialsProvider(EnvironmentVariableCredentialsProvider.create());

    AthenaClient createClient() {
        return builder.build();
    }
}
```

Moving on, in our main() method, let’s create an AthenaClient using the factory we just created:

```java
AthenaClientFactory factory = new AthenaClientFactory();
AthenaClient athenaClient = factory.createClient();
```

For this example, we’ll use a very simple select * query.
We’ll use the AthenaClient we just created. Note that the query string has to be a class-level constant (it cannot be declared with private static final inside the method):

```java
private static final String SIMPLE_ATHENA_QUERY = "select * from sampledata limit 10;";

private static String submitAthenaQuery(AthenaClient athenaClient) {

    QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
            .database(ATHENA_DATABASE).build();

    ResultConfiguration resultConfiguration = ResultConfiguration.builder()
            .outputLocation(ATHENA_OUTPUT_S3_FOLDER_PATH).build();

    StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
            .queryString(SIMPLE_ATHENA_QUERY)
            .queryExecutionContext(queryExecutionContext)
            .resultConfiguration(resultConfiguration).build();

    StartQueryExecutionResponse startQueryExecutionResponse =
            athenaClient.startQueryExecution(startQueryExecutionRequest);

    return startQueryExecutionResponse.queryExecutionId();
}
```

As you can see from the code snippet, the method returns an execution ID, which we can use to track the progress of the query operation. The query operation can succeed, be cancelled, or fail, and we have to monitor it until it reaches one of these terminal states.
We can do that using the method below:

```java
private static void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId)
        throws InterruptedException {

    GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
            .queryExecutionId(queryExecutionId).build();

    GetQueryExecutionResponse getQueryExecutionResponse;
    boolean isQueryStillRunning = true;

    while (isQueryStillRunning) {
        getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
        String queryState = getQueryExecutionResponse.queryExecution().status().state().toString();

        if (queryState.equals(QueryExecutionState.FAILED.toString())) {
            throw new RuntimeException("Query Failed to run with Error Message: "
                    + getQueryExecutionResponse.queryExecution().status().stateChangeReason());
        } else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
            throw new RuntimeException("Query was cancelled.");
        } else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
            isQueryStillRunning = false;
        } else {
            Thread.sleep(SLEEP_AMOUNT_IN_MS);
        }

        logger.info("Current Status is: " + queryState);
    }
}
```

Hoping that our query succeeded and didn’t get cancelled or fail, we’ll move on to see how we can actually use the data returned by the query operation.
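The wait-and-check pattern above can be exercised without touching AWS at all. Below is a minimal, self-contained sketch of the same polling logic; the QueryPoller class and pollUntilDone method are my own hypothetical names (not part of the SDK), and the plain state strings stand in for Athena’s QueryExecutionState values:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper, not part of the AWS SDK: polls a state supplier
// until it reports a terminal state, sleeping between attempts.
public class QueryPoller {

    // Returns "SUCCEEDED", or throws if the query failed or was cancelled.
    public static String pollUntilDone(Supplier<String> stateSupplier, long sleepMs) {
        while (true) {
            String state = stateSupplier.get();
            if (state.equals("FAILED") || state.equals("CANCELLED")) {
                throw new RuntimeException("Query ended in state: " + state);
            }
            if (state.equals("SUCCEEDED")) {
                return state;
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulate a query that is queued, then running, then succeeds.
        Iterator<String> states = List.of("QUEUED", "RUNNING", "SUCCEEDED").iterator();
        System.out.println(pollUntilDone(states::next, 10L)); // prints SUCCEEDED
    }
}
```

The same idea could be extended with a timeout or exponential backoff, so that a long-running query doesn’t hammer the GetQueryExecution API.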
As an example, we’ll just log each row we got to the console:

```java
private static void processResultRows(AthenaClient athenaClient, String queryExecutionId) {

    GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
            .queryExecutionId(queryExecutionId).build();

    GetQueryResultsIterable getQueryResultsResults =
            athenaClient.getQueryResultsPaginator(getQueryResultsRequest);

    for (GetQueryResultsResponse result : getQueryResultsResults) {
        List<ColumnInfo> columnInfoList = result.resultSet().resultSetMetadata().columnInfo();

        int resultSize = result.resultSet().rows().size();
        logger.info("Result size: " + resultSize);

        List<Row> results = result.resultSet().rows();
        processRow(results, columnInfoList);
    }
}

private static void processRow(List<Row> rowList, List<ColumnInfo> columnInfoList) {

    List<String> columns = new ArrayList<>();

    for (ColumnInfo columnInfo : columnInfoList) {
        columns.add(columnInfo.name());
    }

    for (Row row : rowList) {
        int index = 0;

        for (Datum datum : row.data()) {
            logger.info(columns.get(index) + ": " + datum.varCharValue());
            index++;
        }

        logger.info("===================================");
    }
}
```

And that’s it. We are now able to query data present in S3, using a table configured in Amazon Athena, from a Java program we just wrote. This could be used to run complex aggregation or filter queries and fetch only a subset of the data, perhaps for further processing in something like Apache Spark.

If you’re interested in looking at the complete Spring Boot project, head over to my Github repository.
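The index-based pairing of column names and values done in processRow() is easy to get wrong, so here is a small, SDK-free sketch of the same idea; RowMapper and toMap are hypothetical names of my own, with plain strings standing in for the SDK’s Row and Datum types:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrates the column/value pairing done in processRow(), using plain
// lists instead of the AWS SDK's Row and Datum types.
public class RowMapper {

    // Pairs column names with values by position, preserving column order.
    public static Map<String, String> toMap(List<String> columns, List<String> values) {
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> columns = List.of("id", "name");
        List<String> values = List.of("1", "alice");
        System.out.println(toMap(columns, values)); // prints {id=1, name=alice}
    }
}
```

Turning each row into a map like this (rather than logging positionally) is a convenient stepping stone if you later want to serialize the results to JSON or hand them to another system.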
Thanks Sunny. Can I have your opinion on something? My team is now considering pivoting from Postgres to Athena for the entire read model of two microservices that serve data to frontend user-facing applications. That means multiple concurrent users clicking on various dashboards and graphs in real time. I am concerned about performance and concurrency here. What do you think?
I wouldn’t use Athena for powering any real-time APIs. The latency could become a deal breaker as the load increases. I would reconsider this.
Hi Sunny, could you please help me with downloading 40GB+ of data from S3/Athena in Java? It was showing this error:

2024-06-25 18:16:25.447 WARN 509917 — [pool-6-thread-1] c.a.s.s.internal.S3AbortableInputStream : Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.

```java
private static int saveResultFile(S3Object s3Object, String outputFile, List<ColumnInfo> columnInfoList) {
    System.out.println("This is the save file method.");
    System.out.println(outputFile + " this is output");

    String lastTwoDigits = outputFile.substring(outputFile.length() - 2);
    boolean hasData = false; // Initialize to false
    int totalSum = 0; // Initialize totalSum before the loop

    // Set the maximum records per page
    int maxRecordsPerPage = 1000000;

    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8));
         ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream(outputFile + ".zip"))) {

        CSVReader csvReader = new CSVReader(reader);
        String[] header = csvReader.readNext();

        if (header == null) {
            // No header found
            System.out.println("No header found in the CSV file.");
            return NO_HEADERS_FOUND;
        }

        int recordsProcessed = 0;
        int pageNumber = 1;
        CSVWriter writer = null;

        try {
            writer = new CSVWriter(new FileWriter(outputFile + "_" + pageNumber + ".csv"),
                    ',', CSVWriter.DEFAULT_QUOTE_CHARACTER);
            // Write the header for the current page
            writer.writeNext(header);

            String[] line;
            while ((line = csvReader.readNext()) != null) {
                hasData = true; // Set to true if any data is processed

                if (lastTwoDigits.equals("02")) {
                    // Calculate sum for sms_split column
                    if (line.length > 8) {
                        try {
                            totalSum += Integer.parseInt(line[8]); // Assuming sms_split is an integer
                        } catch (NumberFormatException e) {
                            // Handle parsing error if necessary
                            e.printStackTrace();
                        }
                    }
                }

                int lastColumnIndex = line.length - 1;
                String lastColumnName = header[lastColumnIndex];

                if (isTextColumn(lastColumnName)) {
                    // Perform encryption/decryption operations only if it's a text column
                    String encryptedValue = line[lastColumnIndex];
                    String decryptedText = decrypt(encryptedValue);
                    if (isHexString(decryptedText)) {
                        decryptedText = hexToAscii(decryptedText);
                    }
                    line[lastColumnIndex] = decryptedText;
                }

                if (recordsProcessed >= maxRecordsPerPage) {
                    // Close the current writer and create a new one for the next page
                    writer.close();
                    pageNumber++;
                    recordsProcessed = 0;
                    writer = new CSVWriter(new FileWriter(outputFile + "_" + pageNumber + ".csv"),
                            ',', CSVWriter.DEFAULT_QUOTE_CHARACTER);
                    // Write the header for the new page
                    writer.writeNext(header);
                }

                writer.writeNext(line);
                recordsProcessed++;
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("Error creating ZIP file: " + outputFile + ".zip");
        } finally {
            if (writer != null) {
                writer.close();
            }
        }

        if (hasData) {
            for (int i = 1; i <= pageNumber; i++) {
                String pageFileName = outputFile + "_" + i + ".csv";
                try (FileInputStream inputStream = new FileInputStream(pageFileName)) {
                    zipOutputStream.putNextEntry(new ZipEntry(new File(pageFileName).getName()));
                    byte[] bytes = new byte[1024];
                    int length;
                    while ((length = inputStream.read(bytes)) >= 0) {
                        zipOutputStream.write(bytes, 0, length);
                    }
                    zipOutputStream.closeEntry();
                    if (new File(pageFileName).delete()) {
                        // System.out.println("Deleted file: " + pageFileName);
                    } else {
                        System.out.println("Failed to delete file: " + pageFileName);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("Error processing file: " + pageFileName);
                }
            }

            System.out.println("Result files saved successfully with decryption.");
            System.out.println("Total sum of sms_split column: " + totalSum);
        } else {
            System.out.println("No data found to save.");
            return NO_HEADERS_FOUND;
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if (s3Object != null) {
                s3Object.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    return totalSum;
} // this is my code snippet
```