본문 바로가기
SingleStoreDB/Support Bulletin

[Support Bulletin 03] - Kafka Pipeline 의 zstd 압축 포맷 지원

by 에이플랫폼 [Team SingleStore Korea] 2025. 2. 4.

 

안녕하세요 에이플랫폼의 Support Bulletin 시리즈 입니다.

벌써 세 번째 시간인데요 오늘은 조금은 색다르게 해볼 예정 입니다.

앞서 올려드린 Kafka 클러스터와 SingleStore 클러스터 구축 글을 통해

직접 따라서 해보실 수 있게끔 준비했습니다.

그럼 SingleStore의 기능인 Kafka Pipeline에 대해서 설명 드리겠습니다.


📌개요

실시간 데이터 스트리밍 플랫폼인 Kafka 는 Topic 에 전달하는 메시지 포맷으로 비압축 일반 데이터를 포함하여 gzip, lz4, snappy, zstd 등 다양한 알고리즘에 의한 압축 형태를 지원합니다.

Kafka 토픽에 전달되는 메시지를 실시간으로 테이블에 적재하는 기능인 SingleStore Kafka Pipeline 에서도 별도의 설정 없이 위의 압축 포맷을 지원하고 있습니다.

하지만 기존 버전의 SingleStore Kafka Pipelinegzip, lz4, snappy 포맷은 정상적으로 지원하지만 zstd 방식을 지원하고 있지 않아 다음과 같은 에러가 발생할 수 있었습니다.

Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 10 of 129 bytes failed: Local: Not implemented

SingleStore v8.9.1 부터 Kafka Pipeline 에서 zstd 압축 포맷을 정상적으로 지원하여 Kafka Pipeline 의 활용성이 높아졌습니다.


💡싱글스토어 클러스터 구축

싱글스토어 클러스터는 구축이 된 상황을 가정 하고 설명 하겠습니다.

아래의 링크를 통해 간단히 클러스터 구축 방법을 따로 설명하겠습니다.

2025.02.04 - [SingleStoreDB/엔지니어링] - SingleStore 클러스터 구축

 

SingleStore 클러스터 구축

안녕하세요 에이플랫폼 입니다.오늘은 후술할 Kafka 관련 테스트를 하기 위해 싱글스토 클러스터를 구축하는 방법을 소개해 드리겠습니다. 📌테스트 환경Virtual Machine을 사용했습니다.OS: Rocky 9.5

a-platform.tistory.com


💡카프카 클러스터 구축

먼저 Kafka Pipeline을 사용해보려면 Kafka 클러스터를 구축 해야겠죠?

아래의 링크를 통해 카프카 싱글 클러스터 구축 하는 방법을 소개해 드리겠습니다.

2025.02.03 - [SingleStoreDB/엔지니어링] - 카프카(kafka) 싱글 노드 클러스터 구축

 

 

카프카(kafka) 싱글 노드 클러스터 구축

안녕하세요 에이플랫폼 입니다.오늘은 후술할 Kafka 관련 테스트를 하기 위해 카프카 싱글 노드 클러스터를 구축하는 방법을 소개해 드리겠습니다. 📌테스트 환경Virtual Machine을 사용했습니다.OS

a-platform.tistory.com


✅예제

Kafka Topic 생성

[jwy@kafka ~]$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic comp-data-zstd --config retention.ms=600000

Created topic comp-data-zstd.

zstd 포맷으로 메시지를 압축하여 Kafka 토픽에 전달

[jwy@kafka ~]$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic comp-data-zstd --compression-codec zstd

# 실행시 모습
[jwy@kafka ~]$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic comp-data-zstd --compression-codec zstd
>

# 프로듀서에 메시지 생성
[jwy@kafka ~]$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic comp-data-zstd --compression-codec zstd
>celadon
fuchsia
charcoal
aquamarine
crimson
ivory
coral
ultramarine
green
orange>>>>>>>>>

SingleStore 테이블 및 파이프라인 생성 후 실행

singlestore> create database test;
Query OK, 1 row affected (3.59 sec)

singlestore> use test;
Database changed

singlestore> CREATE TABLE colors_zstd(name TEXT);
Query OK, 0 rows affected (0.84 sec)

singlestore> CREATE PIPELINE pipe_colors_zstd
    -> AS LOAD DATA KAFKA 'localhost:9092/comp-data-zstd'
    -> INTO TABLE colors_zstd;
Query OK, 0 rows affected (0.19 sec)

singlestore> START PIPELINE pipe_colors_zstd;
Query OK, 0 rows affected (0.01 sec)

데이터 적재 후 SingleStore 테이블 조회

singlestore> SELECT * FROM colors_zstd;
+-------------+
| name        |
+-------------+
| coral       |
| charcoal    |
| ivory       |
| green       |
| celadon     |
| orange      |
| fuchsia     |
| aquamarine  |
| crimson     |
| ultramarine |
+-------------+
10 rows in set (0.03 sec)

위와 같이 조회시 정상적으로 테이블에 적재가 된 모습을 볼 수 있습니다.


✅예제2 8.9.1 이전 버전 실행

SingleStore 클러스터 삭제 후 이전 버전 설치

[jwy@kafka ~]$ sdb-deploy destroy-cluster -y

<생략>
Operation completed successfully

# setup.yaml 파일 수정
memsql_server_version: 8.9 -> memsql_server_version: 8.7 

# 클러스터 재설치
[jwy@kafka ~]$ sdb-deploy setup-cluster --cluster-file setup.yaml -y
The Final Cluster State
Hosts
+--------------+------------+-------------+---------------+
|     Host     | Local Host | SSH address | Identity File |
+--------------+------------+-------------+---------------+
| {ip address} | Yes        |             |               |
+--------------+------------+-------------+---------------+
Nodes
+------------+--------+--------------+------+---------------+--------------+---------+----------------+--------------------+--------------+
| MemSQL ID  |  Role  |     Host     | Port | Process State | Connectable? | Version | Recovery State | Availability Group | Bind Address |
+------------+--------+--------------+------+---------------+--------------+---------+----------------+--------------------+--------------+
| 41A184C4F1 | Master | {ip address}  | 3306 | Running       | True         | 8.7.23  | Online         |                    | 0.0.0.0      |
| 6BBCDE4014 | Leaf   | {ip address}  | 3307 | Running       | True         | 8.7.23  | Online         | 1                  | 0.0.0.0      |
+------------+--------+--------------+------+---------------+--------------+---------+----------------+--------------------+--------------+

Kafka Topic 생성

[jwy@kafka ~]$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic comp-data-zstd --config retention.ms=600000

Created topic comp-data-zstd.

zstd 포맷으로 메시지를 압축하여 Kafka 토픽에 전달

[jwy@kafka ~]$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic comp-data-zstd --compression-codec zstd

# 실행시 모습
[jwy@kafka ~]$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic comp-data-zstd --compression-codec zstd
>

# 프로듀서에 메시지 생성
[jwy@kafka ~]$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic comp-data-zstd --compression-codec zstd
>celadon
fuchsia
charcoal
aquamarine
crimson
ivory
coral
ultramarine
green
orange>>>>>>>>>

SingleStore 테이블 및 파이프라인 생성 후 실행

singlestore> create database test;
Query OK, 1 row affected (3.59 sec)

singlestore> use test;
Database changed

singlestore> CREATE TABLE colors_zstd(name TEXT);
Query OK, 0 rows affected (0.84 sec)

singlestore> CREATE PIPELINE pipe_colors_zstd
    -> AS LOAD DATA KAFKA 'localhost:9092/comp-data-zstd'
    -> INTO TABLE colors_zstd;
Query OK, 0 rows affected (0.19 sec)

singlestore> START PIPELINE pipe_colors_zstd;
Query OK, 0 rows affected (0.01 sec)

데이터 적재 후 SingleStore 테이블 조회

singlestore> SELECT * FROM colors_zstd;
Empty set (0.00 sec)

파이프라인 에러 조회

singlestore> SELECT database_name, PIPELINE_NAME, ERROR_TYPE, ERROR_CODE, ERROR_MESSAGE FROM information_schema.PIPELINES_ERRORS;

|DATABASE_NAME|PIPELINE_NAME   |ERROR_CODE|ERROR_MESSAGE                                                                                                                                                                                                                          |
|-------------|----------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|test         |pipe_colors_zstd|1,934     |Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented                                                                |
|test         |pipe_colors_zstd|1,934     |Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented                                                                |
|test         |pipe_colors_zstd|1,934     |Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented                                                                |
|test         |pipe_colors_zstd|1,934     |Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented                                                                |
|test         |pipe_colors_zstd|1,934     |Leaf Error ({ip.address}:3307): Leaf Error ({ip.address}:3307): Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented|
|test         |pipe_colors_zstd|1,934     |Leaf Error ({ip.address}:3307): Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented                                |
|test         |pipe_colors_zstd|1,934     |Leaf Error ({ip.address}:3307): Leaf Error ({ip.address}:3307): Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented|
|test         |pipe_colors_zstd|1,934     |Leaf Error ({ip.address}:3307): Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented                                |
|test         |pipe_colors_zstd|1,934     |Leaf Error ({ip.address}:3307): Leaf Error ({ip.address}:3307): Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented|
|test         |pipe_colors_zstd|1,934     |Leaf Error ({ip.address}:3307): Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented                                |
|test         |pipe_colors_zstd|1,934     |Leaf Error ({ip.address}:3307): Leaf Error ({ip.address}:3307): Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented|
|test         |pipe_colors_zstd|1,934     |Leaf Error ({ip.address}:3307): Cannot extract data for pipeline.  Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented                                |

- Leaf Error ({ip.address}:3307): Leaf Error ({ip.address}:3307): Cannot extract data for pipeline. Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented

- Leaf Error ({ip.address}:3307): Cannot extract data for pipeline. Failed to stream message from Kafka with error Decompression (codec 0x4) of message at 0 of 137 bytes failed: Local: Not implemented

위와 같은 이유로 실패하게 됩니다.


 

이번 포스팅에서는 SingleStore에서 Kafka Pipeline을 설정하고 사용하는 방법에 대해 살펴보았습니다.

SingleStore는 대규모 데이터를 실시간으로 처리하는 데 최적화된 분산형 데이터베이스로, Kafka와 결합하면 실시간 데이터 스트리밍을 효율적으로 처리하고 빠르게 테이블에 적재할 수 있습니다.

에이플랫폼에서 SingleStore Kafka Pipeline의 강력한 성능을 활용해 데이터 흐름을 최적화하고, 비즈니스 성과를 더욱 높여보세요.