ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Canal实现MySQL协议

2022-05-19 19:03:41  阅读:231  来源: 互联网

标签:Canal 协议 log dump erosaConnection MySQL MySql bin


目录


在学习Canal的时候很好奇Canal是如何模拟成MySql Slave来接收数据的

MySql Slave会向主库发送dump协议来接收bin-log数据

Canal也是类似,在发起dump协议时会先获取MySql当前的bin-log信息,在根据自身已经消费的偏移量来判断从哪个位置开始获取,最后MySql将bin-log事件返回给Canal

Canal实现向MySql复制bin-log的实现类在MysqlEventParser,可以在AbstractEventParser中看到MysqlEventParser的执行流程

代码流程

// 开始执行replication
// 1. 构造Erosa连接
erosaConnection = buildErosaConnection();
// 2. 启动一个心跳线程
startHeartBeat(erosaConnection);
// 3. 执行dump前的准备工作
preDump(erosaConnection);
erosaConnection.connect();// 链接
// 4. 获取最后的位置信息
EntryPosition position = findStartPosition(erosaConnection);
final EntryPosition startPosition = position;
// 重新链接,因为在找position过程中可能有状态,需要断开后重建
erosaConnection.reconnect();  
//调用dump方法开始同步
erosaConnection.dump                                         

执行dump前

buildErosaConnection()方法会将配置文件中的MySql配置转换为MysqlConnection

protected ErosaConnection buildErosaConnection() {
        return buildMysqlConnection(this.runningInfo);
 }

findStartPosition(erosaConnection)方法会去解析和获取位点信息,

通过CanalLogPositionManagerCanalMetaManager

CanalLogPositionManager会去解析meta.dat里的数据,Canal提供了很多种读取meta.dat的方式

可以看看meta.dat里存了什么

{
	"clientDatas": [{
		"clientIdentity": {
			"clientId": 1001,
			"destination": "example",
            //过滤策略
			"filter": ".*\\..*"
		},
		"cursor": {
			"identity": {
				"slaveId": -1,
				"sourceAddress": {
					"address": "localhost",
					"port": 3307
				}
			},
            //记录的位点信息
			"postion": {
				"gtid": "",
				"included": false,
				"journalName": "binlog.000002",
				"position": 11628,
				"serverId": 1,
				"timestamp": 1639646437000
			}
		}
	}],
    //实例名称
	"destination": "example"
}

获取位点

//读取本地的meta.dat
LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
//如果没获取到,就获取新的,默认从当前最后一个位置进行消费
findEndPositionWithMasterIdAndTimestamp(mysqlConnection)
//从Mysql获取
ResultSetPacket packet = mysqlConnection.query("show master status");
//如果没有指定binlogName,尝试按照timestamp进行查找
findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp())

接着往下看,会看到Canal创建解析bin-log处理器,调用BinlogParser来解析成事件

final SinkFunction sinkHandler = new SinkFunction<EVENT>()

CanalEntry.Entry event = binlogParser.parse(bod, isSeek);

最后就是开始调用dump()了,在此之前会判断是否开启了并行,这个配置是

canal.instance.parser.parallel = true

如果是true的话就是用MultiStageCoprocessor处理其去解析,否则就是用上边创建的sinkHandler去处理

执行dump

Canal的dump方法实现在MysqlConnection,通过传入bin-log的文件名和偏移量和串行或并行的处理器

并行处理
public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor)
串行处理
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func)

MultiStageCoprocessorSinkFunction是用来解析bin-log

区别是MultiStageCoprocessor是用多线程来处理

针对解析器提供一个多阶段协同的处理
1. 网络接收 (单线程)
2. 事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息)
3. 事件深度解析 (多线程, DML事件数据的完整解析)
4. 投递到store (单线程)

两个方法的前几步都是相同的

  updateSettings();
  loadBinlogChecksum();
  sendRegisterSlave(); //向Mysql注册从节点
  sendBinlogDump(binlogfilename, binlogPosition);

updateSettings();方法会向MySql执行一些设置sql,比如超时设置checksum、设置一个server_id、设置心跳等

 update("set wait_timeout=9999999");
 update("set @master_binlog_checksum= @@global.binlog_checksum");
 update("set @slave_uuid=uuid()");

顺便可以看看Canal是怎么执行sql的,他是将sql转成字节,并封装成数据包,然后通过SocketChannel传输给MySql执行

 //封装成数据包
 public byte[] toBytes() {
        byte[] data = new byte[4];
        data[0] = (byte) (packetBodyLength & 0xFF);
        data[1] = (byte) (packetBodyLength >>> 8);
        data[2] = (byte) (packetBodyLength >>> 16);
        data[3] = getPacketSequenceNumber();
        return data;
    }
//传输给Mysql
PacketManager.writeBody(connector.getChannel(), bodyBytes);

loadBinlogChecksum();方法向MySql查询刚刚设置的checksum,并赋值给系统

query("select @@global.binlog_checksum");

这是由于binlog event发送回来的时候需要,在最后获取event内容的时候,会增加4个额外字节做校验用。mysql5.6.5以后的版本中binlog_checksum=crc32,而低版本都是binlog_checksum=none

sendBinlogDump()发送dump请求,通过BinlogDumpCommandPacket封装成dump数据包,向MySql发送COM_BINLOG_DUMP指令

bin-log数据会从SocketChannel中返回,Canal会去创建一个DirectLogFetcher对象去接收数据,然后创建解码器

//遍历数据
while (fetcher.fetch()) {
			//消费
            accumulateReceivedBytes(fetcher.limit());
            LogEvent event = null;
            //解码
            event = decoder.decode(fetcher, context);

            if (event == null) {
                throw new CanalParseException("parse failed");
            }
			//还原成Canal事件,这里的func是上边创建的sinkHandler
            if (!func.sink(event)) {
                break;
            }
			//发送确认包
            if (event.getSemival() == 1) {
                sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
            }
        }

以上就是Canal实现dump的过程

https://juejin.cn/post/6844904077583712264

https://www.helloworld.net/p/5322586930

标签:Canal,协议,log,dump,erosaConnection,MySQL,MySql,bin
来源: https://www.cnblogs.com/aruo/p/16287960.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有