ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

如何通过 Apache Kafka 和 ScyllaDB 使用变更数据捕获

2023-03-03 16:53:26  阅读:710  来源: 互联网

标签:ScyllaDB 据库表 源连接器 数据


在 ScyllaDB 大学的这个动手实验室中,您将学习如何使用ScyllaDB CDC 源连接器将 ScyllaDB 集群表中的行级更改事件推送到 Kafka 服务器。

什么是 ScyllaDB CDC?

回顾一下,更改数据捕获 (CDC) 是一项功能,它不仅允许您查询数据库表的当前状态,还可以查询对表所做的所有更改的历史记录。从 ScyllaDB Enterprise 2021.1.1 和 ScyllaDB Open Source 4.3 开始,CDC 已准备好生产 (GA)。

在 ScyllaDB 中,CDC 是可选的,并且在每个表的基础上启用。对启用 CDC 的表所做的更改历史存储在单独的关联表中。

您可以在使用 CDC 选项创建或更改表时启用 CDC,例如:

CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};

ScyllaDB CDC 源连接器

ScyllaDB CDC Source Connector 是一个源连接器,用于捕获 ScyllaDB 集群表中的行级更改。它是一个Debezium连接器,兼容 Kafka Connect(Kafka 2.6.0+)。连接器读取指定表的 CDC 日志,并为每个行级INSERTUPDATEDELETE操作生成 Kafka 消息。连接器是容错的,在发生故障时重试从 Scylla 读取数据。它使用 Kafka Connect 偏移跟踪定期将当前位置保存在 ScyllaDB CDC 日志中。每个生成的 Kafka 消息都包含有关源的信息,例如时间戳和表名。

注意:在撰写本文时,不支持集合类型 (LIST,SET,MAP) 和 UDT——生成的消息中省略了具有这些类型的列。随时了解此增强请求和GitHub 项目中的其他开发情况。

Confluent 和 Kafka Connect

Confluent 是一个全面的数据流平台,使您能够轻松访问、存储和管理连续、实时的数据流。它通过企业级功能扩展了 Apache Kafka 的优势。Confluent 可以轻松构建现代的、事件驱动的应用程序,并获得通用数据管道,支持可扩展性、性能和可靠性。

Kafka Connect 是一种用于在 Apache Kafka 和其他数据系统之间可扩展且可靠地传输数据的工具。它使定义将大型数据集移入和移出 Kafka 的连接器变得简单。它可以提取整个数据库或从应用程序服务器收集指标到 Kafka 主题中,使数据可用于低延迟的流处理。

Kafka Connect 包括两种类型的连接器:

  1. 源连接器:源连接器接收整个数据库并将表更新流式传输到 Kafka 主题。源连接器还可以从应用程序服务器收集指标并将数据存储在 Kafka 主题中,使数据可用于低延迟的流处理。
  2. Sink 连接器: Sink 连接器将 Kafka 主题中的数据传输到二级索引(如 Elasticsearch)或批处理系统(如 Hadoop)以进行离线分析。

使用 Docker 设置服务

在本实验中,您将使用 Docker。

请确保您的环境满足以下先决条件:

  • 适用于 Linux、Mac 或 Windows 的 Docker。
    • 注意: 仅建议在 Docker 中运行 ScyllaDB 以评估和试用 ScyllaDB。
  • ScyllaDB 开源。为获得最佳性能,建议定期安装。
  • 8 GB RAM 或更大用于 Kafka 和 ScyllaDB 服务。
  • docker-compose
  • 混帐

ScyllaDB 安装和初始化表

首先,您将启动一个三节点 ScyllaDB 集群并创建一个启用了 CDC 的表。

如果您还没有这样做,请从 git 下载示例:

git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab

 

这是您将使用的 docker-compose 文件。它启动了一个三节点的 ScyllaDB 集群:

version: "3"

services:
  scylla-node1:
    container_name: scylla-node1
    image: scylladb/scylla:5.0.0
    ports:
      - 9042:9042
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node2:
    container_name: scylla-node2
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node3:
    container_name: scylla-node3
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

 

启动 ScyllaDB 集群:

docker-compose -f docker-compose-scylladb.yml up -d

 

等待一分钟左右,查看ScyllaDB集群是否启动正常:

docker exec scylla-node1 nodetool status

 

接下来,您将使用 cqlsh 与 ScyllaDB 进行交互。创建一个键空间和一个启用了 CDC 的表,并向表中插入一行:

docker exec -ti scylla-node1 cqlsh 
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}; 
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true}; 
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20); 
exit

 

[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d

Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec  scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens       Owns    Host ID                               Rack
UN  172.19.0.3  ?          256          ?       4d4eaad4-62a4-485b-9a05-61432516a737  rack1
UN  172.19.0.2  496 KB     256          ?       bec834b5-b0de-4d55-b13d-a8aa6800f0b9  rack1
UN  172.19.0.4  ?          256          ?       2788324e-548a-49e2-8337-976897c61238  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to  at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$ 

 

Confluent 设置和连接器配置

要启动 Kafka 服务器,您将使用 Confluent 平台,它提供了一个用户友好的 Web GUI 来跟踪主题和消息。融合平台提供了一个docker-compose.yml文件来设置服务。

注意:这不是您在生产中使用 Apache Kafka 的方式。该示例仅用于培训和开发目的。获取文件:

wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml

 

接下来,下载 ScyllaDB CDC 连接器:

wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar

 

通过编辑docker-compose-confluent.yml 添加如下两行,使用 Docker 卷将 ScyllaDB CDC 连接器添加到 Confluent 连接服务插件目录,将目录替换为文件目录scylla-cdc-plugin.jar

 image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
     hostname: connect
     container_name: connect
+    volumes:
+      - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
     depends_on:
       - broker
       - schema-registry

 

启动 Confluent 服务:

docker-compose -f docker-compose-confluent.yml up -d

 

等待一分钟左右,然后访问http://localhost:9021Confluent Web GUI。

使用 Confluent 仪表板添加 ScyllaConnector:

融合仪表板

通过单击插件添加 Scylla 连接器:

插入

用其中一个 Scylla 节点的 IP 地址(您可以在 nodetool status 命令的输出中看到它)和 ScyllaDB 服务侦听的端口 9042 填充“主机”。

“命名空间”是您之前在 ScyllaDB 中创建的键空间。

请注意,可能需要一分钟左右的时间才会ks.my_table出现:

表格1

 

表 2

测试 Kafka 消息

您可以看到这MyScyllaCluster.ks.my_table是 ScyllaDB CDC 连接器创建的主题。

现在,从主题面板检查 Kafka 消息:

控制板

选择主题,与您在 ScyllaDB 中创建的键空间和表名相同:

表3

在“概览”选项卡中,您可以看到主题信息。在底部,它显示此主题位于分区 0 上。

分区是最小的存储单元,它包含主题拥有的记录子集。每个分区都是一个日志文件,记录以仅追加方式写入其中。分区中的每个记录都分配了一个称为偏移量的顺序标识符,它对于分区中的每个记录都是唯一的。偏移量是由 Kafka 维护的增量且不可变的数字。

如您所知,ScyllaDB CDC 消息被发送到主题ks.my_table,主题的分区 ID 为 0。接下来,转到“消息”选项卡并将分区 ID 0 输入“偏移”字段:

KSKS表

从Kafka主题消息的输出可以看出,ScyllaDB表INSERT事件和数据是通过Scylla CDC Source Connector传输到Kafka消息中的。单击消息以查看完整的消息信息:

留言信息

该消息包含ScyllaDB 表名和键空间名称以及时间,以及操作前后的数据状态。由于这是插入操作,插入前的数据为空。

接下来,将另一行插入到 ScyllaDB 表中:

docker exec -ti scylla-node1 cqlsh 
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);

 

现在,在 Kafka 中,等待几秒钟,您可以看到新消息的详细信息:

新消息

清理

完成此实验室的工作后,您可以停止并删除 Docker 容器和映像。

查看所有容器 ID 的列表:

docker container ls -aq

 

然后你可以停止并删除你不再使用的容器:

docker stop <ID_or_Name> 
docker rm <ID_or_Name>

 

稍后,如果您想重新运行该实验室,您可以按照这些步骤并像以前一样使用 docker-compose。

概括

使用 CDC 源连接器,一个与 Kafka Connect 兼容的 Kafka 插件,您可以捕获所有 ScyllaDB 表行级更改(INSERTUPDATEDELETE)并将这些事件转换为 Kafka 消息。然后,您可以使用来自其他应用程序的数据或使用 Kafka 执行任何其他操作。

标签:ScyllaDB,据库表,源连接器,数据
来源:

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有