NHN FORWARD 22 - 분산 시스템에서 데이터를 전달하는 효율적인 방법

본 글은 [NHN FORWARD 22] 분산 시스템에서 데이터를 전달하는 효율적인 방법을 정리한 글입니다.

다룰 내용

  • 데이터 전달 보장 방법론
  • RDB를 사용하는 애플리케이션에서 전달 방법
  • RabbitMQ를 사용한 전달 방법
  • Kafka를 사용하는 애플리케이션의 전달 방법

분산 시스템

  • 여러개의 컴퓨터 리소스를 사용하는 시스템
  • 시스템은 두 개 이상의 컴포넌트로 구성
    • 마이크로서비스
  • 네트워크를 통해 데이터를 전달
    • Remote API
      • 서버/클라이언트 방식
      • 요청시 즉각 응답하는 동기식 방식
      • 비교적 간단
    • Message Queue
      • Publisher/Consumer 구조
        • Consumer가 데이터 조작
      • 배치/비동기 작업
      • 비교적 복잡

분산 시스템에서 데이터를 전달하는 효율적인 방법

  • 분산 시스템은 네트워크로 연결
    • 유실가능
  • 데이터 전달 보장 방법
    • At most once (최대 한번)
      • Producer는 한번만 전송
      • Consumer는 한번만 수신
      • 간단하개 개발가능하나 Producer/Consumer 둘중 하나가 실패하면 메시지 유실 발생
    • At least once (최소 한번)
      • Producer는 최소 한번 이상 발송
      • Consumer는 최소 한번 이상 수신
      • Producer는 메시지발송 보장
      • Consumer는 메시지 처리시 멱등성(idempotent)을 보장해야함
    • Exactly once (정확히 한번)
      • 메시지는 정확하게 한번 전송
        • 누락과 중복이 없음
      • 가장 어려운 난이도
      • Producer, Consumer에서 모든 상태 관리
      • MessageQueue 기능에 의존한 개발
        • MessageQueue 추가로 인한 시스템 복잡도 증가
  • 우리는 개발시 최소 한번은 전달하고 있는가?

RDB를 사용하는 애플리케이션에서 전달 방법

  • 서비스별 독립된 데이터베이스를 가지고 있는 경우
    • @TransactionalEventlistener,
      • 네크워트 실패를 방어하지 못함
        • @Retryable을 사용하면?
          • 1번 재시도?
        • maxAttempts, backoff 설정
        • 그러나 네트워크는 계속 실패할 수 있음
    • TransactionSynchronizionManager, TransactionSynchronization
    • 다음과 같은 마이크로서비스 아키텍처 패턴 사용가능
      • Transactional Outbox Pattern
        • RDB를 Message Queue로 사용
        • OLTP에 Event Message를 포함하는 패턴
      • Polling Publisher Pattern
        • RDB Message Queue Polling & Publishing
    • 두개 혼합해서 사용하면 안전하게 메시지 전달 가능
      • 이벤트를 RDB에 넣음
        • 하나의 트랜잭션으로 서비스의 로직과 이벤트를 처리 가능
      • 데몬/스케줄러로 DB에 저장된 이벤트를 주기적으로 발행
    • 이벤트 저장시 중요한 값
      • event_id
        • pk로 사용하면 이벤트 순서 보장 가능
      • created_at
      • status
        • ready / done
      • payload

Transactional Outbox Pattern 예시

@Service
public class CreateTaskService implements CreateTaskUserCase {

  @Transactional
  public CreateTaskResponse createTask(CreateTaskCommand createTaskCommand) {
    Task task = createTaskCommand.toTask();

    taskRepository.save(task);
    eventRepository.save(CreateTaskEvent.of(task));

    return CreateTaskResponse.of(task);
  }
}

Polling Publisher Pattern 예시

@Service
public class MessagePublisher {

  @Scheduled(cron = "0/5 * * * * *")
  @Transactional
  public void publish() {
    LocalDateTime now = LocalDateTime.now();
    eventRepository.findByCreatedAtBefore(now, EventStatus.READY)
                    .stream()
                    .map(event -> restTemplate.execute(event))
                    .map(event -> event.done())
                    .forEach(eventRepository::save);
  }
}
  • 장점
    • REST-API 환경에서 At-least-once 가능
  • 단점
    • Polling, Publisher 과정으로 지연 처리
    • 데이터베이스 부하
      • 처리속도가 데이터베이스 속도에 의존됨

RabbitMQ를 사용한 전달 방법

  • AMQP(Advanced Message Queuing Protocol)을 구현한 메시지 브로커
  • Publish/Subscribe 방식 지원
  • ACK(Acknowledgement) 기반 메시지 전송/수신 방식 지원
    • Ack, Nack 방식
    • Producer Confirm
    • Consumer Ack
  • 메시지 전달방법
    • Producer가 메시지 발생
    • Exchange가 받아서 적절한 Queue에 입력 (Routing)
    • Queue
    • Consumer가 Queue 메시지 처리
  • Routing 실패하는경우
    • Producer Confirm으로 전달
      • org.springframework.amqp.rabbit.connection.CorrelationData.java
        • Producer Confirm을 확인할 수 있는 기본 클래스
        • RabbitTemplate와 사용

Producer Confirm

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) {
  RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
  //...
  rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
      Message message = correlationData.getReturned().getMessage();
      byte[] body = message.getBody();
      log.error ("Fail to produce. ID: {}", correlationData.getId());
    }
  });
  return rabbitTemplate;
}
spring.rabbitmq.publisher-confirm-type=correlated

Consumer Ack

@Component
public class Messagelistener {
  @RabbitListener(queues = "dooray.tast")
  public void receiveMessage(Message message, Channel channel) {
    
    // 수동 ACK 전송
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    // 수동 NACK 전송
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  }
}
  • ChannelAware가 Prefix인 Listener 클래스를 사용하면 Ack 구현가능
    • ChannelAwareBatchMessageListener.java
    • ChannelAwareMessageListener.java
  • 컨슈머에 버그가 있어서 계속 NACK만 발생시키면?
    • 이런경우 Dead Letter를 설정
      • Dead Latter Exchange -> Dead Letter Queue
      • RabbitMQ DLQ 조건
        • retry false, basic reject basic nack
        • queue의 메시지가 ttl이 넘은경우
        • queue가 가득차서 더이상 처리 불가능한경우
@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory (ConnectionFactory connFactory) {
  //...
  var containerFactory = new SimpleRabbitListenerContainerFactory();
  containerFactory.setConnectionFactory(connFactory);
  containerFactory.setAdviceChain(
    RetryInterceptorBuilder.stateless()
                           .maxAttempts (3)
                           .backOffOptions (1000, 2, 2000)
                           .recoverer(new RejectAndDontRequeueRecoverer()) .build());
  return containerFactory;
}

Kafka를 사용한 전달 방법

  • Producer Confirm와 Consumer Ack 구현가능

Producer Confirm

@Slf4j
@Component
@RequiredArgsConstructor
public class Producer {
  public void sendEvent(CreateTaskEvent event) {
    ListenableFuture<SendResult<String, CreateTaskEvent>> future = 
      kafkaTemplate.send (TOPIC_TASK, event);
    future.addCallback(
      result -> log.info("offset : {}", result.getRecordMetadata.offset),
      throwable > log.error("fail to publish", throwable)
    );
  }
}

Consumer Ack

@FunctionalInterface
public interface AcknowledgingMessageListener<K, V extends MessageListener<K, V {
  default void onMessage(ConsumerRecord<K, V> data) {
    throw new UnsupportedOperationException ("Container should never call this");
  }
  void onMessage(ConsumerRecord<K, V> var1, Acknowledgment var2);
}

// consumer class
public class Consumer {
  @Override
  @KafkaListener(
    // ...
    containerFactory = "kafkaListenerContainerFactory"
  )
  public void onMessage (ConsumerRecord<String, String> consumerRecord, Acknowledgement acknowledgement) {
    try {
      // Do something...
      acknowledgement.acknowledge();
    } catch (Exception e) {
      log.error("Error to receive messages.", e);
    }
  }
}

// configuration class
public class Configuration {
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIS, bootstrapServer);
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      // 메뉴얼로 acknowledgement를 전송할때는 ENABLE_AUTO_COMMIT_CONFIG false 처리
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  }
}

마무리

  • Event driven Achitecture의 기본은 데이터 전달
  • At Least Once 설정
  • Producer Confirm, Consumer Ack 고려