포스트

Netty로 수만 연결에 시세 브로드캐스트하기 — NioEventLoopGroup 리액터 패턴

Spring STOMP의 한계

처음에는 Spring의 STOMP over WebSocket으로 시세를 브로드캐스트했다.

1
2
// Spring STOMP 방식
messagingTemplate.convertAndSend("/topic/stocks/$stockId", priceUpdate)

이 방식의 문제는 스레드 모델에 있다. Spring MVC는 요청당 스레드(thread-per-request) 모델을 기본으로 하고, WebSocket 연결도 일정 수의 스레드가 처리한다. 동시 연결 수가 수천을 넘어가면 스레드 컨텍스트 스위칭 비용이 올라간다.

시세 브로드캐스트는 특수한 워크로드다.

  • 쓰기 집중: 같은 데이터를 수천 연결에 반복 전달
  • 메시지 크기 작음: 수십 바이트 JSON
  • 높은 빈도: 종목당 1초마다 한 번

이 패턴에 최적화된 것이 Netty의 NIO 리액터 패턴이다.


리액터 패턴이란

1
2
3
4
5
6
7
8
9
10
11
12
13
기존 (Thread-per-connection):
  연결 1 → 스레드 1 (대기 중)
  연결 2 → 스레드 2 (대기 중)
  연결 3 → 스레드 3 (대기 중)
  ...
  연결 10000 → 스레드 10000 (대기 중)
  → 대부분의 스레드가 I/O 대기 상태. 메모리 낭비.

Netty NIO (Reactor):
  이벤트 루프 스레드 8개
  → 셀렉터(Selector)가 모든 연결의 I/O 이벤트를 감시
  → I/O 준비된 연결만 스레드에 할당
  → 수천 연결을 8개 스레드로 처리

Netty의 NioEventLoopGroup은 내부적으로 Java NIO Selector를 사용해 수만 개의 소켓을 소수의 스레드로 처리한다.


BroadcastServer 구현

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class BroadcastServer(private val port: Int) {
    private val log = LoggerFactory.getLogger(javaClass)

    // stockId → 구독 중인 채널 집합
    // "ALL" 키는 모든 틱을 받는 전체 구독자용
    private val subscriptions = ConcurrentHashMap<String, MutableSet<Channel>>()

    fun start() {
        val bossGroup  = NioEventLoopGroup(1)       // accept 전담 스레드 1개
        val workerGroup = NioEventLoopGroup()        // I/O 처리 (기본: CPU코어 × 2)

        val bootstrap = ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel::class.java)
            .childHandler(object : ChannelInitializer<SocketChannel>() {
                override fun initChannel(ch: SocketChannel) {
                    ch.pipeline()
                        .addLast(HttpServerCodec())           // HTTP 요청 파싱
                        .addLast(HttpObjectAggregator(65536)) // 프레임 집약
                        .addLast(WebSocketServerProtocolHandler("/ws")) // WS 핸드셰이크
                        .addLast(SubscriptionHandler(subscriptions))    // 구독 로직
                }
            })

        val channel = bootstrap.bind(port).sync().channel()
        log.info("Broadcast gateway listening on ws://0.0.0.0:{}/ws", port)
        channel.closeFuture().sync()
    }
}

파이프라인이 핵심이다. 연결이 들어오면 HTTP 업그레이드 요청이 먼저 처리되고(HttpServerCodec, HttpObjectAggregator), WebSocketServerProtocolHandler101 Switching Protocols를 응답해 WebSocket으로 전환한다. 이후 텍스트 프레임이 SubscriptionHandler에 도달한다.


SubscriptionHandler — 구독 관리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class SubscriptionHandler(
    private val subscriptions: ConcurrentHashMap<String, MutableSet<Channel>>,
) : SimpleChannelInboundHandler<TextWebSocketFrame>() {

    private val mapper = ObjectMapper()

    override fun channelRead0(ctx: ChannelHandlerContext, msg: TextWebSocketFrame) {
        runCatching {
            val node   = mapper.readTree(msg.text())
            val action = node["action"]?.asText() ?: return
            val key    = node["stockId"]?.asText() ?: "ALL"

            when (action) {
                "subscribe" ->
                    subscriptions.getOrPut(key) { ConcurrentHashMap.newKeySet() }
                              .add(ctx.channel())
                "unsubscribe" ->
                    subscriptions[key]?.remove(ctx.channel())
                else ->
                    log.debug("알 수 없는 action: {}", action)
            }
        }.onFailure { log.warn("구독 처리 실패: {}", it.message) }
    }

    override fun channelInactive(ctx: ChannelHandlerContext) {
        // 연결 끊김 시 모든 목록에서 제거 → 메모리 누수 방지
        subscriptions.values.forEach { it.remove(ctx.channel()) }
        super.channelInactive(ctx)
    }
}

ConcurrentHashMap.newKeySet()이 보이는 이유가 있다. Java에는 ConcurrentHashSet이 없다. ConcurrentHashMap의 키 집합을 뽑으면 스레드 안전한 Set이 된다.

channelInactive에서 연결이 끊기면 모든 구독 목록에서 채널을 제거한다. 이를 하지 않으면 끊긴 채널에 계속 writeAndFlush를 시도해 에러 로그가 쌓인다.


broadcast() — 핵심 메서드

1
2
3
4
5
fun broadcast(stockId: String, json: String) {
    val makeFrame = { TextWebSocketFrame(json) }
    subscriptions[stockId]?.forEach { it.writeAndFlush(makeFrame()) }
    subscriptions["ALL"]?.forEach  { it.writeAndFlush(makeFrame()) }
}

TextWebSocketFrame(json)을 채널마다 새로 생성하는 이유가 있다. Netty는 버퍼를 reference-counted로 관리한다. 같은 프레임 인스턴스를 두 채널에 보내면 첫 번째 채널이 버퍼를 소유권을 가져가고, 두 번째 채널이 접근할 때는 이미 해제된 메모리를 건드리게 된다. 람다 { TextWebSocketFrame(json) }로 채널마다 독립 인스턴스를 만든다.


KafkaBridge — Kafka 소비 스레드

BroadcastServer는 I/O 이벤트를 기다리는 블로킹 루프다. Kafka 소비도 블로킹이므로 별도 스레드에서 실행한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class KafkaBridge(
    private val brokers: String,
    private val server: BroadcastServer
) {
    private val running = AtomicBoolean(true)

    fun run() {
        val consumer = KafkaConsumer<String, String>(Properties().apply {
            put(BOOTSTRAP_SERVERS_CONFIG, brokers)
            put(GROUP_ID_CONFIG, "broadcast-gateway")
            put(KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer::class.java.name)
            put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
            put(AUTO_OFFSET_RESET_CONFIG, "latest") // 재시작 후 과거 틱 스킵
        })
        consumer.subscribe(listOf("market.ticks", "market.events"))

        while (running.get()) {
            val records = consumer.poll(Duration.ofMillis(200))
            for (record in records) {
                // record.key() = stockId (Go Gateway가 설정한 값)
                server.broadcast(record.key() ?: "ALL", record.value())
            }
        }
        consumer.close()
    }

    fun stop() = running.set(false)
}

AUTO_OFFSET_RESET_CONFIG = "latest"로 설정하는 이유가 있다. 브로드캐스트 게이트웨이는 실시간 중계가 목적이다. 재시작 후 밀린 과거 틱 수만 개를 한꺼번에 클라이언트에 쏘는 것은 의미가 없고 오히려 해롭다. 최신 메시지부터 소비하면 된다.


진입점 (Main.kt)

1
2
3
4
5
6
7
8
9
10
11
12
13
fun main() {
    val port    = System.getenv("BROADCAST_PORT")?.toInt() ?: 9090
    val brokers = System.getenv("KAFKA_BROKERS") ?: "localhost:9092"

    val server = BroadcastServer(port)
    val bridge = KafkaBridge(brokers, server)

    // KafkaBridge는 블로킹 루프 → 별도 스레드
    thread(name = "kafka-bridge") { bridge.run() }

    // BroadcastServer.start() → closeFuture().sync()로 블로킹
    server.start()
}

클라이언트 프로토콜

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 연결
const ws = new WebSocket('ws://localhost:9090/ws');

// 특정 종목 구독
ws.send(JSON.stringify({ action: 'subscribe', stockId: '1' }));

// 전체 종목 구독
ws.send(JSON.stringify({ action: 'subscribe', stockId: 'ALL' }));

// 수신 메시지
ws.onmessage = (event) => {
    const tick = JSON.parse(event.data);
    // { stockId: 1, symbol: "005930", price: 73200.5, ... }
    updateChart(tick);
};

Spring STOMP 대비 스레드 수

 Spring STOMPNetty Broadcast
연결 처리 스레드Tomcat NIO 스레드 풀 (기본 200)boss 1 + worker 16
1만 연결 시 추가 스레드수백0 (이벤트 루프 재사용)
WebSocket 프레임 처리Spring 직렬화 + STOMP 헤더Raw TextWebSocketFrame

단순한 숫자 비교지만, Netty는 “연결이 늘어도 스레드는 늘지 않는다”는 점이 핵심이다.


정리

  • Netty의 NioEventLoopGroup은 소수의 스레드로 수만 연결을 처리한다.
  • ConcurrentHashMap으로 stockId별 구독 채널을 관리하고, 연결 끊김 시 즉시 제거한다.
  • TextWebSocketFrame은 채널마다 새 인스턴스를 생성해야 한다 (reference-counted 버퍼).
  • KafkaBridge의 AUTO_OFFSET_RESET=latest로 재시작 후 과거 틱 재처리를 방지한다.

다음 편에서는 시세 파이프라인의 마지막 단계인 EMA 기반 이상 탐지 — 가격 급등과 거래량 서지를 어떻게 실시간으로 감지하는지 다룬다.

  1. 1 가격이 아니라 이벤트를 팔자 — monticker 설계 철학
  2. 2 모듈식 모놀리스를 선택한 이유 — MSA의 유혹을 거부하기
  3. 3 TimescaleDB를 시계열 DB로 고른 이유 — Hypertable과 연속 집계
  4. 4 Go goroutine으로 202개 종목 동시 수집하기 — Market Gateway 설계
  5. 5 Kafka로 시세 파이프라인 분리하기 — 토픽 설계와 at-least-once
  6. 6 Netty로 수만 연결에 시세 브로드캐스트하기 — NioEventLoopGroup 리액터 패턴
  7. 7 EMA 기반 이상 탐지 — 가격 급등과 거래량 서지 실시간 감지
  8. 8 TreeMap으로 CLOB 호가창 구현하기 — 가격/시간 우선 매칭과 슬리피지
  9. 9 주문 전 동기 리스크 게이트 설계 — VaR, 집중도, 일일손실 5가지 규칙
  10. 10 잔고를 저장하지 말고 재구성하라 — 이벤트 소싱 원장 설계
  11. 11 감정 태그 × 수익률 — 투자 습관을 데이터로 기록하기
  12. 12 룰 엔진: RSI·MACD 조건식을 JSON DSL로 — Quant Lab 설계
  13. 13 백테스트 엔진: look-ahead 없는 시뮬레이션 — Sharpe·MDD·PF 계산
  14. 14 전략 지문(SHA-256)으로 룰셋 보호하기 — 서버 사이드 실행과 역공학 방어
  15. 15 Markowitz 최적화를 솔버 없이 구현하기 — 프로젝션 경사하강법
  16. 16 Kelly Criterion: 수학적 파산 방지 베팅 비율 — Half Kelly와 백테스트 연동
  17. 17 ZigZag + 패턴 템플릿 매칭으로 차트 패턴 감지 — 헤드앤숄더·이중바닥
  18. 18 ADX로 시장 국면 분류하기 — BULL·BEAR·SIDEWAYS·HIGH_VOL
  19. 19 손익통산으로 세금 줄이기 — Tax-Loss Harvesting 시뮬레이션
  20. 20 MockK로 JdbcTemplate 목킹하기 — 311개 테스트 작성 경험
  21. 21 OpenTelemetry + Jaeger로 분산 추적 — 시세 파이프라인 지연 측정
  22. 22 Circuit Breaker로 외부 API 장애 격리 — Resilience4j + KIS·Yahoo 폴백 체인
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.

댓글

아직 댓글이 없습니다