
Basic Usage of DataX





1. DataX Templates

Method 1: generate a configuration file template with DataX itself:

python bin/datax.py -r mysqlreader -w hdfswriter

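For example (a minimal sketch; the install directory /opt/module/datax is an assumption, not stated above), the printed skeleton can be saved as the starting point for the job file used in the next section:

    # Assumption: DataX is installed under /opt/module/datax
    cd /opt/module/datax
    # Print an empty job template for the chosen reader/writer pair;
    # the output shows links to the reader/writer docs followed by a JSON skeleton
    python bin/datax.py -r mysqlreader -w hdfswriter
    # Copy the JSON skeleton from the output into a new job file,
    # e.g. job/mysql_to_hdfs_T.json, then fill in connection, column, where, and path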

Method 2: refer to the official documentation: https://github.com/alibaba/DataX/blob/master/README.md

2. Syncing MySQL Data to HDFS

2.1 MySQLReader: TableMode

In TableMode, the data to be synchronized is declared with attributes such as table, column, and where.

mysql_to_hdfs_T.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "region_id",
                            "area_code",
                            "iso_code",
                            "iso_3166_2"
                        ],
                        "where": "id>=3",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "table": [
                                    "base_province"
                                ]
                            }
                        ],
                        "password": "123456",
                        "splitPk": "",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

MysqlReader (TableMode) parameter format: the reader is configured with jdbcUrl, username, password, table, column, where, and splitPk, as in the job file above.

HDFSWriter parameter format: the writer is configured with defaultFS, path, fileName, fileType, column, fieldDelimiter, compress, and writeMode.

HDFS Writer does not provide a nullFormat parameter, so there is no way to customize how NULL values are written to the HDFS files. By default, HDFS Writer writes NULL values as empty strings (''), while Hive's default NULL representation is \N. Loading the files synchronized by DataX into a Hive table will therefore run into problems.

Solutions

  • Modify the DataX HDFS Writer source code to add support for a custom NULL representation: https://blog.csdn.net/u010834071/article/details/105506580

  • When creating the table in Hive, declare the empty string ('') as the NULL representation (a quick query check follows the DDL below):

    DROP TABLE IF EXISTS base_province;
    CREATE EXTERNAL TABLE base_province
    (
        `id`         STRING COMMENT 'ID',
        `name`       STRING COMMENT 'Province name',
        `region_id`  STRING COMMENT 'Region ID',
        `area_code`  STRING COMMENT 'Area code',
        `iso_code`   STRING COMMENT 'Old ISO-3166-2 code, for visualization',
        `iso_3166_2` STRING COMMENT 'New ISO-3166-2 code, for visualization'
    ) COMMENT 'Province table'
        ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
        NULL DEFINED AS ''
        LOCATION '/base_province/';
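A quick way to confirm the NULL handling (a sketch; it assumes the Hive CLI is available and the data has already been synced to /base_province by the job above):

    # With NULL DEFINED AS '', empty fields written by HDFS Writer are read back as NULL
    hive -e "SELECT id, name, region_id FROM base_province WHERE region_id IS NULL LIMIT 5;"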
    

Notes on the setting block: job.setting.speed.channel controls the number of concurrent channels. The error-tolerance limits (job.setting.errorLimit) are all set to 0 in this example, i.e. no dirty records are tolerated.

Submitting and testing the job

  • First create the target directory on HDFS:

    hadoop fs -mkdir /base_province
    

    Alternatively, the DataX source code can be modified so that the directory is created automatically.

  • Run the job to import the MySQL data into HDFS:

    python bin/datax.py job/mysql_to_hdfs_T.json 
    
  • Check the result on HDFS; an optional row-count check follows the command:

    hadoop fs -cat /base_province/* | zcat
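An optional row-count check (a sketch; it assumes the mysql command-line client is available on the DataX host, which the original does not state):

    # Rows in MySQL that match the where condition
    mysql -uroot -p123456 -N -e "SELECT COUNT(*) FROM gmall.base_province WHERE id >= 3;"
    # Rows written to HDFS; the two counts should match
    hadoop fs -cat /base_province/* | zcat | wc -l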
    

2.2 MySQLReader: QuerySQLMode

In QuerySQLMode, the data to be synchronized is declared with a single SQL query.

mysql_to_hdfs_sql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

MysqlReader (QuerySQLMode) parameter format: instead of table, column, and where, the reader takes one or more querySql statements inside the connection block, as in the job file above.

Submitting and testing the job

  • Delete all files under /base_province/:

    hadoop fs -rm -r -f /base_province/*
    
  • Run the job:

    python bin/datax.py job/mysql_to_hdfs_sql.json
    
  • Check the files on HDFS:

    hadoop fs -cat /base_province/* | zcat
    

3. Syncing HDFS Data to MySQL

hdfs_to_mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "defaultFS": "hdfs://hadoop102:8020",
                        "path": "/base_province",
                        "column": [
                            "*"
                        ],
                        "fileType": "text",
                        "compress": "gzip",
                        "encoding": "UTF-8",
                        "nullFormat": "\\N",
                        "fieldDelimiter": "\t",
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "table": [
                                    "test_province"
                                ],
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"
                            }
                        ],
                        "column": [
                            "id",
                            "name",
                            "region_id",
                            "area_code",
                            "iso_code",
                            "iso_3166_2"
                        ],
                        "writeMode": "replace"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

HDFSReader parameter format: the reader is configured with defaultFS, path, column, fileType, compress, encoding, nullFormat, and fieldDelimiter.

MySQLWriter parameter format: the writer takes username, password, connection (jdbcUrl and table), column, and writeMode; with writeMode set to replace, rows that collide on the primary key are replaced.

Submitting and testing the job

  • Create the test_province table in MySQL:

    DROP TABLE IF EXISTS `test_province`;
    CREATE TABLE `test_province`  (
      `id` bigint(20) NOT NULL,
      `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
    

    The table can also be created with the Navicat GUI.

  • Run the job:

    python bin/datax.py job/hdfs_to_mysql.json
    
  • Refresh and check the data in MySQL; see the queries sketched below.
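The loaded data can also be checked from the command line (a sketch; it assumes the mysql client is installed on hadoop102):

    # Row count and a few sample rows of the freshly loaded table
    mysql -uroot -p123456 -e "SELECT COUNT(*) FROM gmall.test_province;"
    mysql -uroot -p123456 -e "SELECT * FROM gmall.test_province LIMIT 5;"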

4. Passing Parameters to DataX

Offline data synchronization jobs are typically scheduled to run every day, so the target path on HDFS usually contains a date level to separate each day's data. In other words, the target path is not fixed, and the value of the HDFS Writer path parameter in the DataX configuration file therefore needs to be dynamic.

Usage:

Reference the parameter in the JSON configuration file as ${param}, and pass its value when submitting the job with -p"-Dparam=value". A sketch of a daily wrapper script is given at the end of this section.

test_parameter.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province/${dt}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

Submitting and testing the job

  • Create the dated target path on HDFS:

    hadoop fs -mkdir /base_province/2022-01-11
    
  • Run the job, passing the same date as the dt parameter:

    python bin/datax.py job/test_parameter.json -p"-Ddt=2022-01-11"
    
  • Check the result on HDFS.
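In a real schedule the dt value is normally generated rather than typed by hand. Below is a minimal sketch of a daily wrapper script; the script name and the install path /opt/module/datax are assumptions for illustration:

    #!/bin/bash
    # mysql_to_hdfs_daily.sh (hypothetical name): sync one day's partition
    # Use the first argument as the date if given, otherwise default to yesterday
    dt=${1:-$(date -d '-1 day' +%F)}
    # Create the dated target directory (-p makes this idempotent)
    hadoop fs -mkdir -p /base_province/$dt
    # Run the parameterized job; ${dt} in the path is replaced with this value
    python /opt/module/datax/bin/datax.py /opt/module/datax/job/test_parameter.json -p"-Ddt=$dt"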
