0. 서론
특정한 이벤트가 있을 경우 지정한 시간동안 의도적으로 딜레이 후에 이벤트를 처리, 이벤트를 수신한대로 적용을 해야하는 케이스가 생겼습니다. 요구사항을 어떻게 처리할까 고민을 하다가, 이번에 SQS를 도입해서 적용을 해보면 좋겠다 라고 생각했습니다. 생각한 이유는 다음과 같습니다.
- FIFO(선입선출) 큐를 지원한다
- 지연 큐를 지원한다
- 인프라, 운영적인 측면 고려
위의 내용으로 인해 AWS SQS를 적용하게 되었습니다. 본 포스팅을 통해 SQS에 대해 간단히 알아보고, SQS를 코드에 적용해서 메시지를 발신 및 수신을 해보도록 하겠습니다.
1. SQS (Simple Queue Service) 란?
SQS(Simple Queue Service)는 AWS에서 제공하는 애플리케이션 간 메시지를 주고받을 수 있는 서비스입니다. 서비스하는 애플리케이션들을 보다 효율적으로 아키텍쳐들을 연결하는 요소들로 작동하기 위한 소프트웨어를 미들웨어 라고 하는데 SQS에서는 메시지 브로커에 있는 큐에 데이터를 보내고 받는 프로듀서와 컨슈머를 통해 메시지를 통신하는 구조로 되어 있습니다. SQS를 얻는 이점은 어플리케이션에서 데이터를 큐에 넣고 큐에 넣은 데이터 가져가는 어플리케이션을 구분해서 어플리케이션간 연결을 느슨하게 연결해주는 역할을 합니다.
2. SQS 구성
SQS 구성을 간단하게 설명해보겠습니다.
큐 (Queue)
- 메세지를 담는 공간입니다.
- 컴퓨터 자료형의 큐와 이름이 같지만, 선입선출(FIFO)를 보장하지는 않습니다
- 이는 설정에 따라 달라집니다!
메세지(Message)
- SQS 의 기본 데이터 단위입니다.
- 메세지는 XML, JSON과 같은 텍스트 형태입니다.
- 메세지마다 고유한 ID가 부여됩니다.
처리 실패 큐 (Dead Letter Queues)
- 보통 메시지를 보내고 Ack를 보내면 메시지를 삭제하는데, 메시지를 소비하는 컨슈머가 잘 처리하지 못했다는 NAck를 보낸다면 유저가 설정한 재시도 횟수만큼 재시도를 하게 됩니다.
- 재시도 횟수에도 계속 메시지를 처리하지 못했다면 해당 메시지를 기존 큐에서 처리 실패 큐로 전송을 하게 됩니다.
생산자(Producer)
- 데이터를 보내는 애플리케이션
소비자(consumer)
- 데이터를 받는 애플리케이션
지연 전송 (Delay Delivery)
- 특정 시간 동안 메세지를 지연 시키는 기능
짧은 폴링 (Short Polling)
- 메시지 받기 요청을 하면 결과를 바로 받습니다. 메시지가 있으면 메시지를 가져오고 없으면 그냥 빠져 나옵니다
긴 폴링 (Long Polling)
- 메시지가 있으면 바로 가져오고, 메시지가 없으면 메시지가 올 때까지 기다립니다. 메시지가 계속 오지 않으면 긴 폴링 제한 시간까지 기다립니다.
3. SQS 기본 동작 방식
SQS 기본 동작 방식
- 메시지 프로듀서는 SQS 큐에 전달할 메시지를 보냅니다.
- 큐에서는 메시지를 설정에 따라 일정 시간 가지고 있습니다. (소비 상태는 아닙니다)
- 메시지 소비자는 주기적으로 큐에 가져갈 메시지가 있는지 Polling하여, 소비해야할 메시지가 있다면 Pull 하여 가져갑니다.
- 메시지 컨슈머는 Ack를 보내 메시지 처리가 완료됨을 알릴 수 있습니다.
- SQS는 Ack를 수신하면 옵션에 따라 해당 메시지를 보유할지 삭제할 수 있습니다.
- 만약 NAck를 수신하면 DLQ로 메시지를 보냅니다.
4. AWS SQS 세팅
SQS를 사용하려면 AWS에서 SQS를 세팅해야합니다.
다음은 세팅하는 방법을 간단하게 알아보겠습니다.
- 유형
- 표준: 큐이지만 메시지 순서가 유지되지 않는 큐입니다
- FIFO: 자료구조에서 다루는 큐입니다. 선입선출 구조이고 무조건 순서를 보장합니다
- 이름
- 큐를 수신할 때 사용될 이름입니다
- 리드라이브 허용 정책
- 배달 못한 편지 대기열
- 앞서 설명한 DLQ입니다. 메시지를 소비할 수 없는 경우 사용되고, 왜 소비를 하지못했는지 원인은 알 수 있습니다.
- 컨슈머가 큐의 메시지를 Pulling하였지만 소비를 하지 못하는 경우 시도한 횟수가 누적되는데 설정해둔 최대치를 초과하는 경우 해당 메시지는 DLQ로 이동합니다
- 기존에 설정해둔 큐 보존기간과 최소는 하루 이상으로 길게 보존해둡니다.
5. Spring cloud 2.2.X 적용
Spring boot에 따라 알아두어야할 점이 있습니다. Spring boot 버젼에 따라 2.3 이상 이라면 sdk 버젼을 적용할 수 있습니다. 이번에 다뤄볼 버젼은 2.2.X에 적용 가능한 버젼을 적용하였습니다.
5-1. dependency 추가
dependencies {
implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.1.RELEASE'
implementation group: 'org.springframework.cloud', name: 'spring-cloud-starter-aws', version: '2.2.1.RELEASE'
}
SQS를 사용하기 위해 위의 dependency를 추가해줍니다.
5-2. application-local.yml 추가
cloud:
aws:
region:
static: ap-northeast-2
stack:
auto: false
credentials:
access-key: aaaaaaaaaaa
secret-key: bbbbbbbbbbb
sqs:
queue:
name: {생성한 큐 이름}.fifo # study-sqs.fifo
url: https://sqs.ap-northeast-2.amazonaws.com/{큐 번호}/{생성한 큐 이름}.fifo # 생성한 SQS 큐 url
- region: AWS 리전 정보
- sqs에는 생성한 큐와 url을 등록해줍니다.
- 저는 AWS에 등록한 study-sqs.fifo 로 등록해줬습니다
- 일반 큐를 만들때는 상관없지만, FIFO 큐를 생성할 때는 {생성한 큐 이름} .fifo 를 같이 붙여줘야합니다.
@Configuration
public class AwsSqsConfiguration {
@Value("${cloud.aws.credentials.access-key}")
private String accessKey;
@Value("${cloud.aws.credentials.secret-key}")
private String secretKey;
@Value("${cloud.aws.region.static}")
private String region;
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
}
@Bean
public AmazonSQSAsync amazonSQSClient() {
return AmazonSQSAsyncClientBuilder.standard()
.withRegion(region)
.withCredentials(awsCredentialsProvider())
.build();
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainerFactory factory =
new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSQSAsync);
factory.setMaxNumberOfMessages(10);
factory.setWaitTimeOut(20);
factory.setTaskExecutor(messageThreadPoolTaskExecutor());
return factory;
}
@Bean
public ThreadPoolTaskExecutor messageThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(20);
taskExecutor.setMaxPoolSize(20);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
- application.yml에 있는 access-key, secret-key 값을 가지고 AWSCredentialsProvider 빈을 생성해줍니다.
- Message 송신에 사용되는 AmazonSQS 빈 생성 시 해당 credentials 정보와 region 정보 설정해줍니다.
public interface AwsSqsProperties {
String getName();
String getUrl();
}
SQS 관련 프로퍼티를 만들어주기 위해 인터페이스를 선언했습니다.
@Getter
@Component(value = "StudySqsProperties")
@ConfigurationProperties(prefix = "cloud.aws.sqs.queue.study-sqs")
public class StudySqsProperties implements AwsSqsProperties {
private String url;
private String name;
}
SQS 관련 프로퍼티 구현체입니다. url과 name을 넣어줬습니다.
이제 세팅을 위한 코드는 끝났습니다. 이제는 위에서 언급한 SQS를 이용해서 메시지를 큐에 넣어서 보내는 것(생산), 큐에 있는 메시지를 큐에서 빼내는 것(소비) 동작을 알아보도록 하겠습니다. 먼저 메시지 생성부터 알아보겠습니다.
5-3. SQS로 메시지 전송
SQS로 보낼 메시지를 만들어서 SQS 큐로 전송(Publish)하는 코드입니다. 바로 간단한 코드를 만들어서 적용해보겠습니다.
Publish(큐로 메시지 전송)
@Controller
@RequiredArgsConstructor
@RequestMapping("/sqs")
public class StudySqsSendController {
private final StudySqsSendService studySqsSendService;
@PostMapping("/message")
public ResponseEntity<?> sendTestMessage(@RequestBody TestMessage testMessage){
StudySqsSendService.sendTestMessageToSqS(testMessage);
return ResponseEntity.ok().build();
}
}
간단하게 컨트롤러를 통해 SQS로 전송할 수 있도록 테스트 컨트롤러를 만들어줬습니다.
import com.amazonaws.services.sqs.model.Message;
@Slf4j
@Service
@RequiredArgsConstructor
public class StudySqsSendService {
private final StudySqsSender studySQSSender;
public void sendTestMessageToSqS(TestMessage testMessage){
Message message = setMessage(testMessage);
studySQSSender.sendMessage(message);
}
private Message setMessage(TestMessage testMessage) {
String payloadJson = objectToString(testMessage);
Message message = new Message();
message.setBody(payloadJson);
return message;
}
// 편의상 같은 클래스에 작성
private String objectToString(Object obj) {
ObjectMapper mapper = new ObjectMapper();
String responseString = null;
try {
responseString = mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("SQS JSON 역직렬화 도중 에러: ", e);
}
return responseString;
}
}
sqsSendService 에서는 메시지를 세팅과 함께 SQS 큐로 전송하는 서비스입니다.
- sqs에서 제공하는 Message payload로 세팅을 해줍니다
- Message에 헤더를 적용하려면 아래와 같이 적용해주면 됩니다.
- message.addMessageAttributesEntry("header1", new MessageAttributeValue().withDataType("String").withStringValue("value1"));
- sqs에 적용할 내용을 body에 세팅해주면 됩니다. 저는 간단하게 body에만 메시지를 만들어줬습니다.
- 글 편의상 ObjectMapper 코드를 클래스 분리 없이 넣어줬습니다.
public interface StudySqsSender{
SendMessageResult sendMessage(Message message);
}
간단하게 큐로 전송할 메서드를 포함한 인터페이스를 만들어줍니다.
@Component
public class StudySqsSenderImpl implements StudySqsSender{
private final ObjectMapper objectMapper;
private final AmazonSQS amazonSQS;
private final AwsSqsProperties properties;
public StudySqsSenderImpl(ObjectMapper objectMapper, AmazonSQS amazonSQS,
@Qualifier("StudySqsProperties") AwsSqsProperties awsSqsProperties) {
this.objectMapper = objectMapper;
this.amazonSQS = amazonSQS;
this.properties = awsSqsProperties;
}
@Override
public SendMessageResult sendMessage(Message message){
String sqsObjectDeserialize = sqsObjectDeserialize(message);
SendMessageRequest sendMessageRequest =
new SendMessageRequest(properties.getUrl(), sqsObjectDeserialize)
.withMessageGroupId("study-sqs")
.withMessageDeduplicationId(UUID.randomUUID().toString());
return amazonSQS.sendMessage(sendMessageRequest);
}
public String sqsObjectDeserialize(Object o){
String resultString = null;
try {
resultString = objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e){
throw new ObjectMapperException("SQS JSON 역직렬화 도중 에러: ", e);
}
return resultString;
}
}
실제 메시지를 보내는 구현체입니다. setting과 url을 세팅해주고 실제 aws sqs로 큐를 publish하는 곳입니다.
- FIFO 형식의 큐일경우 DeduplicationId, MessageGroupId를 반드시 세팅해줘야합니다.
- 그렇지 않으면 에러가 발생합니다
- 저는 FIFO 큐로 설정해줬기 때문에 2개를 세팅해줬습니다.
consume(큐에서 메시지 소비)
큐에 쌓여있는 메시지를 소비하는 코드입니다. 바로 간단한 코드를 만들어서 적용해보겠습니다.
public interface AwsSqsProperties {
String getName();
String getUrl();
}
큐의 이름과 URL을 property로 가지도록 인터페이스를 구성해줬습니다.
@Getter
@Component(value = "StudySqsProperties")
@ConfigurationProperties(prefix = "cloud.aws.sqs.queue.study-sqs")
public class StudySqsProperties implements AwsSqsProperties {
private String url;
private String name;
}
인터페이스를 구현해서 실제 수신할 큐의 이름과 URL property를 만들어줍니다.
- 큐를 listen 하기 위해 .yml 파일에서 정의한 프로퍼티를 만들어줍니다.
@Slf4j
@RequiredArgsConstructor
@Component
public class StudySqsWebhookListener {
@SqsListener(value = "${cloud.aws.sqs.queue.name}",
deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@Headers Map<String, String> header, @Payload String message){
Message receiveWebhookMessage = objectSerialize(message, Message.class);
String messageBody = receiveWebhookMessage.getBody();
TestMessage testMessage = objectSerialize(messageBody, TestMessage.class);
System.out.println(testMessage.toString());
}
public static <T> T objectSerialize(String jsonString, Class<T> valueType){
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonString, valueType);
} catch (JsonProcessingException e){
throw new ObjectMapperException("JSON 직렬화 에러", e);
}
}
}
메시지를 소비하는 리스너입니다.
- 어노테이션 기반으로 작동하며 예외를 던져주지 않으면 성공으로 간주하고, ack를 명시적으로 보내지 않아도 되고, 실패할 경우 NAck를 보내주면 됩니다.
- 메시지를 수신할 경우, 큐에서 메시지를 지우도록 deletionPolicy 를 설정해줬습니다.
- deletionPolicy 는 4가지 옵션이 있습니다.
- ALWAYS: Message가 들어오면 항상 삭제요청을 보냅니다
- NEVER: 절대 삭제 요청을 보내지 않습니다
- NO_REDRIVE: DLQ가 정의되지 않았다면 메시지 삭제요청을 보냅니다
- ON_SUCCESS: NAck를 수신하지않고 메시지를 소비하는데 성공을 했다면 메시지 삭제요청을 보냅니다
7. 정리
SQS를 AWS 세팅부터 Spring boot(2.2.X) 버젼에서 적용하는 방법을 정리해봤습니다.
다른 Message Queue들이 존재하는데 요구사항에서 언급되었던 'FIFO(선입선출) 큐를 지원한다, 지연 큐를 지원한다' 를 적용해야한다면 Amazon SQS 적용을 검토해보면 좋을 것 같습니다.
긴 글 읽어주셔서 감사합니다.
(부록)
부록 1. 적용하며 마주한 에러
- SNS cannot send messages to SQS queues due to permission issue
- 원인: sqs에서는 퍼블릭키, 시크릿키를 통해 유저를 구분합니다. 그리고 sqs 관련 설정에서 빈으로 주입을 시킵니다. 이 때 설정해놓은 퍼블릭키, 시크릿키가 환경별로 같으면 다음과 같은 에러가 발생합니다.
@Bean @Primary public AWSCredentialsProvider awsCredentialsProvider() { return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)); }
- 해결: aws IAM에서 키를 새로 발급 후 처리
부록 2. FIFO 큐에서 처리 실패한다면?
회사에서 업무하다가 처리 순서가 굉장히 중요해서 SQS FIFO 큐에서 처리 실패했을 때 처리를 고려했을 때 알아본 내용입니다. SQS
FIFO 큐에서 메시지의 소비 및 처리가 실패한 경우의 흐름도는 다음과 같습니다.
- 메시지 소비 및 처리 시도 (첫 번째 시도):
- 소비자(Consumer)가 SQS FIFO 큐에서 메시지를 가져와 처리를 시도합니다.
- 처리가 성공적으로 완료되면 해당 메시지는 큐에서 제거됩니다.
- 메시지 소비 및 처리 실패 (첫 번째 시도):
- 소비자가 메시지 처리에 실패하거나 예외가 발생하면 해당 메시지는 큐로 다시 반환됩니다.
- 메시지는 큐의 맨 앞으로 이동하게 되어 다시 처리를 시도해야 합니다.
- 재시도 (AWS에서 세팅한 재시도 횟수만큼 N번 시도):
- 메시지가 다시 큐의 맨 앞으로 이동하면 소비자는 재시도를 시도합니다.
- SQS FIFO 큐는 재시도 횟수를 추적할 수 있으며, 설정에 따라 일정 횟수 이상 재시도한 메시지를 처리하지 못하면 Dead Letter Queue(DLQ)로 이동시킬 수 있습니다.
- 메시지 처리 성공:
- 재시도를 통해 메시지가 성공적으로 처리되면 해당 메시지는 큐에서 제거됩니다.
- Dead Letter Queue로 이동 (선택 사항):
- 설정에 따라 일정 횟수 이상 재시도한 메시지는 Dead Letter Queue(DLQ)로 이동할 수 있습니다.
'Back-end > Spring' 카테고리의 다른 글
[Spring] Spring 캐시 사용하기 (0) | 2024.02.18 |
---|---|
[Spring] AWS SES 이메일 반송률 관리하기 (0) | 2024.02.04 |
댓글