Receiving messages from Amazon SQS in a Spring Boot application

Tech by Sunny Srinidhi - January 16, 2020, January 24, 2020

In this post, we’ll see how we can receive messages from an Amazon SQS queue in a Spring Boot application. This is a continuation of the previous post, where we talked about how we can send messages to an SQS queue. The obvious next step is receiving those messages, so in this post we’ll do just that.

If you don’t have an Amazon SQS queue created already, check out the previous post on how to do it. Here, I’ll assume that you already have that pipeline set up, so I’m going to skip that part and jump right into the code.

The Code

The first thing we need to add to our Spring Boot application is the Amazon SQS dependency. Fortunately, the dependency is available in the Maven Central repository, so we just add the following to the dependencies block in our pom.xml file:

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-aws-messaging</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>
```

Next, we have to take care of the properties. For this example, we’ll use the static credential provider for AWS, so we’ll need the access key and the secret key. I’ll assume that you already have these, so I’ll skip the instructions on how to set them up. In our application.properties file, we’ll have the following properties:

```properties
sqs.url=https://sqs.us-east-2.amazonaws.com/1234/abcd
aws.accessKey=
aws.secretKey=
aws.region=
```

Fill in the blanks in the properties file. We have to get these values into our Java code. Because we’re using Spring Boot, that is very easy to do with the @Value annotation:

```java
@Value("${sqs.url}")
private String sqsUrl;

@Value("${aws.accessKey}")
private String awsAccessKey;

@Value("${aws.secretKey}")
private String awsSecretKey;

@Value("${aws.region}")
private String awsRegion;
```

After this, we have to create an object for AWS credentials and one for Amazon SQS.
We can do that using the code snippet below. The region we read from the properties file is passed to the client builder as well:

```java
AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(awsAccessKey, awsSecretKey)
);

AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
        .withCredentials(awsCredentialsProvider)
        .withRegion(awsRegion) // region from application.properties, e.g. us-east-2
        .build();
```

Now we have to start reading messages. We do this in a while(true) {} loop, because we don’t want to stop listening for new messages; we have to keep polling continuously. But before we get into the loop, we have to create a request object that we reuse for every poll. In this request, we can specify the maximum number of messages we want in each response, and how long each call should wait for a message to arrive before returning an empty response (this is SQS long polling). This request object is an instance of the ReceiveMessageRequest class, and this is how we create it:

```java
final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsUrl)
        .withMaxNumberOfMessages(1)  // at most one message per response
        .withWaitTimeSeconds(3);     // wait up to 3 seconds for a message to arrive
```

Now we get into the loop. In the loop, we use the SQS client instance we created earlier to request messages. Once we do that, we get a list of messages in return. We can then move on to processing each message. The loop looks like this:

```java
while (true) {
    final List<Message> messages = amazonSQS.receiveMessage(receiveMessageRequest).getMessages();

    for (Message messageObject : messages) {
        String message = messageObject.getBody();
        logger.info("Received message: " + message);
    }
}
```

I’m keeping the processing simple here, just printing out the message body.

Having multiple message consumers for a single Amazon SQS queue

If you’re coming from the Apache Kafka realm, you’ll know that we can have multiple consumers for a given topic with the same group ID to parallelise the processing of messages in a topic. The obvious question here is, can we do the same with Amazon SQS?

The simple answer to that is yes, we can. However, there is no concept of group ID here, or anything similar to that.
If you have more than one service listening to the same SQS queue, the messages coming into that queue will be distributed across all such services. Normally, no two services will get the same message, although a standard queue only guarantees at-least-once delivery, so an occasional duplicate is possible. If you want to emulate Kafka’s consumers with multiple group IDs for the same topic, you need to use a combination of SNS (Amazon’s Simple Notification Service) and SQS. Maybe I’ll write another post about how to do that if you guys are interested.

Acknowledging a message in SQS

In Apache Kafka, once we consume a message from a topic and process it, we acknowledge that message by something called committing the offset. Once you do that, the message will not be delivered to you the next time you ask for one. Committing the offset means that you’re done with that message, and you’re telling the system not to send it to you again. Is it possible to do the same thing here?

The answer to this again is yes. There’s something called visibility timeout in the SQS world. Visibility timeout is the amount of time for which a message, once a consumer has received it, stays hidden from further receive requests. If the message isn’t deleted before that period ends, it becomes visible again, and the next query for messages can return it a second time. So the “committing offset” equivalent in SQS is to delete the message from the queue. In the for loop that we saw earlier in the code snippet, we can delete a message after reading it.
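To make the receive, hide, and delete cycle concrete, here is a minimal sketch of a toy in-memory queue. It is not the SQS client and makes no network calls: ToyVisibilityQueue, Received, and the explicit nowMillis clock argument are hypothetical names made up for this illustration, and the receipt handle is simplified to a plain integer. The only behaviour it models is that a received message is hidden for the visibility timeout, comes back if nobody deletes it, and is gone for good once deleted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Toy in-memory model of a queue with a visibility timeout.
// NOT the AWS SDK: every name here is hypothetical and exists only for illustration.
class ToyVisibilityQueue {

    // What a consumer gets back: a handle to delete with, plus the body.
    static final class Received {
        final int handle;
        final String body;

        Received(int handle, String body) {
            this.handle = handle;
            this.body = body;
        }
    }

    // A stored message and the earliest time at which it may be received.
    private static final class Entry {
        final String body;
        long visibleAtMillis = 0;

        Entry(String body) {
            this.body = body;
        }
    }

    private final Map<Integer, Entry> entries = new LinkedHashMap<>();
    private final long visibilityTimeoutMillis;
    private int nextHandle = 1;

    ToyVisibilityQueue(long visibilityTimeoutMillis) {
        this.visibilityTimeoutMillis = visibilityTimeoutMillis;
    }

    void send(String body) {
        entries.put(nextHandle++, new Entry(body));
    }

    // Returns the first visible message and hides it until now + timeout.
    Optional<Received> receive(long nowMillis) {
        for (Map.Entry<Integer, Entry> e : entries.entrySet()) {
            Entry entry = e.getValue();
            if (entry.visibleAtMillis <= nowMillis) {
                entry.visibleAtMillis = nowMillis + visibilityTimeoutMillis;
                return Optional.of(new Received(e.getKey(), entry.body));
            }
        }
        return Optional.empty();
    }

    // The "commit": once deleted, the message is never delivered again.
    void delete(int handle) {
        entries.remove(handle);
    }

    public static void main(String[] args) {
        ToyVisibilityQueue queue = new ToyVisibilityQueue(30_000);
        queue.send("hello");

        Received first = queue.receive(0).get();
        System.out.println(first.body);                          // hello
        System.out.println(queue.receive(10_000).isPresent());   // false: still hidden
        Received again = queue.receive(30_000).get();            // timeout elapsed, redelivered
        queue.delete(again.handle);
        System.out.println(queue.receive(100_000).isPresent());  // false: deleted
    }
}
```

This is also why it makes sense to delete a message only after it has been processed: if processing fails midway, the message reappears after the timeout and can be picked up again, whereas deleting it first would lose it.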
I wrote a simple method for deleting a message:

```java
private void deleteMessage(Message messageObject) {
    // The receipt handle identifies this particular delivery of the message
    final String messageReceiptHandle = messageObject.getReceiptHandle();
    amazonSQS.deleteMessage(new DeleteMessageRequest(sqsUrl, messageReceiptHandle));
}
```

We’ll change our loop in the previous code snippet to call this method once we process a message:

```java
while (true) {
    final List<Message> messages = amazonSQS.receiveMessage(receiveMessageRequest).getMessages();

    for (Message messageObject : messages) {
        String message = messageObject.getBody();
        logger.info("Received message: " + message);

        deleteMessage(messageObject);
    }
}
```

I have written the SQS-related code in a class called SQSUtil. This class is annotated as a @Component, so that I can inject it with @Autowired anywhere in the code base. The class looks like this:

```java
import java.util.List;

import javax.annotation.PostConstruct;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class SQSUtil {

    @Value("${sqs.url}")
    private String sqsUrl;

    @Value("${aws.accessKey}")
    private String awsAccessKey;

    @Value("${aws.secretKey}")
    private String awsSecretKey;

    @Value("${aws.region}")
    private String awsRegion;

    private AmazonSQS amazonSQS;

    private static final Logger logger = LoggerFactory.getLogger(SQSUtil.class);

    // Build the SQS client once the @Value fields have been injected
    @PostConstruct
    private void postConstructor() {
        logger.info("SQS URL: " + sqsUrl);

        AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(
                new BasicAWSCredentials(awsAccessKey, awsSecretKey)
        );

        this.amazonSQS = AmazonSQSClientBuilder.standard()
                .withCredentials(awsCredentialsProvider)
                .withRegion(awsRegion) // region from application.properties
                .build();
    }

    // Blocks the calling thread forever, polling the queue for new messages
    public void startListeningToMessages() {
        final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsUrl)
                .withMaxNumberOfMessages(1)
                .withWaitTimeSeconds(3);

        while (true) {
            final List<Message> messages = amazonSQS.receiveMessage(receiveMessageRequest).getMessages();

            for (Message messageObject : messages) {
                String message = messageObject.getBody();
                logger.info("Received message: " + message);

                // Delete only after processing, so the message is not delivered again
                deleteMessage(messageObject);
            }
        }
    }

    private void deleteMessage(Message messageObject) {
        final String messageReceiptHandle = messageObject.getReceiptHandle();
        amazonSQS.deleteMessage(new DeleteMessageRequest(sqsUrl, messageReceiptHandle));
    }
}
```

If you’re curious and want to see how different the code is for an Apache Kafka producer and consumer, check out this post, where I talk about the same. You can get the complete working project for this post on my GitHub repo.

And if you like what you see here, or on my Medium blog, and would like to see more of such helpful technical posts in the future, consider supporting me on Patreon.