포스트

Go goroutine으로 202개 종목 동시 수집하기 — Market Gateway 설계

왜 Go인가

시세 수집기를 만들 때 가장 먼저 마주하는 문제는 동시성이다. 202개 종목의 가격 변화를 1초마다 처리하려면, 202개의 독립적인 루프가 동시에 돌아야 한다.

JVM(Kotlin/Spring) 위에서도 가능하지만, Go의 goroutine을 사용하면 비용이 훨씬 낮다.

동시성 단위시작 스택 크기1만 개 생성 시 메모리
OS Thread~1-8 MB~10 GB
JVM Thread~512 KB~1 MB~5 GB
Kotlin Coroutine~수 KB수십 MB
Go Goroutine~2 KB~20 MB

202개 종목 정도는 어떤 방식이든 처리할 수 있지만, 실제 거래소는 수천 개 종목을 다룬다. goroutine은 그 규모에서도 메모리 부담이 없다.


전체 구조

1
2
3
4
5
6
7
8
9
services/market-gateway/
├── main.go
├── go.mod
├── Dockerfile
└── internal/
    ├── stock/loader.go       # DB에서 활성 종목 로드
    ├── generator/generator.go # goroutine-per-stock 틱 루프
    ├── kafkaproducer/producer.go # Kafka 발행
    └── tick/tick.go          # 틱 메시지 구조체

의존성은 두 개뿐이다.

1
2
github.com/jackc/pgx/v5       # PostgreSQL 드라이버
github.com/segmentio/kafka-go  # Kafka 클라이언트

Spring 없이, CGo 없이. 최종 바이너리 크기는 약 12MB, Docker 이미지는 alpine 기반 약 20MB다.


진입점: main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
    brokers := getenv("KAFKA_BROKERS", "localhost:9092")
    dbURL   := getenv("DB_URL", "postgres://monticker:monticker@localhost:5432/monticker?sslmode=disable")

    // SIGINT/SIGTERM 수신 시 ctx 취소 → 모든 goroutine 정상 종료
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    pool, err := pgxpool.New(ctx, dbURL)
    if err != nil {
        log.Fatalf("DB 연결 실패: %v", err)
    }
    defer pool.Close()

    stocks, err := stock.LoadActiveStocks(ctx, pool)
    if err != nil {
        log.Fatalf("종목 로드 실패: %v", err)
    }
    log.Printf("loaded %d active stocks", len(stocks))

    producer := kafkaproducer.New(brokers)
    defer producer.Close()

    generator.Run(ctx, stocks, producer, 1*time.Second)
}

signal.NotifyContext는 Go 1.16부터 표준 라이브러리에 포함된 패턴이다. SIGINT(Ctrl+C)나 SIGTERM을 받으면 ctx가 취소되고, 이를 select로 감지하는 모든 goroutine이 정상 종료된다.


종목 로드: stock/loader.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Stock struct {
    ID        int64
    Symbol    string
    Market    string
    BasePrice float64
}

func LoadActiveStocks(ctx context.Context, pool *pgxpool.Pool) ([]Stock, error) {
    rows, err := pool.Query(ctx, `
        SELECT id, symbol, market, COALESCE(base_price, 10000.0) as base_price
        FROM stocks
        WHERE is_active = true
        ORDER BY id
    `)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var stocks []Stock
    for rows.Next() {
        var s Stock
        if err := rows.Scan(&s.ID, &s.Symbol, &s.Market, &s.BasePrice); err != nil {
            return nil, err
        }
        stocks = append(stocks, s)
    }
    return stocks, rows.Err()
}

DB 연결은 애플리케이션 시작 시 1회만 수행한다. 이후 goroutine들은 DB에 접근하지 않으므로 커넥션 풀 부하가 없다.


핵심: goroutine-per-stock 틱 루프

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func Run(ctx context.Context, stocks []stock.Stock, pub Publisher, interval time.Duration) {
    var wg sync.WaitGroup
    for _, s := range stocks {
        wg.Add(1)
        go func(s stock.Stock) {
            defer wg.Done()
            tickLoop(ctx, s, pub, interval)
        }(s)
    }
    // ctx 취소 시 모든 goroutine이 tickLoop에서 return하면 wg.Wait() 통과
    wg.Wait()
}

func tickLoop(ctx context.Context, s stock.Stock, pub Publisher, interval time.Duration) {
    price := s.BasePrice
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return  // graceful shutdown
        case <-ticker.C:
            price = nextPrice(price)
            t := tick.Tick{
                StockID:     s.ID,
                Symbol:      s.Symbol,
                Market:      s.Market,
                Price:       math.Round(price*100) / 100,
                Volume:      rand.Int63n(100000) + 1000,
                TradeTime:   time.Now().UTC(),
                GeneratedAt: time.Now().UTC(),
            }
            pub.Publish(ctx, strconv.FormatInt(s.ID, 10), t)
        }
    }
}

goroutine 클로저에서 루프 변수를 직접 캡처하면 모든 goroutine이 마지막 값을 공유하는 버그가 생긴다. Go 1.22부터 이 문제가 수정됐지만, 명시적으로 s stock.Stock을 파라미터로 넘기는 방어 코딩이 여전히 권장된다.


랜덤워크 가격

현재는 가격을 랜덤워크로 생성한다. 나중에 KIS WebSocket을 연동하면 이 부분만 교체하면 된다.

1
2
3
4
5
6
func nextPrice(current float64) float64 {
    // ±0.1% 범위 내 랜덤 변동
    change := (rand.Float64() - 0.5) * 0.002
    next := current * (1 + change)
    return math.Max(next, 1.0) // 0원 미만 방지
}

주가가 양수를 유지하도록 math.Max(next, 1.0)로 보호한다.


Kafka 발행: kafkaproducer/producer.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const TicksTopic = "market.ticks"

type Producer struct {
    writer *kafka.Writer
}

func New(brokers string) *Producer {
    return &Producer{
        writer: &kafka.Writer{
            Addr:         kafka.TCP(strings.Split(brokers, ",")...),
            Topic:        TicksTopic,
            Balancer:     &kafka.Hash{},       // 핵심: 키 해시 기반 파티션 선택
            RequiredAcks: kafka.RequireOne,    // leader ack만 기다림
        },
    }
}

func (p *Producer) Publish(ctx context.Context, key string, t tick.Tick) {
    payload, err := json.Marshal(t)
    if err != nil {
        log.Printf("직렬화 실패: %v", err)
        return
    }
    if err := p.writer.WriteMessages(ctx, kafka.Message{
        Key:   []byte(key),   // key = stockId
        Value: payload,
    }); err != nil {
        log.Printf("Kafka 발행 실패 [%s]: %v", key, err)
    }
}

kafka.Hash{} 밸런서가 핵심이다. key(stockId)를 해싱해 같은 종목의 틱이 항상 같은 파티션으로 간다. 파티션 내에서 메시지 순서가 보장되므로, 컨슈머가 캔들 집계를 할 때 시간 역전 오류가 없다.


틱 메시지 형식

1
2
3
4
5
6
7
8
9
type Tick struct {
    StockID     int64     `json:"stockId"`
    Symbol      string    `json:"symbol"`
    Market      string    `json:"market"`
    Price       float64   `json:"price"`
    Volume      int64     `json:"volume"`
    TradeTime   time.Time `json:"tradeTime"`
    GeneratedAt time.Time `json:"generatedAt"`
}

GeneratedAt은 나중에 지연(latency) 추적에 사용된다. 컨슈머에서 time.Now() - GeneratedAt을 계산하면 Kafka 파이프라인의 엔드투엔드 지연을 측정할 수 있다.


실행 확인

202개 종목이 로드되고 틱이 발행되는지 확인한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 환경변수 설정
export KAFKA_BROKERS=localhost:29092
export DB_URL="postgres://monticker:monticker@localhost:5432/monticker?sslmode=disable"

# 실행
cd services/market-gateway
go run .

# 출력
# 2025/07/01 10:00:00 loaded 202 active stocks
# 2025/07/01 10:00:01 [005930] published tick: 73200.00
# 2025/07/01 10:00:01 [AAPL] published tick: 218.50
# ...

# Kafka 토픽 확인
kafka-console-consumer \
  --bootstrap-server localhost:29092 \
  --topic market.ticks \
  --from-beginning \
  --max-messages 3
1
{"stockId":1,"symbol":"005930","market":"KOSPI","price":73200.5,"volume":45231,"tradeTime":"2025-07-01T10:00:01Z","generatedAt":"2025-07-01T10:00:01Z"}

KIS WebSocket 연동 경로

현재 tickLoopnextPrice() 호출을 KIS WebSocket 수신 채널로 교체하면 실시간 시세로 전환된다.

1
2
3
4
5
6
// 현재 (mock)
price = nextPrice(price)

// KIS 연동 후
msg := <-kisMessageChannel  // KIS H0STASP0 호가 메시지
price = parseKisPrice(msg)

goroutine-per-stock 구조는 KIS의 종목별 WebSocket 구독과 자연스럽게 대응된다.


정리

  • Go goroutine은 2KB 시작 스택으로 202개(~수천 개)를 동시에 실행해도 메모리 부담이 없다.
  • kafka.Hash{} 밸런서로 같은 종목의 틱이 같은 파티션에 순서대로 쌓인다.
  • signal.NotifyContext로 Ctrl+C 시 모든 goroutine을 정상 종료한다.

다음 편에서는 이 틱이 Kafka 토픽을 거쳐 Kotlin Worker에서 어떻게 소비되는지, 토픽 설계와 at-least-once 처리를 다룬다.

  1. 1 가격이 아니라 이벤트를 팔자 — monticker 설계 철학
  2. 2 모듈식 모놀리스를 선택한 이유 — MSA의 유혹을 거부하기
  3. 3 TimescaleDB를 시계열 DB로 고른 이유 — Hypertable과 연속 집계
  4. 4 Go goroutine으로 202개 종목 동시 수집하기 — Market Gateway 설계
  5. 5 Kafka로 시세 파이프라인 분리하기 — 토픽 설계와 at-least-once
  6. 6 Netty로 수만 연결에 시세 브로드캐스트하기 — NioEventLoopGroup 리액터 패턴
  7. 7 EMA 기반 이상 탐지 — 가격 급등과 거래량 서지 실시간 감지
  8. 8 TreeMap으로 CLOB 호가창 구현하기 — 가격/시간 우선 매칭과 슬리피지
  9. 9 주문 전 동기 리스크 게이트 설계 — VaR, 집중도, 일일손실 5가지 규칙
  10. 10 잔고를 저장하지 말고 재구성하라 — 이벤트 소싱 원장 설계
  11. 11 감정 태그 × 수익률 — 투자 습관을 데이터로 기록하기
  12. 12 룰 엔진: RSI·MACD 조건식을 JSON DSL로 — Quant Lab 설계
  13. 13 백테스트 엔진: look-ahead 없는 시뮬레이션 — Sharpe·MDD·PF 계산
  14. 14 전략 지문(SHA-256)으로 룰셋 보호하기 — 서버 사이드 실행과 역공학 방어
  15. 15 Markowitz 최적화를 솔버 없이 구현하기 — 프로젝션 경사하강법
  16. 16 Kelly Criterion: 수학적 파산 방지 베팅 비율 — Half Kelly와 백테스트 연동
  17. 17 ZigZag + 패턴 템플릿 매칭으로 차트 패턴 감지 — 헤드앤숄더·이중바닥
  18. 18 ADX로 시장 국면 분류하기 — BULL·BEAR·SIDEWAYS·HIGH_VOL
  19. 19 손익통산으로 세금 줄이기 — Tax-Loss Harvesting 시뮬레이션
  20. 20 MockK로 JdbcTemplate 목킹하기 — 311개 테스트 작성 경험
  21. 21 OpenTelemetry + Jaeger로 분산 추적 — 시세 파이프라인 지연 측정
  22. 22 Circuit Breaker로 외부 API 장애 격리 — Resilience4j + KIS·Yahoo 폴백 체인
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.

댓글

아직 댓글이 없습니다