본문 바로가기
아키텍처

Transactional Outbox Pattern과 적용 회고 (from. BaaS)

by Ramos 2025. 8. 1.

최근 BaaS 프로젝트에서 사용자의 신청서를 토대로 어드민이 외부 백업 장비에 직접 해당 요청에 대한 작업 후 사용자의 신청서의 상태를 일일이 바꿔야 하는 매니지드 기반 서비스 형태에서 모든 프로세스를 자동화시키는 셀프 서비스 형태로 리뉴얼 했다.

 

내가 알기론, 백업/복구 솔루션은 대부분 사용자의 서버에 백업 장비와 통신하기 위한 에이전트를 설치하도록 유도하고 사용자가 백업 에이전트를 직접 설치된 서버와 백업 장비간 통신이 원활하게 맺어지는지 검증이 되어야 풀 백업/증분 백업 및 복구가 가능한 것으로 알고 있다. 이는 우리 서비스도 마찬가지다.

 

여기서 셀프 서비스의 가장 핵심은 에이전트 설치와 검증 처리에 대한 자동화가 관건이었는데, 사용자가 직접 에이전트 설치 후 체킹 요청 시 백엔드에서 동기적으로 외부 API들을 연계해서 이를 처리하기엔 네트워크 상 timeout이 발생할 여지가 컸고, 더 나아가 사용자의 UX 측면에서 좋지 않았다.

MSA 구조의 백엔드는 아니었으나, 셀프서비스의 경우 Event Driven Architecture로 전환하는게 올바른 선택이란 판단에 Kafka를 도입했고, 개발 환경의 제약 상 코드 레벨에서 직접 Step Processing, Transactional Outbox Pattern을 적용했다.

 

정석적인 Transactional Outbox Pattern이 아닌 취사선택해서 적용했으나, 우리가 처리했던 이 패턴에 대해 사실과 오해를 정리하고자 이번 글을 예제 시나리오와 실제 프로덕트 코드의 일부를 기반으로 작성해본다.

전체 아키텍처 및 흐름도

우리 프로덕트와는 별개로 예제 시나리오에 대한 전체적인 아키텍처와 흐름도는 아래와 같다.

[사용자 요청]
     ↓
[Spring Service]
     └─ 비즈니스 처리 (ex: 주문 생성)
         └─ Outbox 테이블에 이벤트 기록 (같은 트랜잭션)
             ↓
        [DB Commit]
             ↓
[Debezium Connector]
    └─ DB binlog 감지 (Outbox 테이블 INSERT)
         ↓
[Kafka Connect]
    └─ Kafka로 이벤트 발행 (outbox-topic 등)
         ↓
[Kafka Consumer 서비스]
    └─ 메시지 처리 (이벤트 기반 후처리, 알림 전송, 포인트 적립 등)

 

예시

일반적인 outbox_event 테이블과 처리 예시는 아래와 같다.

CREATE TABLE outbox_event (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    aggregate_type VARCHAR(50), -- 예: ORDER
    aggregate_id VARCHAR(50),   -- 예: 주문 ID
    type VARCHAR(50),           -- 예: OrderCreated
    payload JSON,               -- 이벤트 데이터
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
@Transactional
public void createOrder(OrderDto dto) {
    Order order = orderRepository.save(dto.toEntity());

    OutboxEvent event = OutboxEvent.builder()
        .aggregateType("ORDER")
        .aggregateId(order.getId().toString())
        .type("OrderCreated")
        .payload(toJson(order)) // JSON 직렬화
        .build();
    outboxEventRepository.save(event);
}

 

 

OrderOutboxEvent는 같은 트랜잭션에서 처리한다.

Debezium Connector 설정 (예: MySQL → Kafka)

{
  "name": "mysql-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "85744",
    "database.server.name": "app-db",
    "database.include.list": "app_db",
    "table.include.list": "app_db.outbox_event",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "none",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Kafka 메시지 예시

{
  "aggregateType": "ORDER",
  "aggregateId": "12345",
  "type": "OrderCreated",
  "payload": {
    "orderId": "12345",
    "userId": "abcde",
    "total": 39000
  },
  "created_at": "2025-06-22T10:00:00Z"
}

Consumer

아래 예시처럼, 컨슈머 레벨에선 이벤트 타입 체크 후 간단한 방식으로 처리가 가능하다.

@KafkaListener(topics = "app-db.app_db.outbox_event", groupId = "order-consumer")
public void handleOutboxEvent(String message) {
    OutboxEventPayload event = objectMapper.readValue(message, OutboxEventPayload.class);

    if ("OrderCreated".equals(event.getType())) {
        orderEventProcessor.process(event.getPayload());
    }
}

구조의 장점

항목 설명
데이터 일관성 DB commit이 되어야만 Kafka publish 발생 (binlog 기반)
부하 분산 producer 직접 호출 없음 → non-blocking
확장성 이벤트 기반 아키텍처로 다양한 후처리 서비스 구성 가능
신뢰성 Kafka delivery semantics + outbox 로그로 보상 설계 용이

코드 기반 제어와 정석적인 구조?

사용자 정의 방식 (code-level Outbox)

  1. 트랜잭션 내 비즈니스 로직 수행 + Outbox 테이블 Insert
  2. 트랜잭션 commit 후 producer.send()
  3. Consumer는 메시지 수신 후 Outbox 테이블 조회 → 조건 판단
항목 설명
✅ 장점 트랜잭션 내부에서 DB와 Outbox를 함께 처리 가능
❌ 문제 Producer send()는 트랜잭션 외부 → Kafka와 DB의 일관성 보장 안 됨
❌ 문제 Consumer가 별도로 DB에 다시 질의해야 함 (성능 저하 및 중복 로직)
❌ 패턴 아님 이건 트랜잭셔널 아웃박스 패턴이 아니라, Outbox 테이블 + 추가 로직 처리 패턴

 

이 방식은 트랜잭셔널 아웃박스의 핵심인 "DB → Kafka로의 자동 전달"과 원자적 일관성 확보를 보장하지 않는다.

 

가령, 우리 서비스 내부의 실제 코드 일부는 아래와 같다.

/**
 * 어드민 에이전트 신청 관련 이벤트 컨슈머.
 * @author HakHyeon Song
 */
@Slf4j
@Component
@ConditionalOnProperty(name = "app.mode", havingValue = "admin")
@RequiredArgsConstructor
public class AgentSubmissionAdminEventConsumer {

    private final AgentSubmissionAutoService agentSubmissionAutoService;
    private final AgentSubmissionAdminEventProducer producer;

    private final AgentInstallationFacade agentInstallationFacade;

    private final ObjectMapper objectMapper;

    /**
     * 에이전트 설치 검증 이벤트 리스너.
     * 검증 실패 시, 재시도하기 위해 이벤트 재발행.
     */
    @KafkaListener(
       topics = "${spring.kafka.topic.agent-install-validation.name}",
       containerFactory = "pollingKafkaListenerContainerFactory",
       groupId = "${spring.kafka.topic.agent-install-validation.group-id}"
    )
    public void listenAgentInstallationValidation(ConsumerRecord<String, AgentSubmissionValidationEvent> event) {
       AgentSubmissionValidationEvent agentSubmissionValidationEvent = event.value();
       try {
          agentSubmissionAutoService.validateAgentSubmission(agentSubmissionValidationEvent.getAgentSubmissionId());
       }  catch (NoSuchElementException e) { // 검증 대상이 없는 경우, 이벤트를 재발행하지 않고 종료.
          log.info("[ListenAgentInstallationValidation] Agent Submission not found. agentSubmissionId : {}", agentSubmissionValidationEvent.getAgentSubmissionId());
       } catch (Exception e) { // 통신이 불가 혹은 Json 변환 예외를 포함한 예외는 이벤트 재발행하여 재시도.
          log.error("[ListenAgentInstallationValidation] Validation failed. agentSubmissionId : {}, error : {}", agentSubmissionValidationEvent.getAgentSubmissionId(), e.getMessage());
          publishValidationRetryEvent(agentSubmissionValidationEvent);
       }
    }

    private void publishValidationRetryEvent(AgentSubmissionValidationEvent agentSubmissionValidationEvent) {
       producer.publishAgentSubmissionValidationRetryEvent(new AgentSubmissionValidationRetryEvent(agentSubmissionValidationEvent));
    }

    @KafkaListener(
       topics = "${spring.kafka.topic.agent-install-validation-retry.name}",
       containerFactory = "pollingKafkaListenerContainerFactory",
       groupId = "${spring.kafka.topic.agent-install-validation-retry.group-id}"
    )
    public void listenAgentInstallationValidationRetry(ConsumerRecord<String, AgentSubmissionValidationRetryEvent> event) {
       AgentSubmissionValidationRetryEvent agentSubmissionValidationRetryEvent = event.value();
       try {
          agentSubmissionAutoService.validateAgentSubmission(agentSubmissionValidationRetryEvent.getAgentSubmissionId());
       }  catch (NoSuchElementException e) {
          log.info("[ListenAgentInstallationValidationRetry] Agent Submission not found. agentSubmissionId : {}", agentSubmissionValidationRetryEvent.getAgentSubmissionId());
       } catch (Exception e) {
          log.error("[ListenAgentInstallationValidationRetry] Validation failed. agentSubmissionId : {}, error : {}", agentSubmissionValidationRetryEvent.getAgentSubmissionId(), e.getMessage());
          publishValidationRetryEvent(agentSubmissionValidationRetryEvent);
       }
    }

    private void publishValidationRetryEvent(AgentSubmissionValidationRetryEvent agentSubmissionValidationRetryEvent) {
       producer.publishAgentSubmissionValidationRetryEvent(agentSubmissionValidationRetryEvent.retry());
    }

    @KafkaListener(
       topics = "${spring.kafka.topic.agent-install-request.name}",
       containerFactory = "defaultKafkaListenerFactory",
       groupId = "${spring.kafka.topic.agent-install-request.group-id}"
    )
    public void agentInstallRequestListener(ConsumerRecord<String, AgentInstallRequestEvent> message) {
        agentInstallationFacade.orchestrateAgentInstallation(message.value());
    }

    @KafkaListener(
       topics = "${spring.kafka.topic.agent-install-state-update.name}",
       containerFactory = "defaultKafkaListenerFactory",
       groupId = "${spring.kafka.topic.agent-install-state-update.group-id}"
    )
    public void agentSubmissionStateUpdateListener(ConsumerRecord<String, AgentSubmissionStateUpdateEvent> message) {
       AgentSubmissionStateUpdateEvent<?> rawEvent = message.value();
       AgentSubmissionStateUpdatePayload convertedPayload =
          objectMapper.convertValue(rawEvent.getPayload(), AgentSubmissionStateUpdatePayload.class);

       AgentSubmissionStateUpdateEvent<AgentSubmissionStateUpdatePayload> finalEvent =
          AgentSubmissionStateUpdateEvent.<AgentSubmissionStateUpdatePayload>builder()
             .eventSeq(rawEvent.getEventSeq())
             .eventStatus(rawEvent.getEventStatus())
             .eventTime(rawEvent.getEventTime())
             .payload(convertedPayload)
             .build();
       agentSubmissionAutoService.updateProgressStateToInstallCheck(finalEvent);
    }

    @KafkaListener(
       topics = "${spring.kafka.topic.agent-install-ip-update.name}",
       containerFactory = "defaultKafkaListenerFactory",
       groupId = "${spring.kafka.topic.agent-install-ip-update.group-id}"
    )
    public void agentSubmissionIpUpdateListener(ConsumerRecord<String, AgentSubmissionIpUpdateEvent> message) {
       agentInstallationFacade.orchestrateUpdateAgentSubmissionIp(message.value());
    }

    @KafkaListener(
       topics = "${spring.kafka.topic.agent-deletion.name}",
       containerFactory = "defaultKafkaListenerFactory",
       groupId = "${spring.kafka.topic.agent-deletion.group-id}"
    )
    public void agentSubmissionDeleteListener(ConsumerRecord<String, AgentSubmissionDeletionEvent> message) {
       // auto service를 통해 host_mapping 테이블 delete 처리
       if (agentSubmissionAutoService.deleteAgentSubmission(message.value())) {
          // Flex Appliance 정리
          agentInstallationFacade.orchestrateDeleteAgentSubmission(message.value());
       }
    }
}

 

이 컨슈머에서 의존하고 있는 Facade나 Service Component에 Kafka로부터 받은 Event에 대한 상태 체킹 및 비즈니스 로직을 위임시키는데 해당 컴포넌트에선 DB에 이벤트에 대한 query 후 이벤트 상태에 따라 제어하는 식으로 흐름이 이어지지만, 결국 DB Transaction이나 Kafka Connection 문제 혹은 기타 예외 상황에 try-catch로 일일이 처리해야 한다. 비즈니스 요건 특성상 세밀한 제어까진 불필요했어서 단순 구조로 처리했으나, 이 부분이 개인적으론 굉장히 아쉬웠던 상황이었다. 해당 코드의 일부는 아래와 같다.

/**
 * 에이전트 설치 신청 자동화를 위한 Facade 컴포넌트.
 *
 * @author HakHyeon Song
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class AgentInstallationFacade {

    private final EventOutboxService eventOutboxService;

    private final FlexSessionManager flexSessionManager;
    private final FlexInstanceAdapter flexInstanceAdapter;

    private final HostMappingService hostMappingService;
    private final VeritasApplianceRoutingService veritasApplianceRoutingService;

    private final AgentSubmissionAdminEventProducer agentSubmissionEventProducer;
    private final AgentSubmissionRetryEventProducer agentSubmissionRetryEventProducer;

    /**
     * 에이전트 설치 신청 자동화 처리를 수행.
     *
     * @param event 에이전트 설치 신청 이벤트
     */
    public void orchestrateAgentInstallation(AgentInstallRequestEvent event) {
       var eventOutbox = eventOutboxService.getEventOutbox(event.getEventSeq());

       // Event Outbox 상태 검증
       if (!Objects.equals(eventOutbox.getEventStatus(), EventOutboxStatusEnum.PENDING.getId())) {
          return;
       }

       if (Objects.equals(eventOutbox.getEventStatus(), EventOutboxStatusEnum.FAILED.getId())) {
          log.warn("Skipping retry: EventOutbox[seq={}] has FAILED status (retry limit exceeded)", event.getEventSeq());
          return;
       }

       var command = new EtcHostsUpdateCommand(
             event.getProjectId(),
             event.getStationId(),
             event.getVeritasApplianceSeq(),
             event.getHostname(),
             null,
             event.getIp()
       );

       try {
          doOrchestration(command);
          eventOutboxService.updateEventStatus(eventOutbox.getEventOutboxSeq(), EventOutboxStatusEnum.PROCESSING);

          // producer 호출
          AgentSubmissionStateUpdateEvent<AgentSubmissionStateUpdatePayload> agentSubmissionStateUpdateEvent = AgentSubmissionStateUpdateEventFactory.buildAgentSubmissionStateUpdateEvent(event, eventOutbox);
          agentSubmissionEventProducer.publishAgentInstallStateToInstallCheckEvent(agentSubmissionStateUpdateEvent);
       } catch (Exception e) {
          log.error("Agent installation orchestration failed. eventOutboxSeq={} - retrying via retry topic.", event.getEventSeq(), e);
          // retry count 증가 후 retry producer 호출
          eventOutboxService.incrementRetryCount(event.getEventSeq());
          agentSubmissionRetryEventProducer.publishAgentInstallationRequestRetry(event);
       }

    }

 

배포 환경 및 인프라 자원에 대한 자유도가 우리한테 없었기에 코드레벨에서 직접 한땀한땀(?) 처리했던 아쉬움을 뒤로하고, 아키텍처 수준 구현으로 보는 정석적인 방식에 대해 살펴보자.

아키텍처 수준 구현 (대표적으로 Debezium CDC 사용)

  1. 비즈니스 트랜잭션 내에 Outbox 테이블 insert
  2. Outbox 테이블에 insert만 하고 끝
  3. 별도의 CDC 시스템 (Debezium 등)이 Outbox 테이블의 변경을 감지
  4. Kafka로 메시지를 publish
항목 설명
✅ Kafka 전송 시점 DB commit 이후, CDC에 의해 자동 publish
✅ 일관성 DB 트랜잭션과 메시지 전송이 사실상 원자적
✅ 소비 로직 Kafka 메시지를 처리만 하면 됨, Outbox 테이블 재조회 없음
✅ 부가 조건 DB → Kafka 간의 CDC 기반 로그 전송 시스템 필요 (Debezium 등)

핵심 차이 정리

항목 사용자 정의 방식 (코드 기반) 트랜잭셔널 아웃박스 (정석)
Kafka 전송 시점 코드에서 send() 호출 CDC가 DB 변경 감지 후 자동
Kafka 전송 실패 시 DB는 commit, Kafka 전송 실패 가능 DB commit이 있어야만 Kafka 전송됨
Outbox 테이블 재조회 필요 (Consumer 측에서) 불필요
외부 의존성 없음 (Spring 코드로 가능) 필요 (Debezium, Kafka Connect 등)
복잡도 낮음 (개발자 제어) 높음 (운영 구성 필요)
일관성 보장 낮음 (at-least-once 이상 보장 어려움) 높음 (exactly-once 가능)

 

결국, 트랜잭셔널 아웃박스 패턴의 본질은 CDC 기반 Kafka publish와 DB commit 시점 연동이다.

 

환경적인 제약 사항 때문에 우리 프로젝트에서 처리했던 방식처럼 코드 레벨에서 send() 호출 후 consumer에서 Outbox 테이블을 재조회하는 방식은 대체로 안티패턴에 가깝고, 실제 아웃박스 패턴의 원자성, 일관성, 비동성 이점을 취하려면 Debezium 기반의 CDC 흐름으로 가야 하지 않을까 싶다.