본문 바로가기
SingleStoreDB/엔지니어링

SingleStore - Apache Flink를 사용한 실시간 스트리밍 파이프라인 구축

by 에이플랫폼 [Team SingleStore Korea] 2025. 4. 7.

 

안녕하세요, 에이플랫폼입니다.

100ms의 지연 시간마다 매출이 1% 감소한다.
- Amazon -
 

Amazon found every 100ms of latency cost them 1% in sales.

10 years ago, Amazon found that every 100ms of latency cost them 1% in sales. Google found an extra .5 seconds in search page generation time dropped traffic by 20%. A broker could lose $4 million in revenues per millisecond if their electronic trading pla

www.gigaspaces.com

 

실시간 데이터 처리의 중요성에 대해 들어보셨나요?

오늘날 기업들은 지연 시간이 단 100ms 증가할 때마다 매출이 1% 감소한다는 사실을 알고 있습니다.

이처럼 실시간 데이터 처리는 비즈니스 성공의 핵심 요소로 자리 잡고 있습니다.

이 글에서는 싱글스토어(SingleStore), 아파치 카프카(Apache Kafka), 아파치 플링크(Apache Flink)를 활용하여 효율적인 실시간 데이터 처리 파이프라인을 구축하는 방법에 대해 소개하겠습니다.

  • 싱글스토어: 실시간 저장 및 분석이 가능한 고성능 관계형 분산 데이터베이스
  • 카프카: 데이터 스트림 수집의 중추 역할을 하며, 높은 처리량과 낮은 지연 시간
  • 플링크: Stateful 스트림 처리를 지원하여, 이벤트 간의 관계를 분석하고 패턴을 탐지하는 데 유용

본 글에서는 도커(Docker)를 활용해 이 세 기술을 연동하는 방법을 단계별로 살펴보겠습니다.

 

데모 환경

VM

  • OS: Rocky 9.5
  • vCPU: 4
  • RAM: 8 GB
  • Disk: 50 GB

폴더 구조

├── kafka-producer/         # Maven project for the Kafka Producer
│   └── Dockerfile
├── flink-processor/        # Maven project for the Flink Processor
│   └── Dockerfile
├── docker-compose.yml      # Docker Compose setup for Kafka, 
Zookeeper, producer, and processor
└── README.md               # Project documentation

필수 구성 요소 및 설치 방법

  • Java
  • Maven
  • Docker
  • Docker Compose
# Java 설치
sudo dnf install java-11-openjdk -y

# 설치 확인
jave -version
>>>
openjdk version "11.0.25" 2024-10-15 LTS
OpenJDK Runtime Environment (Red_Hat-11.0.25.0.9-1) (build 11.0.25+9-LTS)
OpenJDK 64-Bit Server VM (Red_Hat-11.0.25.0.9-1) (build 11.0.25+9-LTS, mixed mode, sharing)
# Maven 설치
sudo dnf install maven -y 

# 설치 확인
mvn --version
>>>
Apache Maven 3.6.3 (Red Hat 3.6.3-19)
Maven home: /usr/share/maven
Java version: 17.0.14, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-17-openjdk-17.0.14.0.7-2.el9.x86_64
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "5.14.0-503.14.1.el9_5.x86_64", arch: "amd64", family: "unix"
# Docker 설치시 RHEL 계열을 자동으로 podman이 설치 됩니다.
sudo dnf install docker -y

# podman으로 하면 아래의 명령어를 사용할 필요 없습니다.
# 이 문서도 podman으로 진행하였습니다.
sudo systemctl enable docker
sudo systemctl start docker
sudo usermod -a -G docker {user_name}

# 설치 확인
podman --version
>>>
podman version 5.2.2

# Docker Compose 설치
# podman compose를 설치하려 합니다.
pip3 install podman-compose
# pip3가 안된다면 파이썬과 pip 모듈 설치 후 사용가능
sudo dnf install python -y 
sudo dnf install python3-pip
 

# 꼭 도커로 설치를 해야겠다 하면
# 1. Podman 패키지 제거
sudo dnf remove podman -y
# 2. Docker 공식 리포지토리를 추가합니다
sudo dnf config-manager --add-repo=https://download.docker.com/linux/centos/docker-ce.repo
# 3. Docker 설치
sudo dnf install docker-ce docker-ce-cli containerd.io -y
# 4. Docker 서비스 시작 및 활성화
sudo systemctl start docker
sudo systemctl enable docker
# 5. DOcker 그룹에 유저 등록
sudo usermod -a -G docker {user_name}

# 세션 재접속
id
# 유저가 docker 그룹에 추가 되어있는지 확인
# git 설치
sudo yum install -y git 

# 설치 확인
git --version
>>>
git version 2.43.5

# git 클론
git clone https://github.com/singlestore-labs/singlestore-flink-integration.git
>>>
'singlestore-flink-integration' 이라는 디렉토리가 생성 됩니다.
cd singlestore-flink-integration
 

데이터베이스 설정

Flink 프로세서는 JDBC를 사용하여 처리된 데이터를 SingleStore 데이터베이스에 저장합니다.

Flink 프로세서의 코드(flink-processor/src/main/java/Main.java)에서 데이터베이스 연결 정보를 수정합니다.

# 테이블 생성
"
SingleStore 클러스터는 이미 구축이 되어있다고 가정하고 합니다.
SingleStore 클러스터에서 데이터 베이스를 생성 후 테이블을 생성합니다.
"
CREATE TABLE `stockTrans` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `stock` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `qty` int(11) DEFAULT NULL,
  `avgValue` double DEFAULT NULL,
  PRIMARY KEY (`id`)
);

# Main.java 속 DB 연결 정보 편집
vi flink-processor/src/main/java/Main.java
# 아래의 부분만 수정 합니다.
"
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withUrl("jdbc:singlestore://<<hostname>>:<<port>>/<<database>>")
              .withDriverName("com.singlestore.jdbc.Driver")
              .withUsername("<<username>>")
              .withPassword("<<password>>")
              .build()

"

# Flink 프로세서 디렉토리 속 wait-for-it.sh 실행권한 부여
chmod +x wait-for-it.sh

# kafka-producer 디렉토리의 wait-for-it.sh에도 실행 권한 부여
cd kafka-producer

# KAfka 프로듀서 디렉토리 속 wait-for-it.sh 실행권한 부여
chmod +x wait-for-it.sh

Kafka 프로듀서

# 카프카 프로듀서 디렉토리로 이동
cd kafka-producer/

# 메이븐 프로젝트 패키지화
mvn clean package

# 카프카 프로듀서의 도커 이미지 생성
podman build -t kafka-producer .

Flink 프로세서

# 플링크 프로세서 디렉토리로 이동
cd flink-processor/

# 메이븐 프로젝트 패키지화
mvn clean package

# 카프카 프로듀서의 도커 이미지 생성
docker build -t flink-processor .

프로젝트 실행

# 프로젝트 루트 디렉토리로 이동
cd singlestore-flink-integration/

# 프로젝트 실행
podman compose up

프로젝트가 실행되면 offset이 증가하는 로그가 보입니다.

이때 테이블에 어떤 데이터가 삽입되는지 확인하면 아래와 같이 데이터가 삽입된 모습을 보실 수 있습니다.

# Table에 데이터가 삽입된 모습
select * from test.stockTrans order by id;
>>>
|id |stock                      |qty|avgValue    |
|---|---------------------------|---|------------|
|25 |Olson Group                |179|0.0228780737|
|26 |Bradtke and Sons           |211|0.2041086513|
|27 |Emard and Sons             |131|0.4036664829|
|28 |Murazik LLC                |217|0.3465456156|
|29 |Smith, Batz and Beahan     |83 |1.0347236467|
|30 |Brown-Christiansen         |61 |0.7195127218|
|31 |Bauch, Waelchi and Schuster|117|0.2777100669|
 

이번 프로젝트는 SingleStore와 Kafka, Flink를 활용하여 데이터 스트림 처리와 실시간 분석의 가능성을 보여줍니다. 이러한 기술들을 통해 복잡한 데이터 환경에서도 효율적이고 신뢰성 높은 솔루션을 구축할 수 있습니다. 앞으로 이 프로젝트가 여러분의 데이터 처리 여정에 도움이 되길 바랍니다!