ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

使用canal监听mysql数据变化

2021-06-11 10:57:11  阅读:220  来源: 互联网

标签:canal CanalEntry connector mysql entry com 监听


### canal介绍 canal是阿里开源的数据库同步框架,采用非侵入式方式,解析mysql的`binary log`,再发送到目的地,目的地可是`mq`,`hbase`,`mysql`,`es`等. ### 本章流程 1. 开启mysql的bin-log日志 2. 创建mysql用户获取bin-log日志 3. canal采集bin-log日志 4. canal-client获取mysql变化信息 #### 开启bin-log日志 只需要在`mysqld.cnf`新增配置 ``` server-id=1 log-bin=mysql-bin ``` #### 创建mysql用户 ```sql create user canal@'%' IDENTIFIED by 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES; ``` #### 配置canal ```bash # 配置文件1:canal-server/conf/canal.properties # 端口 canal.port = 11111 # 配置文件2:canal-server/conf/example/instance.properties # 数据库连接信息 canal.instance.master.address=192.168.41.128:3307 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 监听的表(正则表达式) canal.instance.filter.regex=.*\\..* # 主题 canal.mq.topic=example ``` #### 启动mysql/canal ```bash # 本地测试采用docker方式启动 docker run -d --name mysql -p 3307:3306 -e MYSQL_ROOT_PASSWORD=123456 -v $PWD/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf hub.c.163.com/library/mysql:5.7 docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server ``` #### 编写canal-client ```java package com.deri.stream.canal; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; /** * @ClassName: Main * @Description: TODO * @Author: wuzhiyong * @Time: 2021/6/11 9:41 * @Version: v1.0 **/ public class Main { public static void main(String[] args) throws InterruptedException { CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("192.168.41.128",11111), "example", "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { // 没有变化,等一秒钟再去拉取数据 Thread.sleep(1000); } else { printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } } finally { connector.disconnect(); } } private static void printEntry(List entrys) { for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChage = null; try { rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List columns) { for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } } ``` ### 参考连接 - https://www.cnblogs.com/moris5013/p/12371549.html - https://blog.csdn.net/weixin_35852328/article/details/87600833

标签:canal,CanalEntry,connector,mysql,entry,com,监听
来源: https://blog.51cto.com/happywzy/2895785

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有