ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Building a Sync Engin

2022-02-01 21:35:38  阅读:204  来源: 互联网

标签:Building rows const watermark await Sync updatedAt Engin row


内容来自:https://www.grouparoo.com/blog/building-a-sync-engine
内容主要介绍了如何开发一个同步引擎,没有太多高深的,主要是基于了变动的时间戳以及水印算法

简单说明

  • 预备
    添加水印列,当然对于不同的数据库处理方式会不一样的,有些可能需要通过触发器
 
ALTER TABLE users ADD COLUMN mysql_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
  • 简单算法模式
    基于水印列判断
 
const watermark = await getWatermark();
let rows;
if (!watermark) {
  // first time we've ever sync'd - get all rows
  rows = await User.findAll();
} else {
  rows = await User.findAll({
    // otherwise, use watermark
    where: {
      updatedAt: {
        [Op.gt]: watermark, // WHERE updatedAt > {watermark}
      },
    },
    order: [["updatedAt", "ASC"]],
  });
}
 
if (rows && rows.length > 0) {
  for (const row of rows) {
    await processRow(row);
  }
 
  const newWatermark = new Date(); // set to now
  await setWatermark(newWatermark); // for next time
}
return true; // done!

问题:数据只能增长,数据同步的时候可能会变动,数据可能会重复处理,时间可能会不一致(服务器时间戳,,,)
修复,时间问题,基于db 时间

 
const watermark = await getWatermark();
let rows;
if (!watermark) {
  // first time we've ever sync'd - get all rows
  rows = await User.findAll();
} else {
  rows = await User.findAll({
    // otherwise, use watermark
    where: {
      updatedAt: {
        [Op.gte]: watermark, // WHERE updatedAt >= {watermark}
      },
    },
    order: [["updatedAt", "ASC"]],
  });
}
 
if (rows && rows.length > 0) {
  for (const row of rows) {
    await processRow(row);
  }
 
  const newWatermark = rows[rows.length - 1].updatedAt;
  await setWatermark(newWatermark); // for next time
}
return true; // done!
  • 批处理
    基于偏移以及分页
 
// using node and sequelize
const saved = await getWatermark();
const watermark = saved ? saved.watermark : null;
const oldOffset = saved ? saved.offset || 0 : null;
const sqlOptions = {
  limit: batchSize,
  offset: oldOffset,
  order: [["updatedAt", "ASC"]],
};
 
if (watermark) {
  sqlOptions.where = {
    updatedAt: {
      [Op.gte]: watermark, // WHERE updatedAt >= {watermark}
    },
  };
}
 
const rows = await User.findAll(sqlOptions);
if (!rows || rows.length === 0) {
  return true;
} else {
  for (const row of rows) {
    await processRow(row);
  }
 
  const done = rows.length < batchSize; // is there more to be done?
  const lastTime = rows[rows.length - 1].updatedAt.getTime();
  let newOffset = 0;
  if (!done && watermark === lastTime) {
    // the last one was the same as the first, need to use offset
    newOffset = oldOffset + batchSize;
  }
 
  await setWatermark({ watermark: lastTime, offset: newOffset });
  return done;
}

说明

以上方法还是值得参考学习的,尽管有时我们是不能直接使用的,但是还是很不错的实践,cdc,except 有时可能会是一个其他的选择

参考资

https://www.grouparoo.com/blog/building-a-sync-engine
https://github.com/grouparoo/sync-engine-example

标签:Building,rows,const,watermark,await,Sync,updatedAt,Engin,row
来源: https://www.cnblogs.com/rongfengliang/p/15859650.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有