主机名 IP地址 部署服务
study01 DTLE、MySQL
study02 DTLE、MySQL
study03 DTLE、MySQL



dtle (Data-Transformation-le) 是上海爱可⽣信息技术股份有限公司 开发并开源的 CDC ⼯具. 其功能特点是:

  1. 多种数据传输模式

    • ⽀持链路压缩
    • ⽀持同构传输和异构传输
    • ⽀持跨⽹络边际的传输
  2. 多种数据处理模式

    • ⽀持库/表/⾏级别 数据过滤
  3. 多种数据通道模式

    • ⽀持多对多的数据传输
    • ⽀持回环传输
  4. 多种源/⽬标端

    • ⽀持MySQL - MySQL的数据传输
    • ⽀持MySQL - Kafka的数据传输
  5. 集群模式

    • 提供可靠的元数据存储
    • 可进⾏⾃动任务分配
    • ⽀持⾃动故障转移



  1. 按数据源/数据目标划分

    • 支持1个源端到一个目标端的复制
    • 支持多个源端到一个目标端的聚合复制
    • 支持一个源端到多个目标的拆分复制
  2. 按网络类型划分

    • 支持网络内部数据传输
    • 支持跨网络的数据传输(可使⽤ 链路压缩/链路限流 等功能)
  3. 按集群规模划分

    • 可配置单⼀dtle实例处理单⼀数据通道
    • 可配置 dtle集群 处理 多个数据通道



  1. 下载DTLE的RPM安装包
  1. 安装RPM包
[root@study01 ~]# rpm -ivh dtle- --prefix=/data/dtle
  1. 修改consul配置文件,如果安装时不指定安装路径,默认是在/etc/dtle目录下面
[root@study01 dtle]# pwd

[root@study01 dtle]# cat consul.hcl 
# Rename for each node
node_name = "consul0"   #定义consul名称,多集群名字不能重复
data_dir = "/data/dtle/var/lib/consul"
ui = true

disable_update_check = true

# Address that should be bound to for internal cluster communications
bind_addr = ""
# Address to which Consul will bind client interfaces, including the HTTP and DNS servers
client_addr = ""    #本机ip地址
advertise_addr = "" #本机ip地址
ports = {
  # Customize if necessary. -1 means disable.
  #dns = -1
  #server = 8300
  #http = 8500
  #serf_wan = -1
  #serf_lan = 8301

limits = {
  http_max_conns_per_client = 4096

server = true
# For single node
bootstrap_expect = 1    #一台consul

# For 3-node cluster
#bootstrap_expect = 3   #三台consul,部署集群开启
#retry_join = ["", "", ""] # will use default serf port

log_level = "INFO"
log_file = "/data/dtle/var/log/consul/"

  1. 修改nomad配置文件,如果安装时不指定安装路径,默认是在/etc/dtle目录下面
root@study01 dtle]# ls
consul.hcl  nomad.hcl

[root@study01 dtle]# pwd

[root@study01 dtle]# cat nomad.hcl
name = "nomad0" # rename for each node 定义nomad名字,部署集群名字不可重复
datacenter = "dc1"
data_dir  = "/data/dtle/var/lib/nomad"
plugin_dir = "/data/dtle/usr/share/dtle/nomad-plugin"

log_level = "Info"
log_file = "/data/dtle/var/log/nomad/"

disable_update_check = true

bind_addr = ""   #监听地址
# change ports if multiple nodes run on a same machine
ports {
  http = 4646
  rpc  = 4647
  serf = 4648
addresses {
  # Default to `bind_addr`. Or set individually here.
  #http = ""
  #rpc  = ""
  #serf = ""
advertise {
  http = ""
  rpc  = ""
  serf = ""

server {
  enabled          = true   #服务端开启

  bootstrap_expect = 1
  # Set bootstrap_expect to 3 for multiple (high-availablity) nodes.
  # Multiple nomad nodes will join with consul.

client {
  enabled = true
  options = {
    "driver.blacklist" = "docker,exec,java,mock,qemu,rawexec,rkt"

  # Will auto join other server with consul.

consul {
  # dtle-plugin and nomad itself use consul separately.
  # nomad uses consul for server_auto_join and client_auto_join.
  # Only one consul can be set here. Write the nearest here,
  #   e.g. the one runs on the same machine with the nomad server.
  address = "" #客户端开启

plugin "dtle" {
  config {
    log_level = "Info" # repeat nomad log level here
    data_dir = "/data/dtle/var/lib/nomad"
    nats_bind = ""
    nats_advertise = ""
    # Repeat the consul address above.
    consul = ""    #对接consul地址

    # By default, API compatibility layer is disabled.
    #api_addr = ""   # for compatibility API
    nomad_addr = "" # compatibility API need to access a nomad server

    publish_metrics = false
    stats_collection_interval = 15

  1. 启动nomad以及consul服务
systemctl restart dtle-consul.service
systemctl restart dtle-consul.service

[root@study01 dtle]# ps -ef | grep dtle
dtle      8580     1  1 14:21 ?        00:00:00 /data/dtle/usr/bin/consul agent -config-file=/data/dtle/etc/dtle/consul.hcl
dtle      8660     1 10 14:22 ?        00:00:00 /data/dtle/usr/bin/nomad agent -config /data/dtle/etc/dtle/nomad.hcl
root      8720  2787  0 14:22 pts/0    00:00:00 grep --color=auto dtle
  1. 登录nomad和consul的Web页面进行查看

  2. 在源端MySQL实例创建测试库和测试表,并在表中插入数据

mysql> create database test;
mysql> use test;
mysql> create table t1 (id int primary key);
mysql> insert into t1 values(1),(2),(3);
mysql> select * from t1;
  1. 创建job的json文件
[root@study01 dtle]# cat job.json
"Job": {
"ID": "job1",   #job名字
"Datacenters": ["dc1"],
"TaskGroups": [{
"Name": "src",
"Tasks": [{
"Name": "src",
"Driver": "dtle",
"Config": {
"Gtid": "",
"ReplicateDoDb": [{
"TableSchema": "test",  #复制的库
"Tables": [{
"TableName": "t1"   #复制的表
"ConnectionConfig": {
"Host": "", #源端连接信息
"Port": 3333,
"User": "test",
"Password": "test"
}, {
"Name": "dest",
"Tasks": [{
"Name": "dest",
"Driver": "dtle",
"Config": {
"ConnectionConfig": {
"Host": "", #目标端连接信息
"Port": 4444,
"User": "test",
"Password": "test"
  1. 利用curl命令调用nomad客户端的接口,创建这个job
[root@study01 dtle]# curl -XPOST "" -d @job.json -s | jq
  "EvalID": "a551d0fc-5de9-9e7b-3673-21b115919646",
  "EvalCreateIndex": 151,
  "JobModifyIndex": 150,
  "Warnings": "",
  "Index": 151,
  "LastContact": 0,
  "KnownLeader": false
  • -d @指定的是job的json文件的路径,我是在当前路径下执行所以不需要指定绝对路径
  • jq命令是用于提取返回结果的内容,类似与linux命令grep,需要安装才可以使用
  1. 查看创建job的状态
[root@study01 dtle]# curl -XGET "" -s | jq '.Status'
  • /v1/job/{创建job的ID名字}
  1. 进入到目标端数据库进行查看数据是否已经复制过去
mysql> show databases;
| Database           |
| dtle               |  #有一张二进制的表。用于记录job的GTID号
| information_schema |
| mysql              |
| performance_schema |
| sys                |
| test               |  #同步过来的库
| universe           |

mysql> select * from test.t1;
| id |
|  1 |
|  2 |
|  3 |
  1. 在源端数据库在插入几条数据,在进行测试
mysql> use test;
mysql> insert into t1 values(7),(8),(9);


mysql> select * from test.t1;
| id |
|  1 |
|  2 |
|  3 |
|  7 |
|  8 |
|  9 |

HTTP API、nomad 命令⾏⼯具


nomad 本体使⽤consul进⾏多节点注册和发现,dtle nomad 插件使⽤consul进⾏任务元数据储存,也就是说,当我们的nomad agent创建一个job的时候会通过consul进行记录元数据,以及复制位置点,如果我们要删除一个job的时候,不单单只是把job给删除,还需要删除consul中的记录,如果不删除consul中的记录,那么下次创建job的时候默认还是会从consul中记录的位置点开始复制。


实际上我们使用curl命令调用的是nomad agent端的HTTP端的接口,将我们本地的job.json文件提交到nomad agent端



  1. 先使用curl的方式删除job
[root@study01 dtle]# curl -XDELETE
  1. 删除job后如果想清空consul记录使用以下命令
[root@study01 dtle]# curl -XDELETE ""
  1. 使用nomad命令创建一个job

    • 复制一份hcl的模板,进行修改
    cp usr/share/dtle/scripts/example.job.hcl .
    • 修改hcl文件
    [root@study01 dtle]# mv example.job.hcl job.hcl
    [root@study01 dtle]# vim job.hcl    #修改复制\源\目标信息
  2. 使用nomad命令指定这个文件创建job

[root@study01 dtle]# ./usr/bin/nomad job run job.hcl
==> Monitoring evaluation "21c79d55"
    Evaluation triggered by job "job1"
    Evaluation within deployment: "c5aeae1d"
    Allocation "8640b0af" created: node "e1be240c", group "dest"
    Allocation "e44c8d29" created: node "e1be240c", group "src"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "21c79d55" finished with status "complete"
  1. 查询我们刚才创建job的状态
[root@study01 dtle]# ./usr/bin/nomad job status
ID    Type     Priority  Status   Submit Date
job1  service  50        running  2021-05-07T16:26:21+08:00
  1. 在源端插入数据,在目标端验证数据
  2. 停止job并删除
[root@study01 dtle]# ./usr/bin/nomad job stop -purge job1
==> Monitoring evaluation "dce8f4f3"
    Evaluation triggered by job "job1"
    Evaluation within deployment: "c5aeae1d"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "dce8f4f3" finished with status "complete"
  • 依然需要手动删除consul记录
[root@study01 dtle]# curl -XDELETE ""
  1. 查看server节点
[root@study01 dtle]# ./usr/bin/nomad server members
Name           Address       Port  Status  Leader  Protocol  Build   Datacenter  Region
nomad0.global  4648  alive   true    2         0.11.1  dc1         global
  1. 查看client节点
[root@study01 dtle]# ./usr/bin/nomad node status
ID        DC   Name    Class   Drain  Eligibility  Status
e1be240c  dc1  nomad0  <none>  false  eligible     ready
  1. 查看某个job
[root@study01 dtle]# ./usr/bin/nomad job status {jobname}
  1. 查看nomad版本
[root@study01 dtle]# ./usr/bin/nomad version
Nomad v0.11.1 (b43457070037800fcc8442c8ff095ff4005dab33)
  1. 查看某⼀节点的dtle插件版本
[root@study01 dtle]# ./usr/bin/nomad node status -verbose e1be240c | grep dtle
dtle      true      true     Healthy   2021-05-07T14:22:17+08:00
driver.dtle               = 1
driver.dtle.full_version  =
driver.dtle.version       =
  1. 此时nomad命令作为HTTP客⼾端连接nomad agent, 如果agent不在默认地址,则需要指定 例如:--address=
[root@study01 dtle]# ./usr/bin/nomad node status --address=
ID        DC   Name    Class   Drain  Eligibility  Status
e1be240c  dc1  nomad0  <none>  false  eligible     ready



  1. src1数据库操作
mysql> show master status\G
Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-140
mysql> use test;
mysql> create table t2(id int);
mysql> insert into t2 values(1),(2),(3);

mysql> select * from test.t2;
| id   |
|    1 |
|    2 |
|    3 |
  1. src2数据库操作
mysql> show master status \G
Executed_Gtid_Set: efc79686-af16-11eb-bbaa-02000aba4147:1-135
mysql> use test;
mysql> create table t2(id int);
mysql> insert into t2 values(4),(5),(6);
  1. 创建src1的配置文件
[root@study01 dtle]# cat job_src1.hcl
job "job1" {
  datacenters = ["dc1"]
  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t2"
        DropTableIfExists = false
        Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-140"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = ""
          Port = 3333
          User = "test"
          Password = "test"
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = ""
          Port = 4444
          User = "test"
          Password = "test"
        # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
        #KafkaConfig = {
        #  Topic = "kafka1"
        #  Brokers = ["", ""]
        #  Converter = "json"
  reschedule {
    # By default, nomad will unlimitedly reschedule a failed task.
    # We limit it to once per 30min here.
    attempts = 1
    interval = "30m"
    unlimited = false
  1. 创建src2的配置文件
[root@study01 dtle]# cat job_src2.hcl
job "job2" {
  datacenters = ["dc1"]
  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t2"
        DropTableIfExists = false
        Gtid = "efc79686-af16-11eb-bbaa-02000aba4147:1-135"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = ""
          Port = 5555
          User = "test"
          Password = "test"
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = ""
          Port = 4444
          User = "test"
          Password = "test"
        # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
        #KafkaConfig = {
        #  Topic = "kafka1"
        #  Brokers = ["", ""]
        #  Converter = "json"
  reschedule {
    # By default, nomad will unlimitedly reschedule a failed task.
    # We limit it to once per 30min here.
    attempts = 1
    interval = "30m"
    unlimited = false
  • 两个文件不同的地方有,job名称,GTID,源端连接地址
  1. 创建job
[root@study01 dtle]# ./usr/bin/nomad job run job_src1.hcl
==> Monitoring evaluation "28ae70e4"
    Evaluation triggered by job "job1"
    Evaluation within deployment: "95f9a76b"
    Allocation "d07787a4" created: node "e1be240c", group "dest"
    Allocation "f46d9291" created: node "e1be240c", group "src"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "28ae70e4" finished with status "complete"

[root@study01 dtle]# ./usr/bin/nomad job run job_src2.hcl
==> Monitoring evaluation "1d2da1f7"
    Evaluation triggered by job "job2"
    Evaluation within deployment: "716b11a5"
    Allocation "2dd7fe5c" created: node "e1be240c", group "src"
    Allocation "724ec296" created: node "e1be240c", group "dest"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "1d2da1f7" finished with status "complete"
  1. 目标端数据库验证数据
mysql> select * from test.t2;
| id   |
|    1 |
|    2 |
|    3 |
|    4 |
|    5 |
|    6 |
  1. 在任意一个源端写入数据,然后到目标端查看是否增量数据
mysql> insert into t2 values (7);
Query OK, 1 row affected (0.01 sec)

mysql> select * from test.t2;
| id   |
|    1 |
|    2 |
|    3 |
|    7 |
4 rows in set (0.01 sec)
  1. 验证目标端数据库的数据
mysql> select * from test.t2;
| id   |
|    1 |
|    2 |
|    3 |
|    4 |
|    5 |
|    6 |
|    7 |
7 rows in set (0.01 sec)



  1. 首先记录下源端数据库当前的GTID
mysql> show master status \G
Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-143
  1. 创建测试表,写入一些数据
mysql> use test;
mysql> create table t3(id int);
mysql> insert into t3 values(1),(2);
  1. 创建dst1配置文件
[root@study01 dtle]# cat job_dst1.hcl
job "dst1" {
  datacenters = ["dc1"]
  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t3"
            Where = "id<10"
        DropTableIfExists = false
        Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-143"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = ""
          Port = 3333
          User = "test"
          Password = "test"
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = ""
          Port = 5555
          User = "test"
          Password = "test"
        # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
        #KafkaConfig = {
        #  Topic = "kafka1"
        #  Brokers = ["", ""]
        #  Converter = "json"
  reschedule {
    # By default, nomad will unlimitedly reschedule a failed task.
    # We limit it to once per 30min here.
    attempts = 1
    interval = "30m"
    unlimited = false
  1. 创建dst2配置文件


  1. 创建job
[root@study01 dtle]# ./usr/bin/nomad job run job_dst1.hcl
[root@study01 dtle]# ./usr/bin/nomad job run job_dst2.hcl

[root@study01 dtle]# ./usr/bin/nomad status
ID    Type     Priority  Status   Submit Date
dst1  service  50        running  2021-05-07T20:13:05+08:00
dst2  service  50        running  2021-05-07T20:13:10+08:00
  1. 验证数据,小于10的会在71这台数据库,大于等于10的会在72这台数据库

  2. 在源端插入数据,然后分别到目标端进行查看

mysql> insert into t3 values(9),(10);
mysql> select * from test.t3;
| id   |
|    1 |
|    2 |
|    9 |
|   10 |
  1. 到71上查看数据
mysql> select * from test.t3;
| id   |
|    1 |
|    2 |
|    9 |
  1. 到72上查看数据
mysql> select * from test.t3;
| id   |
|   10 |

