최근 BaaS 프로젝트에서 사용자의 신청서를 토대로 어드민이 외부 백업 장비에 직접 해당 요청에 대한 작업 후 사용자의 신청서의 상태를 일일이 바꿔야 하는 매니지드 기반 서비스 형태에서 모든 프로세스를 자동화시키는 셀프 서비스 형태로 리뉴얼 했다.
내가 알기론, 백업/복구 솔루션은 대부분 사용자의 서버에 백업 장비와 통신하기 위한 에이전트를 설치하도록 유도하고 사용자가 백업 에이전트를 직접 설치된 서버와 백업 장비간 통신이 원활하게 맺어지는지 검증이 되어야 풀 백업/증분 백업 및 복구가 가능한 것으로 알고 있다. 이는 우리 서비스도 마찬가지다.
여기서 셀프 서비스의 가장 핵심은 에이전트 설치와 검증 처리에 대한 자동화가 관건이었는데, 사용자가 직접 에이전트 설치 후 체킹 요청 시 백엔드에서 동기적으로 외부 API들을 연계해서 이를 처리하기엔 네트워크 상 timeout이 발생할 여지가 컸고, 더 나아가 사용자의 UX 측면에서 좋지 않았다.
MSA 구조의 백엔드는 아니었으나, 셀프서비스의 경우 Event Driven Architecture로 전환하는게 올바른 선택이란 판단에 Kafka를 도입했고, 개발 환경의 제약 상 코드 레벨에서 직접 Step Processing, Transactional Outbox Pattern을 적용했다.
정석적인 Transactional Outbox Pattern이 아닌 취사선택해서 적용했으나, 우리가 처리했던 이 패턴에 대해 사실과 오해를 정리하고자 이번 글을 예제 시나리오와 실제 프로덕트 코드의 일부를 기반으로 작성해본다.
전체 아키텍처 및 흐름도
우리 프로덕트와는 별개로 예제 시나리오에 대한 전체적인 아키텍처와 흐름도는 아래와 같다.
[사용자 요청]
↓
[Spring Service]
└─ 비즈니스 처리 (ex: 주문 생성)
└─ Outbox 테이블에 이벤트 기록 (같은 트랜잭션)
↓
[DB Commit]
↓
[Debezium Connector]
└─ DB binlog 감지 (Outbox 테이블 INSERT)
↓
[Kafka Connect]
└─ Kafka로 이벤트 발행 (outbox-topic 등)
↓
[Kafka Consumer 서비스]
└─ 메시지 처리 (이벤트 기반 후처리, 알림 전송, 포인트 적립 등)
예시
일반적인 outbox_event 테이블과 처리 예시는 아래와 같다.
CREATE TABLE outbox_event (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
aggregate_type VARCHAR(50), -- 예: ORDER
aggregate_id VARCHAR(50), -- 예: 주문 ID
type VARCHAR(50), -- 예: OrderCreated
payload JSON, -- 이벤트 데이터
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
@Transactional
public void createOrder(OrderDto dto) {
Order order = orderRepository.save(dto.toEntity());
OutboxEvent event = OutboxEvent.builder()
.aggregateType("ORDER")
.aggregateId(order.getId().toString())
.type("OrderCreated")
.payload(toJson(order)) // JSON 직렬화
.build();
outboxEventRepository.save(event);
}
Order
와 OutboxEvent
는 같은 트랜잭션에서 처리한다.
Debezium Connector 설정 (예: MySQL → Kafka)
{
"name": "mysql-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "85744",
"database.server.name": "app-db",
"database.include.list": "app_db",
"table.include.list": "app_db.outbox_event",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "none",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}
Kafka 메시지 예시
{
"aggregateType": "ORDER",
"aggregateId": "12345",
"type": "OrderCreated",
"payload": {
"orderId": "12345",
"userId": "abcde",
"total": 39000
},
"created_at": "2025-06-22T10:00:00Z"
}
Consumer
아래 예시처럼, 컨슈머 레벨에선 이벤트 타입 체크 후 간단한 방식으로 처리가 가능하다.
@KafkaListener(topics = "app-db.app_db.outbox_event", groupId = "order-consumer")
public void handleOutboxEvent(String message) {
OutboxEventPayload event = objectMapper.readValue(message, OutboxEventPayload.class);
if ("OrderCreated".equals(event.getType())) {
orderEventProcessor.process(event.getPayload());
}
}
구조의 장점
항목 | 설명 |
---|---|
데이터 일관성 | DB commit이 되어야만 Kafka publish 발생 (binlog 기반) |
부하 분산 | producer 직접 호출 없음 → non-blocking |
확장성 | 이벤트 기반 아키텍처로 다양한 후처리 서비스 구성 가능 |
신뢰성 | Kafka delivery semantics + outbox 로그로 보상 설계 용이 |
코드 기반 제어와 정석적인 구조?
사용자 정의 방식 (code-level Outbox)
- 트랜잭션 내 비즈니스 로직 수행 + Outbox 테이블 Insert
- 트랜잭션 commit 후 producer.send()
- Consumer는 메시지 수신 후 Outbox 테이블 조회 → 조건 판단
항목 | 설명 |
---|---|
✅ 장점 | 트랜잭션 내부에서 DB와 Outbox를 함께 처리 가능 |
❌ 문제 | Producer send()는 트랜잭션 외부 → Kafka와 DB의 일관성 보장 안 됨 |
❌ 문제 | Consumer가 별도로 DB에 다시 질의해야 함 (성능 저하 및 중복 로직) |
❌ 패턴 아님 | 이건 트랜잭셔널 아웃박스 패턴이 아니라, Outbox 테이블 + 추가 로직 처리 패턴임 |
이 방식은 트랜잭셔널 아웃박스의 핵심인 "DB → Kafka로의 자동 전달"과 원자적 일관성 확보를 보장하지 않는다.
가령, 우리 서비스 내부의 실제 코드 일부는 아래와 같다.
/**
* 어드민 에이전트 신청 관련 이벤트 컨슈머.
* @author HakHyeon Song
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "app.mode", havingValue = "admin")
@RequiredArgsConstructor
public class AgentSubmissionAdminEventConsumer {
private final AgentSubmissionAutoService agentSubmissionAutoService;
private final AgentSubmissionAdminEventProducer producer;
private final AgentInstallationFacade agentInstallationFacade;
private final ObjectMapper objectMapper;
/**
* 에이전트 설치 검증 이벤트 리스너.
* 검증 실패 시, 재시도하기 위해 이벤트 재발행.
*/
@KafkaListener(
topics = "${spring.kafka.topic.agent-install-validation.name}",
containerFactory = "pollingKafkaListenerContainerFactory",
groupId = "${spring.kafka.topic.agent-install-validation.group-id}"
)
public void listenAgentInstallationValidation(ConsumerRecord<String, AgentSubmissionValidationEvent> event) {
AgentSubmissionValidationEvent agentSubmissionValidationEvent = event.value();
try {
agentSubmissionAutoService.validateAgentSubmission(agentSubmissionValidationEvent.getAgentSubmissionId());
} catch (NoSuchElementException e) { // 검증 대상이 없는 경우, 이벤트를 재발행하지 않고 종료.
log.info("[ListenAgentInstallationValidation] Agent Submission not found. agentSubmissionId : {}", agentSubmissionValidationEvent.getAgentSubmissionId());
} catch (Exception e) { // 통신이 불가 혹은 Json 변환 예외를 포함한 예외는 이벤트 재발행하여 재시도.
log.error("[ListenAgentInstallationValidation] Validation failed. agentSubmissionId : {}, error : {}", agentSubmissionValidationEvent.getAgentSubmissionId(), e.getMessage());
publishValidationRetryEvent(agentSubmissionValidationEvent);
}
}
private void publishValidationRetryEvent(AgentSubmissionValidationEvent agentSubmissionValidationEvent) {
producer.publishAgentSubmissionValidationRetryEvent(new AgentSubmissionValidationRetryEvent(agentSubmissionValidationEvent));
}
@KafkaListener(
topics = "${spring.kafka.topic.agent-install-validation-retry.name}",
containerFactory = "pollingKafkaListenerContainerFactory",
groupId = "${spring.kafka.topic.agent-install-validation-retry.group-id}"
)
public void listenAgentInstallationValidationRetry(ConsumerRecord<String, AgentSubmissionValidationRetryEvent> event) {
AgentSubmissionValidationRetryEvent agentSubmissionValidationRetryEvent = event.value();
try {
agentSubmissionAutoService.validateAgentSubmission(agentSubmissionValidationRetryEvent.getAgentSubmissionId());
} catch (NoSuchElementException e) {
log.info("[ListenAgentInstallationValidationRetry] Agent Submission not found. agentSubmissionId : {}", agentSubmissionValidationRetryEvent.getAgentSubmissionId());
} catch (Exception e) {
log.error("[ListenAgentInstallationValidationRetry] Validation failed. agentSubmissionId : {}, error : {}", agentSubmissionValidationRetryEvent.getAgentSubmissionId(), e.getMessage());
publishValidationRetryEvent(agentSubmissionValidationRetryEvent);
}
}
private void publishValidationRetryEvent(AgentSubmissionValidationRetryEvent agentSubmissionValidationRetryEvent) {
producer.publishAgentSubmissionValidationRetryEvent(agentSubmissionValidationRetryEvent.retry());
}
@KafkaListener(
topics = "${spring.kafka.topic.agent-install-request.name}",
containerFactory = "defaultKafkaListenerFactory",
groupId = "${spring.kafka.topic.agent-install-request.group-id}"
)
public void agentInstallRequestListener(ConsumerRecord<String, AgentInstallRequestEvent> message) {
agentInstallationFacade.orchestrateAgentInstallation(message.value());
}
@KafkaListener(
topics = "${spring.kafka.topic.agent-install-state-update.name}",
containerFactory = "defaultKafkaListenerFactory",
groupId = "${spring.kafka.topic.agent-install-state-update.group-id}"
)
public void agentSubmissionStateUpdateListener(ConsumerRecord<String, AgentSubmissionStateUpdateEvent> message) {
AgentSubmissionStateUpdateEvent<?> rawEvent = message.value();
AgentSubmissionStateUpdatePayload convertedPayload =
objectMapper.convertValue(rawEvent.getPayload(), AgentSubmissionStateUpdatePayload.class);
AgentSubmissionStateUpdateEvent<AgentSubmissionStateUpdatePayload> finalEvent =
AgentSubmissionStateUpdateEvent.<AgentSubmissionStateUpdatePayload>builder()
.eventSeq(rawEvent.getEventSeq())
.eventStatus(rawEvent.getEventStatus())
.eventTime(rawEvent.getEventTime())
.payload(convertedPayload)
.build();
agentSubmissionAutoService.updateProgressStateToInstallCheck(finalEvent);
}
@KafkaListener(
topics = "${spring.kafka.topic.agent-install-ip-update.name}",
containerFactory = "defaultKafkaListenerFactory",
groupId = "${spring.kafka.topic.agent-install-ip-update.group-id}"
)
public void agentSubmissionIpUpdateListener(ConsumerRecord<String, AgentSubmissionIpUpdateEvent> message) {
agentInstallationFacade.orchestrateUpdateAgentSubmissionIp(message.value());
}
@KafkaListener(
topics = "${spring.kafka.topic.agent-deletion.name}",
containerFactory = "defaultKafkaListenerFactory",
groupId = "${spring.kafka.topic.agent-deletion.group-id}"
)
public void agentSubmissionDeleteListener(ConsumerRecord<String, AgentSubmissionDeletionEvent> message) {
// auto service를 통해 host_mapping 테이블 delete 처리
if (agentSubmissionAutoService.deleteAgentSubmission(message.value())) {
// Flex Appliance 정리
agentInstallationFacade.orchestrateDeleteAgentSubmission(message.value());
}
}
}
이 컨슈머에서 의존하고 있는 Facade나 Service Component에 Kafka로부터 받은 Event에 대한 상태 체킹 및 비즈니스 로직을 위임시키는데 해당 컴포넌트에선 DB에 이벤트에 대한 query 후 이벤트 상태에 따라 제어하는 식으로 흐름이 이어지지만, 결국 DB Transaction이나 Kafka Connection 문제 혹은 기타 예외 상황에 try-catch로 일일이 처리해야 한다. 비즈니스 요건 특성상 세밀한 제어까진 불필요했어서 단순 구조로 처리했으나, 이 부분이 개인적으론 굉장히 아쉬웠던 상황이었다. 해당 코드의 일부는 아래와 같다.
/**
* 에이전트 설치 신청 자동화를 위한 Facade 컴포넌트.
*
* @author HakHyeon Song
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class AgentInstallationFacade {
private final EventOutboxService eventOutboxService;
private final FlexSessionManager flexSessionManager;
private final FlexInstanceAdapter flexInstanceAdapter;
private final HostMappingService hostMappingService;
private final VeritasApplianceRoutingService veritasApplianceRoutingService;
private final AgentSubmissionAdminEventProducer agentSubmissionEventProducer;
private final AgentSubmissionRetryEventProducer agentSubmissionRetryEventProducer;
/**
* 에이전트 설치 신청 자동화 처리를 수행.
*
* @param event 에이전트 설치 신청 이벤트
*/
public void orchestrateAgentInstallation(AgentInstallRequestEvent event) {
var eventOutbox = eventOutboxService.getEventOutbox(event.getEventSeq());
// Event Outbox 상태 검증
if (!Objects.equals(eventOutbox.getEventStatus(), EventOutboxStatusEnum.PENDING.getId())) {
return;
}
if (Objects.equals(eventOutbox.getEventStatus(), EventOutboxStatusEnum.FAILED.getId())) {
log.warn("Skipping retry: EventOutbox[seq={}] has FAILED status (retry limit exceeded)", event.getEventSeq());
return;
}
var command = new EtcHostsUpdateCommand(
event.getProjectId(),
event.getStationId(),
event.getVeritasApplianceSeq(),
event.getHostname(),
null,
event.getIp()
);
try {
doOrchestration(command);
eventOutboxService.updateEventStatus(eventOutbox.getEventOutboxSeq(), EventOutboxStatusEnum.PROCESSING);
// producer 호출
AgentSubmissionStateUpdateEvent<AgentSubmissionStateUpdatePayload> agentSubmissionStateUpdateEvent = AgentSubmissionStateUpdateEventFactory.buildAgentSubmissionStateUpdateEvent(event, eventOutbox);
agentSubmissionEventProducer.publishAgentInstallStateToInstallCheckEvent(agentSubmissionStateUpdateEvent);
} catch (Exception e) {
log.error("Agent installation orchestration failed. eventOutboxSeq={} - retrying via retry topic.", event.getEventSeq(), e);
// retry count 증가 후 retry producer 호출
eventOutboxService.incrementRetryCount(event.getEventSeq());
agentSubmissionRetryEventProducer.publishAgentInstallationRequestRetry(event);
}
}
배포 환경 및 인프라 자원에 대한 자유도가 우리한테 없었기에 코드레벨에서 직접 한땀한땀(?) 처리했던 아쉬움을 뒤로하고, 아키텍처 수준 구현으로 보는 정석적인 방식에 대해 살펴보자.
아키텍처 수준 구현 (대표적으로 Debezium CDC 사용)
- 비즈니스 트랜잭션 내에 Outbox 테이블 insert
- Outbox 테이블에 insert만 하고 끝
- 별도의 CDC 시스템 (Debezium 등)이 Outbox 테이블의 변경을 감지
- Kafka로 메시지를 publish
항목 | 설명 |
---|---|
✅ Kafka 전송 시점 | DB commit 이후, CDC에 의해 자동 publish |
✅ 일관성 | DB 트랜잭션과 메시지 전송이 사실상 원자적 |
✅ 소비 로직 | Kafka 메시지를 처리만 하면 됨, Outbox 테이블 재조회 없음 |
✅ 부가 조건 | DB → Kafka 간의 CDC 기반 로그 전송 시스템 필요 (Debezium 등) |
핵심 차이 정리
항목 | 사용자 정의 방식 (코드 기반) | 트랜잭셔널 아웃박스 (정석) |
---|---|---|
Kafka 전송 시점 | 코드에서 send() 호출 | CDC가 DB 변경 감지 후 자동 |
Kafka 전송 실패 시 | DB는 commit, Kafka 전송 실패 가능 | DB commit이 있어야만 Kafka 전송됨 |
Outbox 테이블 재조회 | 필요 (Consumer 측에서) | 불필요 |
외부 의존성 | 없음 (Spring 코드로 가능) | 필요 (Debezium, Kafka Connect 등) |
복잡도 | 낮음 (개발자 제어) | 높음 (운영 구성 필요) |
일관성 보장 | 낮음 (at-least-once 이상 보장 어려움) | 높음 (exactly-once 가능) |
결국, 트랜잭셔널 아웃박스 패턴의 본질은 CDC 기반 Kafka publish와 DB commit 시점 연동이다.
환경적인 제약 사항 때문에 우리 프로젝트에서 처리했던 방식처럼 코드 레벨에서 send() 호출 후 consumer에서 Outbox 테이블을 재조회하는 방식은 대체로 안티패턴에 가깝고, 실제 아웃박스 패턴의 원자성, 일관성, 비동성 이점을 취하려면 Debezium 기반의 CDC 흐름으로 가야 하지 않을까 싶다.