ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Canal-保存mysql篇

2022-04-10 00:00:07  阅读:239  来源: 互联网

标签:Canal canal jdbc CanalEntry mysql 保存 sql entry


Canal-保存mysql篇

一、java实现

先用java代码手写一遍,方便后续业务逻辑理解

1、maven配置:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.24</version>
        </dependency>

        <dependency>
            <groupId>commons-dbutils</groupId>
            <artifactId>commons-dbutils</artifactId>
            <version>1.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

2、application.properties配置

# 服务端口
server.port=10000
# 服务名
spring.application.name=canal-client

# 环境设置:dev、test、prod
spring.profiles.active=dev

# mysql数据库连接
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=603409875

3、代码实现

  • 首先声明一个队列来接收sql语句

    		//sql队列
        private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
    		@Resource
        private DataSource dataSource;
    
  • 编写主流程方法

    // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("127.0.0.1",
                    11111), "example", "", "");
            int batchSize = 1000;
            try {
                connector.connect();
                connector.subscribe("cancal_mysql\\..*");//订阅cancal_mysql库下的全表
                connector.rollback();
                try {
                    while (true) {
                        //尝试从master那边拉去数据batchSize条记录,有多少取多少
                        Message message = connector.getWithoutAck(batchSize);
                        long batchId = message.getId();
                        int size = message.getEntries().size();
                        if (batchId == -1 || size == 0) {
                            Thread.sleep(1000);
                        } else {
                            //数据处理
                            dataHandle(message.getEntries());
                        }
                        connector.ack(batchId);
    
                        //当队列里面堆积的sql大于一定数值的时候就模拟执行
                        if (SQL_QUEUE.size() >= 1) {
                            executeQueueSql();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
            } finally {
                connector.disconnect();
            }
    
  • 数据处理方法

     /**
         * 数据处理
         *
         * @param entrys
         */
        private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
            for (CanalEntry.Entry entry : entrys) {
                if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (eventType == CanalEntry.EventType.DELETE) {
                        saveDeleteSql(entry);
                    } else if (eventType == CanalEntry.EventType.UPDATE) {
                        saveUpdateSql(entry);
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        saveInsertSql(entry);
                    }
                }
            }
        }
    
  • 解析sql并保存sql语句方法

    • 删除(只做了简单的语句删除,下面类似)

      /**
           * 保存删除语句
           *
           * @param entry
           */
          private void saveDeleteSql(CanalEntry.Entry entry) {
              try {
                  CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                  List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                  for (CanalEntry.RowData rowData : rowDatasList) {
                      List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
                      StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                      for (CanalEntry.Column column : columnList) {
                          if (column.getIsKey()) {
                              //暂时只支持单一主键
                              if(column.getMysqlType().contains("varchar")){
                                  sql.append(column.getName() + "='" + column.getValue()+"'");
                              }else {
                                  sql.append(column.getName() + "=" + column.getValue());
                              }
                              break;
                          }
                      }
                      SQL_QUEUE.add(sql.toString());
                  }
              } catch (InvalidProtocolBufferException e) {
                  e.printStackTrace();
              }
          }
      
    • 新增

      /**
           * 保存插入语句
           *
           * @param entry
           */
          private void saveInsertSql(CanalEntry.Entry entry) {
              try {
                  CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                  List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                  for (CanalEntry.RowData rowData : rowDatasList) {
                      List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
                      StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                      for (int i = 0; i < columnList.size(); i++) {
                          sql.append(columnList.get(i).getName());
                          if (i != columnList.size() - 1) {
                              sql.append(",");
                          }
                      }
                      sql.append(") VALUES (");
                      for (int i = 0; i < columnList.size(); i++) {
                          sql.append("'" + columnList.get(i).getValue() + "'");
                          if (i != columnList.size() - 1) {
                              sql.append(",");
                          }
                      }
                      sql.append(")");
                      SQL_QUEUE.add(sql.toString());
                  }
              } catch (InvalidProtocolBufferException e) {
                  e.printStackTrace();
              }
          }
      
    • 修改

      /**
           * 保存更新语句
           *
           * @param entry
           */
          private void saveUpdateSql(CanalEntry.Entry entry) {
              try {
                  CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                  List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                  for (CanalEntry.RowData rowData : rowDatasList) {
                      List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
                      StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                      for (int i = 0; i < newColumnList.size(); i++) {
                          sql.append(" " + newColumnList.get(i).getName()
                                  + " = '" + newColumnList.get(i).getValue() + "'");
                          if (i != newColumnList.size() - 1) {
                              sql.append(",");
                          }
                      }
                      sql.append(" where ");
                      List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
                      for (CanalEntry.Column column : oldColumnList) {
                          if (column.getIsKey()) {
                              //暂时只支持单一主键
                              if(column.getMysqlType().contains("varchar")){
                                  sql.append(column.getName() + "='" + column.getValue()+"'");
                              }else {
                                  sql.append(column.getName() + "=" + column.getValue());
                              }
                              break;
                          }
                      }
                      SQL_QUEUE.add(sql.toString());
                  }
              } catch (InvalidProtocolBufferException e) {
                  e.printStackTrace();
              }
          }
      
  • jdbc操作

     /**
         * 入库
         * @param sql
         */
        public void execute(String sql) {
            Connection con = null;
            try {
                if(null == sql) return;
                con = dataSource.getConnection();
                QueryRunner qr = new QueryRunner();
                int row = qr.update(con, sql);
                System.out.println("update: "+ row);
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                DbUtils.closeQuietly(con);
            }
        }
    

4、效果

运行sql语句:

INSERT INTO `tb_commodity_info` ( `id`, `commodity_name`, `commodity_price`, `number`, `description` )
VALUES
	( '030acbd3b71011ecb9760242ac110005', '测试0001', '5.88', 11, '描述信息0001' );

执行结果:

库1:

库2:

二、使用canal.adapter来做实现

上面的代码初步可以实现一般情况下的数据迁移,接下来我们来实现一下给予adapter的方式

1、安装canal.adapter

  • 先从github上将adapter下载下来

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz

  • 解压

mkdir /tmp/canal_adapter
tar zxvf canal.adapter-1.1.5.tar.gz  -C /tmp/canal_adapter
  • 修改配置文件conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    #kafka.bootstrap.servers: 127.0.0.1:9092
    #kafka.enable.auto.commit: false
    #kafka.auto.commit.interval.ms: 1000
    #kafka.auto.offset.reset: latest
    #kafka.request.timeout.ms: 40000
#    kafka.session.timeout.ms: 30000
#    kafka.isolation.level: read_committed
#    kafka.max.poll.records: 1000
    # rocketMQ consumer
#    rocketmq.namespace:
#    rocketmq.namesrv.addr: 127.0.0.1:9876
#    rocketmq.batch.size: 1000
#    rocketmq.enable.message.trace: false
#    rocketmq.customized.trace.topic:
#    rocketmq.access.channel:
#    rocketmq.subscribe.filter:
    # rabbitMQ consumer
#    rabbitmq.host:
#    rabbitmq.virtual.host:
#    rabbitmq.username:
#    rabbitmq.password:
#    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://源数据库ip:端口/cancal_mysql?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
#      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/cancal_mysql?useUnicode=true
          jdbc.username: root
          jdbc.password: 603409875
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address
  • 修改conf/rdb/tb_commodity_info.yml

    修改需要监听的表名为yml,若监听多个表可以生产多个yml

    mv mytest_user.yml tb_commodity_info.yml
    

    在修改配置文件

    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    outerAdapterKey: mysql1
    concurrent: true
    dbMapping:
      database: cancal_mysql
      table: tb_commodity_info
      targetTable: tb_commodity_info
      targetPk:
        id: id
      mapAll: true
    #  targetColumns:
    #    id:
    #    name:
    #    role_id:
    #    c_time:
    #    test1:
    #  etlCondition: "where c_time>={}"
      commitBatch: 3000 # 批量提交的大小
    
    
    ## Mirror schema synchronize config
    #dataSourceKey: defaultDS
    #destination: example
    #groupId: g1
    #outerAdapterKey: mysql1
    #concurrent: true
    #dbMapping:
    #  mirrorDb: true
    #  database: mytest
    
    
  • 注意:由于我存的是mysql8,所以需要在lib中加入8的mysql-connector-java-8.0.24.jar

  • 执行启动命令

sh bin/startup.sh

不出意外的果然出意外了

由于出现了意外,所以我们吧源码拉下来跑一遍后,更换了上面的配置等问题,终于成功了

三、源码解析

1、拉取源码到本地

git clone https://github.com/alibaba/canal.git

2、打开项目

3、修改配置

adapter项目启动入口是client-adapter包下的launcher模块

修改配置文件:application.ymlrdb目录下的tb_commodity_info.yml

4、启动项目

启动类CanalAdapterApplication

5、记录问题

  • spi找不到rdb扩展实现类

通过源码追溯

在启动时,依据配置需要加载rdb的spi,需要在之前加载logger,所以需要在配置中加入logger

canalAdapters:
    - instance: example # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
          - name: logger   #解决此bug问题
            - name: rdb
              key: mysql1
              properties:
                jdbc.driverClassName: com.mysql.jdbc.Driver
                jdbc.url: jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
                jdbc.username: root
                jdbc.password: 603409875
  • 解决了logger问题后还是有上面问题

这时候可以将client-adapter的maven进行install一遍,会出现不少缺jar包的方式,我执行时缺了俩个jar

将这俩个jar打包出来后,在install adapter就可以了

注意:若还不行,可以先将launcher package然后在执行

  • mysql1的kay找不到bug

进入ConfigLoader&load(Properties envProperties)方法中,看到加载表映射关系,跟代码后发现,说没有映射文件

为了查这个问题,我又跑到spi加载文件的地方去查,查到加载的路径是:

canal/1.1.5/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin/client-adapter.rdb-1.1.5-jar-with-dependencies.jar

打开jar包,原来是之前没改的时候的jar,此时又去查询install了rdb的包,在重新pack一遍,终于可以运行了

标签:Canal,canal,jdbc,CanalEntry,mysql,保存,sql,entry
来源: https://www.cnblogs.com/java5wanping/p/16124352.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有