Consuming from Amazon SQS with Spring Cloud AWS

Introduction

Event-driven architecture is one of the hottest topics in the software development community nowadays. Schedule- and batch-based approaches to data processing impose a number of limitations. If your system is deeply integrated with AWS services, you are sometimes limited to Amazon SQS as the message queuing service, for example when you need to trigger processing on uploads to an Amazon S3 bucket: one of the few native options is to automatically deliver such notifications to SQS.

In this article, we will set up a consumer of messages from an SQS queue using Spring Boot and Spring Cloud AWS.

Amazon SQS

Amazon SQS stands for Simple Queue Service, and it is a general-purpose messaging service with client libraries for various languages and frameworks. Before getting to the consumer implementation, let’s review some concepts of Amazon SQS.

Visibility

One of the most important concepts is message visibility. When a message is produced to an SQS queue, it becomes visible to all consumers that are subscribed to the queue. Only one consumer can take a specific message, and when it does, the message is not deleted from the queue but marked as invisible. This is done for two reasons:

  • So no other consumer will be able to get the message that is being processed.
  • To make sure that if the current consumer fails in the middle of processing, the message will not be lost and will become visible again after the visibility timeout passes.

The visibility timeout value is set on the queue level (default is 30 seconds) but can be overridden for each message by the consumer. After the visibility timeout elapses, the message becomes available for any free consumer to get again.

The consumer can freely change the visibility timeout of the message it acquired, as long as the total time the message remains invisible does not exceed the maximum of 12 hours. For more details, see the official documentation.
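To make this concrete, here is a minimal sketch using the plain AWS SDK for Java v1 (not yet Spring): it receives messages and changes their visibility timeout to 120 seconds. The queue URL is a placeholder, and credentials and region are assumed to come from the environment.

import java.util.List;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;

public class VisibilityExample {

  public static void main(String[] args) {
    // Placeholder queue URL; credentials and region are taken from the environment.
    final String queueUrl = "https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue";
    final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

    final List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();
    for (final Message message : messages) {
      // The receipt handle identifies this particular receive of the message;
      // here we give ourselves 120 more seconds before it becomes visible again.
      sqs.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 120);
    }
  }
}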

Dead Letter Queue

Every time the same message is received, a special counter called ReceiveCount is increased. If a message cannot be processed successfully, it will be retried forever. To avoid this, Amazon SQS introduces the concept of the DLQ, or Dead Letter Queue. One can set up a DLQ for an SQS queue with the “maxReceiveCount” parameter, which defines how many receives indicate that something is wrong with the message and it should be put aside, to the DLQ.
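As an illustration, a redrive policy can be attached to a queue through the SetQueueAttributes API. The sketch below assumes the AWS SDK for Java v1; the queue URL, the DLQ ARN, and the maxReceiveCount of 5 are placeholders to adapt to your setup.

import java.util.Map;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;

public class RedrivePolicyExample {

  public static void main(String[] args) {
    final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

    // Placeholder queue URL and DLQ ARN.
    final String queueUrl = "https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue";
    final String dlqArn = "arn:aws:sqs:eu-west-1:123456789012:my-queue-dlq";

    // After 5 unsuccessful receives the message is moved to the DLQ.
    final String redrivePolicy =
        "{\"deadLetterTargetArn\":\"" + dlqArn + "\",\"maxReceiveCount\":\"5\"}";

    sqs.setQueueAttributes(new SetQueueAttributesRequest()
        .withQueueUrl(queueUrl)
        .withAttributes(Map.of("RedrivePolicy", redrivePolicy)));
  }
}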

What to do with messages in the DLQ is up to the developer and is very domain-specific. One might manually investigate the reasons for failure or just set up alerts on the rate of messages produced to the DLQ.

Introduction to Spring Cloud AWS

If you plan to work with Amazon SQS from your JVM application, you have several technologies to choose from. Amazon provides a low-level SDK for calling the various SQS APIs: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs.html. It is more than enough if you are writing an SQS producer, as producing a message is just one API call. For consumers, it can get a little more complex.
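For example, a producer written against the plain SDK can be as small as this sketch (the queue URL and message body are placeholders):

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

public class ProducerExample {

  public static void main(String[] args) {
    final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    // A single sendMessage call is all it takes to produce a message.
    sqs.sendMessage("https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue", "Hello, SQS!");
  }
}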

If Spring Boot is your framework of choice and you plan to implement a consumer, you are in luck: the Spring Cloud AWS project implements a wrapper on top of the AWS Java SDK that makes it feel natural in a Spring project by providing bean-based configuration and a set of annotations to easily create SQS consumers.

Consuming messages

The Spring way is to separate configuration and logic. First, we will take a look at some of the beans we can define to configure our SQS consumer. Then, we will write a listener method that implements the processing logic.

Configuration

Let’s start by adding the dependencies. We use Gradle for managing dependencies in our project, but it will work just as well with Maven. Make sure to update to the latest version and check whether there are any changes in the API we use in this article.

implementation 'org.springframework.cloud:spring-cloud-aws-autoconfigure:2.2.4.RELEASE'
implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.4.RELEASE'

If you want, you can override the auto-configured AmazonSQSAsync bean to specify the AWS endpoint, AWS credentials, and some other settings. This can be useful if you want to use SQS mocks or specific credentials rather than the ones provided by the environment.

@Bean
@Primary
public AmazonSQSAsync amazonSqsAsync() {
  final var clientBuilder = AmazonSQSAsyncClientBuilder.standard();
  // Point the client at a custom endpoint (e.g. an SQS mock); ENDPOINT and REGION are placeholders.
  final var endpointConfiguration =
      new AwsClientBuilder.EndpointConfiguration("ENDPOINT", "REGION");
  clientBuilder.setEndpointConfiguration(endpointConfiguration);
  // Use static credentials instead of the ones provided by the environment.
  final AWSCredentials credentials = new BasicAWSCredentials("AWS_ACCESS_KEY", "AWS_SECRET");
  clientBuilder.setCredentials(new AWSStaticCredentialsProvider(credentials));
  return clientBuilder.build();
}

Then, we can configure a SimpleMessageListenerContainerFactory to set options such as auto-startup, the maximum number of messages to be processed in parallel, the default visibility timeout, and many others. These are mostly the same settings you set when you create an SQS queue, overridden at the consumer level. Refer to the Amazon SQS documentation to learn about each one of them in detail.

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
    AmazonSQSAsync amazonSqsAsync) {
  final var listenerContainerFactory = new SimpleMessageListenerContainerFactory();
  // Start polling the queue as soon as the application context is up.
  listenerContainerFactory.setAutoStartup(true);
  listenerContainerFactory.setAmazonSqs(amazonSqsAsync);
  // Retrieve at most one message per poll, i.e. process messages one by one.
  listenerContainerFactory.setMaxNumberOfMessages(1);
  return listenerContainerFactory;
}

In most cases, these two beans are enough to configure your SQS consumer.

SQS Listener

Now that we have all the necessary beans configured, we can create a component with a method annotated with @SqsListener. When Spring detects this annotation, it creates an SQS consumer that calls the method for each received message, passing the message and other metadata as arguments. This annotation-driven approach is closely related to aspect-oriented programming (AOP) and is widely used in the Spring Framework.

Let’s take a look at a simple listener method:

@SqsListener(value = "${sqs.queueName}",
    deletionPolicy = SqsMessageDeletionPolicy.NO_REDRIVE)
public void receiveMessage(String message) { ... }

There are two arguments we can pass to the @SqsListener annotation. The value defines the name (not the URL) of the SQS queue; you are also free to use Spring configuration syntax to inject the value from settings, as we do here with ${sqs.queueName}. The deletionPolicy defines the behavior for deleting the message from the queue. We discuss it in more detail in the “Implementing retries” section below.

The SQS message itself is passed as an argument to the method marked with the @SqsListener annotation. By default, the raw text of the message is passed, but you can set up a special QueueMessageHandlerFactory bean to offload deserialization of the message payload to the framework. You can see an example of such a setup for S3 notification objects here: MappingJackson2MessageConverter.
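As an illustration of such a setup, the sketch below wires a MappingJackson2MessageConverter (from spring-messaging) into the QueueMessageHandlerFactory (from Spring Cloud AWS) so that JSON payloads are deserialized by the framework; imports are omitted as in the other bean snippets, and the exact configuration may differ depending on your Spring Cloud AWS version.

@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
  final var factory = new QueueMessageHandlerFactory();
  // Deserialize JSON message bodies with Jackson.
  final var messageConverter = new MappingJackson2MessageConverter();
  // SQS messages usually arrive without a JSON content type header,
  // so do not insist on a strict content-type match.
  messageConverter.setStrictContentTypeMatch(false);
  // Resolve listener method arguments through the Jackson converter.
  factory.setArgumentResolvers(
      List.of(new PayloadMethodArgumentResolver(messageConverter)));
  return factory;
}

With this bean in place, the listener method can accept the deserialized object directly, for example receiveMessage(MyPayload payload), where MyPayload is a hypothetical class matching your message structure.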

One more piece of functionality that is out of the scope of this article is sending the result of processing to another SQS queue. It can be implemented natively with just one more annotation and a change of the method return type. Please refer to the official documentation for details.
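For the curious, this typically looks like the following sketch, using the @SendTo annotation from spring-messaging; the output queue name, the method name, and the toUpperCase() “processing” are placeholders.

@SqsListener(value = "${sqs.queueName}",
    deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
@SendTo("output-queue")
public String processAndForward(String message) {
  // The return value is produced as a new message to the "output-queue" queue.
  return message.toUpperCase();
}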

Implementing retries

As discussed above in the visibility section, a message is not deleted by SQS when it is received by the consumer; it is the responsibility of the consumer to delete it when processing is finished. The Spring Cloud AWS framework provides several predefined behaviors, selected with the deletionPolicy argument:

  • ALWAYS - the message is deleted when the execution of the method with the @SqsListener annotation finishes for any reason, whether it threw an exception or completed normally.
  • NEVER - never deletes the message automatically. This policy requires the injection of a special Acknowledgment argument, which allows manual control of the deletion process from the code (see the sketch after this list).
  • NO_REDRIVE - always deletes the message on normal completion; when an exception is thrown, the message is only deleted if there is no DLQ and redrive policy set up for the queue. If the DLQ and redrive policy are set up, the message is not deleted.
  • ON_SUCCESS - only deletes the message on normal completion, never when an exception is thrown.
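To make the NEVER policy concrete, here is a minimal sketch of manual acknowledgment; processMessage is a hypothetical placeholder for your own logic.

@SqsListener(value = "${sqs.queueName}",
    deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receiveMessage(String message, Acknowledgment acknowledgment) {
  processMessage(message); // hypothetical business logic
  // Explicitly delete the message from the queue only after processing succeeded.
  acknowledgment.acknowledge();
}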

The redrive policy, which is set at the queue level, defines how many times processing of a message will be attempted before it is moved to the DLQ. In production, it is recommended to set up a DLQ with a redrive policy reasonable for your use case. With such a setup, using NO_REDRIVE or ON_SUCCESS allows safe retries across distributed consumers of the SQS queue.

There is one problem with such an architecture, though. If the processing takes a long time, for example several hours, you would set the visibility timeout to a large value, probably the maximum of 12 hours. This means that when the processing method throws an exception, the message will not be retried for a bit less than 12 hours, which is often unacceptable. Here another optional argument, of type Visibility, comes to the rescue. Take a look at this code:

@SqsListener(value = "${sqs.queueName}",
    deletionPolicy = SqsMessageDeletionPolicy.NO_REDRIVE)
public void receiveMessage(String message, Visibility visibility) {
  try {
    ...
  } catch (Exception e) {
    // Shrink the visibility timeout to 60 seconds so the retry happens soon,
    // then rethrow so the message is not deleted.
    visibility.extend(60).get();
    throw e;
  }
}

Here, after an exception is caught, we set the visibility timeout to 60 seconds and rethrow the original exception. This way the message will remain invisible for only 60 seconds and will then be retried. Notice that even though the method is named extend(), it actually just sets the visibility timeout rather than adding to it.

Conclusion

Spring Cloud AWS is a good tool for building reliable and highly customizable SQS consumers with very little code. Unfortunately, the documentation is not very extensive, so I hope this article can help someone who has just started using AWS services from a Spring environment.
