NHN FORWARD 22 - 분산 시스템에서 데이터를 전달하는 효율적인 방법
본 글은 [NHN FORWARD 22] 분산 시스템에서 데이터를 전달하는 효율적인 방법을 정리한 글입니다.
다룰 내용
- 데이터 전달 보장 방법론
- RDB를 사용하는 애플리케이션에서 전달 방법
- RabbitMQ를 사용한 전달 방법
- Kafka를 사용하는 애플리케이션의 전달 방법
분산 시스템
- 여러개의 컴퓨터 리소스를 사용하는 시스템
- 시스템은 두 개 이상의 컴포넌트로 구성
- 마이크로서비스
- 네트워크를 통해 데이터를 전달
- Remote API
- 서버/클라이언트 방식
- 요청시 즉각 응답하는 동기식 방식
- 비교적 간단
- Message Queue
- Publisher/Consumer 구조
- Consumer가 데이터 조작
- 배치/비동기 작업
- 비교적 복잡
- Publisher/Consumer 구조
- Remote API
분산 시스템에서 데이터를 전달하는 효율적인 방법
- 분산 시스템은 네트워크로 연결
- 유실가능
- 데이터 전달 보장 방법
- At most once (최대 한번)
- Producer는 한번만 전송
- Consumer는 한번만 수신
- 간단하개 개발가능하나 Producer/Consumer 둘중 하나가 실패하면 메시지 유실 발생
- At least once (최소 한번)
- Producer는 최소 한번 이상 발송
- Consumer는 최소 한번 이상 수신
- Producer는 메시지발송 보장
- Consumer는 메시지 처리시 멱등성(idempotent)을 보장해야함
- Exactly once (정확히 한번)
- 메시지는 정확하게 한번 전송
- 누락과 중복이 없음
- 가장 어려운 난이도
- Producer, Consumer에서 모든 상태 관리
- MessageQueue 기능에 의존한 개발
- MessageQueue 추가로 인한 시스템 복잡도 증가
- 메시지는 정확하게 한번 전송
- At most once (최대 한번)
- 우리는 개발시 최소 한번은 전달하고 있는가?
RDB를 사용하는 애플리케이션에서 전달 방법
- 서비스별 독립된 데이터베이스를 가지고 있는 경우
@TransactionalEventlistener
,- 네크워트 실패를 방어하지 못함
@Retryable
을 사용하면?- 1번 재시도?
maxAttempts
,backoff
설정- 그러나 네트워크는 계속 실패할 수 있음
- 네크워트 실패를 방어하지 못함
TransactionSynchronizionManager
,TransactionSynchronization
- 다음과 같은 마이크로서비스 아키텍처 패턴 사용가능
- Transactional Outbox Pattern
- RDB를 Message Queue로 사용
- OLTP에 Event Message를 포함하는 패턴
- Polling Publisher Pattern
- RDB Message Queue Polling & Publishing
- Transactional Outbox Pattern
- 두개 혼합해서 사용하면 안전하게 메시지 전달 가능
- 이벤트를 RDB에 넣음
- 하나의 트랜잭션으로 서비스의 로직과 이벤트를 처리 가능
- 데몬/스케줄러로 DB에 저장된 이벤트를 주기적으로 발행
- 이벤트를 RDB에 넣음
- 이벤트 저장시 중요한 값
- event_id
- pk로 사용하면 이벤트 순서 보장 가능
- created_at
- status
- ready / done
- payload
- event_id
Transactional Outbox Pattern 예시
@Service
public class CreateTaskService implements CreateTaskUserCase {
@Transactional
public CreateTaskResponse createTask(CreateTaskCommand createTaskCommand) {
Task task = createTaskCommand.toTask();
taskRepository.save(task);
eventRepository.save(CreateTaskEvent.of(task));
return CreateTaskResponse.of(task);
}
}
Polling Publisher Pattern 예시
@Service
public class MessagePublisher {
@Scheduled(cron = "0/5 * * * * *")
@Transactional
public void publish() {
LocalDateTime now = LocalDateTime.now();
eventRepository.findByCreatedAtBefore(now, EventStatus.READY)
.stream()
.map(event -> restTemplate.execute(event))
.map(event -> event.done())
.forEach(eventRepository::save);
}
}
- 장점
- REST-API 환경에서 At-least-once 가능
- 단점
- Polling, Publisher 과정으로 지연 처리
- 데이터베이스 부하
- 처리속도가 데이터베이스 속도에 의존됨
RabbitMQ를 사용한 전달 방법
- AMQP(Advanced Message Queuing Protocol)을 구현한 메시지 브로커
- Publish/Subscribe 방식 지원
- ACK(Acknowledgement) 기반 메시지 전송/수신 방식 지원
- Ack, Nack 방식
- Producer Confirm
- Consumer Ack
- 메시지 전달방법
- Producer가 메시지 발생
- Exchange가 받아서 적절한 Queue에 입력 (Routing)
- Queue
- Consumer가 Queue 메시지 처리
- Routing 실패하는경우
- Producer Confirm으로 전달
- org.springframework.amqp.rabbit.connection.CorrelationData.java
- Producer Confirm을 확인할 수 있는 기본 클래스
- RabbitTemplate와 사용
- org.springframework.amqp.rabbit.connection.CorrelationData.java
- Producer Confirm으로 전달
Producer Confirm
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
//...
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
Message message = correlationData.getReturned().getMessage();
byte[] body = message.getBody();
log.error ("Fail to produce. ID: {}", correlationData.getId());
}
});
return rabbitTemplate;
}
spring.rabbitmq.publisher-confirm-type=correlated
Consumer Ack
@Component
public class Messagelistener {
@RabbitListener(queues = "dooray.tast")
public void receiveMessage(Message message, Channel channel) {
// 수동 ACK 전송
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 수동 NACK 전송
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
- ChannelAware가 Prefix인 Listener 클래스를 사용하면 Ack 구현가능
- ChannelAwareBatchMessageListener.java
- ChannelAwareMessageListener.java
- 컨슈머에 버그가 있어서 계속 NACK만 발생시키면?
- 이런경우 Dead Letter를 설정
- Dead Latter Exchange -> Dead Letter Queue
- RabbitMQ DLQ 조건
- retry false, basic reject basic nack
- queue의 메시지가 ttl이 넘은경우
- queue가 가득차서 더이상 처리 불가능한경우
- 이런경우 Dead Letter를 설정
@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory (ConnectionFactory connFactory) {
//...
var containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connFactory);
containerFactory.setAdviceChain(
RetryInterceptorBuilder.stateless()
.maxAttempts (3)
.backOffOptions (1000, 2, 2000)
.recoverer(new RejectAndDontRequeueRecoverer()) .build());
return containerFactory;
}
Kafka를 사용한 전달 방법
- Producer Confirm와 Consumer Ack 구현가능
Producer Confirm
@Slf4j
@Component
@RequiredArgsConstructor
public class Producer {
public void sendEvent(CreateTaskEvent event) {
ListenableFuture<SendResult<String, CreateTaskEvent>> future =
kafkaTemplate.send (TOPIC_TASK, event);
future.addCallback(
result -> log.info("offset : {}", result.getRecordMetadata.offset),
throwable > log.error("fail to publish", throwable)
);
}
}
Consumer Ack
@FunctionalInterface
public interface AcknowledgingMessageListener<K, V› extends MessageListener<K, V› {
default void onMessage(ConsumerRecord<K, V> data) {
throw new UnsupportedOperationException ("Container should never call this");
}
void onMessage(ConsumerRecord<K, V> var1, Acknowledgment var2);
}
// consumer class
public class Consumer {
@Override
@KafkaListener(
// ...
containerFactory = "kafkaListenerContainerFactory"
)
public void onMessage (ConsumerRecord<String, String> consumerRecord, Acknowledgement acknowledgement) {
try {
// Do something...
acknowledgement.acknowledge();
} catch (Exception e) {
log.error("Error to receive messages.", e);
}
}
}
// configuration class
public class Configuration {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIS, bootstrapServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 메뉴얼로 acknowledgement를 전송할때는 ENABLE_AUTO_COMMIT_CONFIG false 처리
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
}
마무리
- Event driven Achitecture의 기본은 데이터 전달
- At Least Once 설정
- Producer Confirm, Consumer Ack 고려