ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

2021-01-09 15:02:01  阅读:1191  来源: 互联网

标签:src name datax mysql target id ETL DataX


开源 ETL 工具 DataX 实践,从mysql 到不同结构的另一个mysql的全量同步和批量更新

链接: datax官方项目地址 查看全量同步 查看批量更新

实践步骤:

参照官方文档,采用方法一部署

在这里插入图片描述

如果点击下载没反应,手动复制地址,把http换成https
在这里插入图片描述

下载解压完成,运行自检脚本

在这里插入图片描述

File “datax.py”, line 114 print readerRef 。因为我电脑安装的是python3 ,脚本里是python2语法

修改下 datax.py 中 114行后面的print print xx 改为 print(xx)

try except Exception, e: 改为 try except Exception as e

修改完datax.py后再次运行

在这里插入图片描述

成功!

参考官方文档,编写一个从一个mysql到另一个mysql的全量同步
先准备数据库

1、建两张表 datax_src和datax_target 表 ,就两个字段,其中第二个字段名称不一样 ,案列演示 从datax_src读取数据,写入到datax_target中

-- datax_src 表  字段 id、src_name
CREATE TABLE `datax_src` (
  `id` bigint NOT NULL,
  `src_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- datax_target 表  字段 id、target_name
CREATE TABLE `datax_target` (
  `id` bigint NOT NULL,
  `target_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

2、创建一个函数,向datax_src插入5000条数据

CREATE DEFINER=`root`@`%` FUNCTION `AUTO_INSERT`() RETURNS int
    DETERMINISTIC
BEGIN
	DECLARE index_num int DEFAULT 0;
		WHILE index_num < 5000 
			DO
				SET index_num = index_num + 1;
				INSERT INTO datax_src VALUES (index_num,CONCAT('name',index_num));
		END WHILE;
	RETURN 0;
END
编写job json文件

按照 文档中的 mysqlreader 、mysqlwriter 的示例修改

在这里插入图片描述

在这里插入图片描述

1、修改mysqlreader的示例 从datax_src同步抽取数据到本地:
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "src_name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "datax_src"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://127.0.0.1:3307/datax"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

将json文件放在job文件夹中,运行

在这里插入图片描述

在这里插入图片描述

抛异常了:

DataX无法连接对应的数据库,可能原因是:1) 配置的ip/port/database/jdbc错误,无法连接。2) 配置的username/password错误,鉴权失败。请和DBA确认该数据库的连接信息是否正确

可是密码明明是正确的。然后想着修改json文件换了一个数据库,再执行,成功了!
发现区别就是mysql的版本不一样 mysql5.0.27的执行成功了, 失败这个库是mysql8.0.22。
所以看了下datax目录结构,发现连接器位置如下图。 找了下连接器jar包,把reader和writer文件夹中的mysql-connector5换成了8(这里只截图了reader)

在这里插入图片描述

再次执行json文件job:成功执行

在这里插入图片描述

2、修改mysqlwriter的示例 ,结合mysqlreader ,把读写合并,
完整的 json文件如下 ,具体参数的含义。官方文档中有详细介绍
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "src_name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "datax_src"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://127.0.0.1:3307/datax"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "target_name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from target_name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "datax_target"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

执行!成功!!

在这里插入图片描述

查看 datax_target 表: 数据已成功写入

在这里插入图片描述

以上是全表数据同步

参考官方文档,再编写一个从一个mysql到另一个mysql的批量更新

下面通过自定义querySql实现一下部分更新的操作

1、更新datax_src表的部分数据,更新id小于10的9条数据

在这里插入图片描述

datax 的 job 的 json文件如下: 加了一个 querySql 参数,并把 writer 中的 writeMode 换成 update

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "src_name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
								 "querySql": [
                                    "select id,src_name from datax_src where id < 10;"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://127.0.0.1:3307/datax"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "update",
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "target_name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from datax_target"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "datax_target"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

执行,成功

在这里插入图片描述

查看下datax_target中的数据:也已被成功更新

在这里插入图片描述

标签:src,name,datax,mysql,target,id,ETL,DataX
来源: https://blog.csdn.net/qq_40225004/article/details/112391596

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有