포스트

Kafka로 시세 파이프라인 분리하기 — 토픽 설계와 at-least-once

왜 Kafka를 넣었는가

초기 monticker는 단순한 구조였다. Worker가 1초마다 가격을 생성하고, Redis에 쓰고, 이벤트를 탐지하고, 캔들을 집계했다. 하나의 @Scheduled 메서드가 모든 걸 순차적으로 처리했다.

1
2
3
4
5
MockPriceGenerator (1초 주기)
  → RedisTickWriter
  → CandleAggregator
  → EventDetector
  → AlertEvaluator

문제는 확장이다. 나중에 KIS WebSocket처럼 외부에서 데이터가 밀려들어올 때, JVM Worker가 수집과 처리를 동시에 하면 두 관심사가 뒤섞인다.

Kafka를 넣으면 수집(Go Gateway)과 처리(Kotlin Worker)가 분리된다.

1
2
3
Go Gateway → [Kafka] → Kotlin Worker → Redis + TimescaleDB + EventDetector
                    ↘
                      Netty Broadcast Gateway → WebSocket clients

둘을 분리하면 얻는 것이 있다. 수집기가 죽어도 이미 Kafka에 쌓인 틱은 사라지지 않는다. Worker가 재시작되면 멈춘 오프셋부터 다시 처리한다.


Kafka 설정 (docker-compose.yml)

monticker는 KRaft 모드(ZooKeeper 없는 Kafka 3.8)를 사용한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kafka:
  image: apache/kafka:3.8.0
  environment:
    KAFKA_NODE_ID: 1
    KAFKA_PROCESS_ROLES: broker,controller
    # 두 개의 리스너: Docker 내부망용과 호스트 접근용
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
    KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
    KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    CLUSTER_ID: "monticker-kafka-cluster-001"

리스너를 두 개로 나눈 이유가 있다. Docker 컨테이너에서 PLAINTEXT://kafka:9092로 Kafka에 접근하면 메타데이터 응답에 kafka:9092가 담긴다. 호스트에서 실행 중인 Go Gateway는 kafka라는 호스트명을 해석하지 못한다. EXTERNAL://localhost:29092를 추가해 호스트에서도 접근 가능하게 만든다.


토픽 설계

monticker는 두 개의 토픽을 사용한다.

market.ticks

시세 틱 메시지. Go Gateway가 발행하고 Worker와 Netty Gateway가 소비한다.

1
2
3
4
토픽: market.ticks
파티션: 6
복제 인수: 1 (단일 브로커)
키: stockId (해시 파티셔닝)

파티션을 6개로 나눈 이유는 종목 수(202개)보다 훨씬 적지만, 1개일 때보다 병렬성이 향상된다. 나중에 Worker를 여러 인스턴스로 늘리면 파티션당 하나씩 배분된다.

키 해싱과 순서 보장

1
2
3
stockId=1  → hash(1) % 6 = 파티션 3
stockId=2  → hash(2) % 6 = 파티션 0
stockId=1  → hash(1) % 6 = 파티션 3  (항상 같은 파티션)

같은 종목의 틱은 항상 같은 파티션으로 들어간다. 파티션 내에서는 순서가 보장되므로 캔들 집계 시 시간 역전이 없다.

market.events

Worker가 이벤트 탐지 결과를 발행하는 토픽. Netty Broadcast Gateway가 이를 소비해 WebSocket 클라이언트에 전달한다.

1
2
3
토픽: market.events
파티션: 3
키: stockId

Worker Kafka 연동 — @ConditionalOnProperty

Kafka 없이 기존 MockPriceGenerator 방식으로도 동작해야 한다. 환경변수 INGESTION_SOURCE로 경로를 전환한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// KafkaConfig.kt
@Configuration
@EnableKafka
@ConditionalOnProperty(name = ["ingestion.source"], havingValue = "kafka")
class KafkaConfig(
    @Value("\${kafka.brokers:localhost:9092}") private val brokers: String
) {
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val props = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers,
            ConsumerConfig.GROUP_ID_CONFIG to "monticker-worker",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
        )
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun kafkaListenerContainerFactory() =
        ConcurrentKafkaListenerContainerFactory<String, String>().also {
            it.consumerFactory = consumerFactory()
        }
}

@ConditionalOnPropertyingestion.source=kafka일 때만 이 Bean을 등록한다. 그 외 경우 Kafka 관련 Bean은 아예 생성되지 않는다.

1
2
3
4
5
# application.yml
ingestion:
  source: ${INGESTION_SOURCE:internal}  # 기본값: internal (mock 경로)
kafka:
  brokers: ${KAFKA_BROKERS:localhost:9092}

TickKafkaConsumer — @KafkaListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
@ConditionalOnProperty(name = ["ingestion.source"], havingValue = "kafka")
class TickKafkaConsumer(
    private val redisTickWriter: RedisTickWriter,
    private val candleAggregator: CandleAggregator,
    private val eventDetector: EventDetector,
    private val latencyTracker: LatencyTracker,
) {
    private val objectMapper = ObjectMapper().registerModule(JavaTimeModule())
    private val log = LoggerFactory.getLogger(javaClass)

    @KafkaListener(
        topics = ["market.ticks"],
        groupId = "monticker-worker",
        containerFactory = "kafkaListenerContainerFactory"
    )
    fun onTick(record: ConsumerRecord<String, String>) {
        runCatching {
            val tick = objectMapper.readValue(record.value(), GeneratedTick::class.java)

            // 파이프라인 지연 추적
            latencyTracker.recordTickGenerated(tick.stockId, tick.generatedAt)

            // 처리 파이프라인
            redisTickWriter.write(tick)
            candleAggregator.onTick(tick)
            latencyTracker.recordRedisWrite(tick.stockId)
            eventDetector.detect(tick)
            latencyTracker.recordBroadcast(tick.stockId)

        }.onFailure { e ->
            log.error("틱 처리 실패 [key={}]: {}", record.key(), e.message)
            // at-least-once: 예외를 삼켜서 오프셋이 커밋되게 한다
            // 처리 실패한 틱은 유실하되, 파이프라인은 계속 진행
        }
    }
}

at-least-once와 멱등성

Kafka의 기본 소비 시맨틱은 at-least-once다. 컨슈머가 메시지를 처리한 후 오프셋을 커밋하기 전에 장애가 나면, 재시작 후 같은 메시지를 다시 받는다.

monticker에서 이 중복이 문제가 되는지 체크한다.

처리 단계중복 시 영향처리 방법
redisTickWriter최신 가격이 같은 값으로 다시 쓰임SET → 덮어쓰기, 무해
candleAggregator같은 틱이 두 번 집계됨1분봉 시작/종료 판단은 시각 기준 → 타임스탬프로 중복 방지
eventDetector같은 이벤트가 두 번 탐지될 수 있음INSERT ... ON CONFLICT DO NOTHING으로 중복 방지

이벤트 중복 방지 쿼리:

1
2
3
4
INSERT INTO stock_events (stock_id, event_type, triggered_at, ...)
VALUES (?, ?, ?, ...)
ON CONFLICT (stock_id, event_type, date_trunc('minute', triggered_at))
DO NOTHING;

같은 종목·이벤트 유형·분(minute) 조합이 이미 있으면 무시한다. 이것이 멱등 처리(idempotent processing) 의 구현이다.


Mock 경로와의 공존

Kafka 경로가 활성화되면 기존 Mock 경로는 비활성화된다.

1
2
3
4
5
6
7
8
9
10
11
12
// MarketDataCollector.kt
@Scheduled(fixedDelay = 1000)
fun collect() {
    if (ingestionSource == "kafka") return  // Kafka 경로로 대체됨
    // 기존 MockPriceGenerator 로직
    stocks.forEach { stock ->
        val tick = mockPriceGenerator.generate(stock)
        redisTickWriter.write(tick)
        candleAggregator.onTick(tick)
        eventDetector.detect(tick)
    }
}

ingestion.source 하나로 두 경로를 전환한다. 기존 코드를 삭제하지 않았으므로, Kafka 없이 로컬 개발 시 그대로 동작한다.


정리

  • Kafka는 수집과 처리를 분리해 각 컴포넌트가 독립적으로 재시작·확장 가능하게 한다.
  • 키 해싱으로 같은 종목 틱이 같은 파티션에 순서대로 쌓이고, 캔들 집계 시 시간 역전을 방지한다.
  • at-least-once 소비와 멱등 처리(ON CONFLICT DO NOTHING)를 조합해 중복 없는 결과를 만든다.

다음 편에서는 Kafka에서 받은 틱을 WebSocket 클라이언트에 브로드캐스트하는 Netty Broadcast Gateway를 다룬다.

  1. 1 가격이 아니라 이벤트를 팔자 — monticker 설계 철학
  2. 2 모듈식 모놀리스를 선택한 이유 — MSA의 유혹을 거부하기
  3. 3 TimescaleDB를 시계열 DB로 고른 이유 — Hypertable과 연속 집계
  4. 4 Go goroutine으로 202개 종목 동시 수집하기 — Market Gateway 설계
  5. 5 Kafka로 시세 파이프라인 분리하기 — 토픽 설계와 at-least-once
  6. 6 Netty로 수만 연결에 시세 브로드캐스트하기 — NioEventLoopGroup 리액터 패턴
  7. 7 EMA 기반 이상 탐지 — 가격 급등과 거래량 서지 실시간 감지
  8. 8 TreeMap으로 CLOB 호가창 구현하기 — 가격/시간 우선 매칭과 슬리피지
  9. 9 주문 전 동기 리스크 게이트 설계 — VaR, 집중도, 일일손실 5가지 규칙
  10. 10 잔고를 저장하지 말고 재구성하라 — 이벤트 소싱 원장 설계
  11. 11 감정 태그 × 수익률 — 투자 습관을 데이터로 기록하기
  12. 12 룰 엔진: RSI·MACD 조건식을 JSON DSL로 — Quant Lab 설계
  13. 13 백테스트 엔진: look-ahead 없는 시뮬레이션 — Sharpe·MDD·PF 계산
  14. 14 전략 지문(SHA-256)으로 룰셋 보호하기 — 서버 사이드 실행과 역공학 방어
  15. 15 Markowitz 최적화를 솔버 없이 구현하기 — 프로젝션 경사하강법
  16. 16 Kelly Criterion: 수학적 파산 방지 베팅 비율 — Half Kelly와 백테스트 연동
  17. 17 ZigZag + 패턴 템플릿 매칭으로 차트 패턴 감지 — 헤드앤숄더·이중바닥
  18. 18 ADX로 시장 국면 분류하기 — BULL·BEAR·SIDEWAYS·HIGH_VOL
  19. 19 손익통산으로 세금 줄이기 — Tax-Loss Harvesting 시뮬레이션
  20. 20 MockK로 JdbcTemplate 목킹하기 — 311개 테스트 작성 경험
  21. 21 OpenTelemetry + Jaeger로 분산 추적 — 시세 파이프라인 지연 측정
  22. 22 Circuit Breaker로 외부 API 장애 격리 — Resilience4j + KIS·Yahoo 폴백 체인
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.

댓글

아직 댓글이 없습니다