Getting Started with DataX and Handling CR/LF in the HDFSWRITER Plugin

Recently I have been looking into extracting business data into Hive. I originally planned to use Sqoop, but found it not flexible enough (perhaps I simply don't know it well enough); incremental extraction in particular felt limiting. When a table has to be joined with other tables and the incremental field comes from one of those other tables, I could not find a way to do it with Sqoop, so I went looking for an alternative. DataX seems to be a good choice, and for anything special you can extend it yourself.

1. Download and Install

Installation is simple: download the pre-built package from GitHub and it is ready to use. Download page:
https://github.com/alibaba/DataX/blob/master/userGuid.md
Put it on the server:

[hadoop@FineReportAppServer bin]$ pwd
/home/hadoop/datax/datax/bin
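
Before writing a real job, the install can be verified with the stream-to-stream self-check job that ships in the release package (path per the default layout; adjust if yours differs):

[hadoop@FineReportAppServer bin]$ python datax.py ../job/job.json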

2. Basic Usage

In my past data warehouse work, limitations of the unloading tools frequently caused loads to fail because of embedded carriage returns and line feeds, so a tool that cannot handle CR/LF is not a good tool.
So I inserted two rows into the database, one of which contains a carriage return and line feed.
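
For example, a value with an embedded CR/LF can be created like this (table and column names follow the AAA/PLUCODE example used in the rest of the post; the values are only for illustration):

-- illustrative test rows; chr(13)||chr(10) embeds a Windows line ending in the second value
insert into AAA (plucode) values ('12345');
insert into AAA (plucode) values ('123' || chr(13) || chr(10) || '4545');
commit;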

Copying the value out of Navicat shows that it does contain the carriage return.

I wrote a SQL statement to generate the JSON job file content (only so that it is easy to adapt later; every plugin ships its own documentation and template for its JSON configuration in the corresponding directory):


select '{'
    || '"job": {'
    ||   '"content": [{'
    ||     '"reader": {'
    ||       '"name": "oraclereader",'
    ||       '"parameter": {'
    ||         '"column": [' || wm_concat('"' || column_name || '"') || '],'
    ||         '"connection": [{'
    ||           '"jdbcUrl": ["jdbc:oracle:thin:@192.168.xxxx.xxx:1521:xxx"],'
    ||           '"table": ["' || table_name || '"]'
    ||         '}],'
    ||         '"username": "xxx",'
    ||         '"password": "xxx"'
    ||       '}'
    ||     '},'
    ||     '"writer": {'
    ||       '"name": "hdfswriter",'
    ||       '"parameter": {'
    ||         '"defaultFS": "hdfs://192.168.0.143:9000",'
    ||         '"fileType": "TEXT",'
    ||         '"path": "/DATA/ODS/' || upper(table_name) || '",'
    ||         '"fileName": "hdfswriter",'
    ||         '"column": ['
    ||           wm_concat('{"name":"' || column_name || '","type":"' || decode(data_type, 'NUMBER', 'double', 'varchar') || '"}')
    ||         '],'
    ||         '"writeMode": "append",'
    ||         '"fieldDelimiter": ","'
    ||       '}'
    ||     '}'
    ||   '}],'
    ||   '"setting": {'
    ||     '"speed": {"channel": "2"}'
    ||   '}'
    || '}'
    || '}'
  from dba_tab_columns
 where owner = 'xxx'
   and table_name = 'AAA'
 group by table_name
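
For reference, since AAA only has the PLUCODE column (see the Hive query at the end), the statement renders roughly the following aaa.json (assuming PLUCODE is a VARCHAR2 column; the IPs and credentials are the placeholders from the SQL):

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "oraclereader",
          "parameter": {
            "column": ["PLUCODE"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:oracle:thin:@192.168.xxxx.xxx:1521:xxx"],
                "table": ["AAA"]
              }
            ],
            "username": "xxx",
            "password": "xxx"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://192.168.0.143:9000",
            "fileType": "TEXT",
            "path": "/DATA/ODS/AAA",
            "fileName": "hdfswriter",
            "column": [{"name": "PLUCODE", "type": "varchar"}],
            "writeMode": "append",
            "fieldDelimiter": ","
          }
        }
      }
    ],
    "setting": {
      "speed": {"channel": "2"}
    }
  }
}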

Put it on the server and run it:

python datax.py  ~/DataXJsonConfig/DRP/aaa.json

After the run, get the output file from HDFS and open it.
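
Something along these lines — hdfswriter appends a random suffix to the configured fileName, so list the directory first (the file name below is only a placeholder):

hdfs dfs -ls /DATA/ODS/AAA
hdfs dfs -get /DATA/ODS/AAA/hdfswriter__xxxxxxxx .    # replace with the actual file name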

You can see a ^M in it, which is the carriage return/line feed. If you open the file in Notepad++ (View -> Show Symbol -> Show All Characters) you will see a black CRLF marker: CR is \r and LF is \n, so \r\n is the Windows line ending. Clearly DataX did not strip it when writing to HDFS.
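
If Notepad++ is not at hand, cat -A (or od -c) on the downloaded file also exposes the hidden characters; a Windows line ending shows up as ^M followed by the end-of-line marker $ (again, the file name is a placeholder):

cat -A hdfswriter__xxxxxxxx    # CR renders as ^M, end of line as $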

3. Modifying the Plugin

First, git clone the source code, then find the hdfswriter plugin and modify it:

public static MutablePair<List<Object>, Boolean> transportOneRecord(
            Record record,List<Configuration> columnsConfiguration,
            TaskPluginCollector taskPluginCollector){

        MutablePair<List<Object>, Boolean> transportResult = new MutablePair<List<Object>, Boolean>();
        transportResult.setRight(false);
        List<Object> recordList = Lists.newArrayList();
        int recordLength = record.getColumnNumber();
        if (0 != recordLength) {
            Column column;
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                //todo as method
                if (null != column.getRawData()) {
                    String rowData = column.getRawData().toString();
                    // Added: strip carriage returns and line feeds before the value is written out
                    rowData = rowData.replaceAll("\r\n|\r|\n", "");
                    SupportHiveDataType columnType = SupportHiveDataType.valueOf(
                            columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());
                    // Convert the value according to the column type configured on the writer side
                    try {
                        switch (columnType) {
                            case TINYINT:
                                recordList.add(Byte.valueOf(rowData));
                                break;
                            case SMALLINT:
                                recordList.add(Short.valueOf(rowData));
                                break;
                            case INT:
                                recordList.add(Integer.valueOf(rowData));
                                break;
                            case BIGINT:
                                recordList.add(Long.valueOf(rowData));
                                break;
                            case FLOAT:
                                recordList.add(Float.valueOf(rowData));
                                break;
                            case DOUBLE:
                                recordList.add(Double.valueOf(rowData));
                                break;
                            case STRING:
                            case VARCHAR:
                            case CHAR:
                                recordList.add(rowData);
                                break;
                            case BOOLEAN:
                                recordList.add(Boolean.valueOf(rowData));
                                break;
                            case DATE:

                                recordList.add(new java.sql.Date(column.asDate().getTime()));
                                break;
                            case TIMESTAMP:
                                recordList.add(new java.sql.Timestamp(column.asDate().getTime()));
                                break;
                            default:
                                throw DataXException
                                        .asDataXException(
                                                HdfsWriterErrorCode.ILLEGAL_VALUE,
                                                String.format(
                                                        "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d]. 请修改表中该字段的类型或者不同步该字段.",
                                                        columnsConfiguration.get(i).getString(Key.NAME),
                                                        columnsConfiguration.get(i).getString(Key.TYPE)));
                        }
                    } catch (Exception e) {
                        // warn: treat this record as dirty data
                        String message = String.format(
                                "字段类型转换错误:你目标字段为[%s]类型,实际字段值为[%s].",
                                columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData().toString());
                        taskPluginCollector.collectDirtyRecord(record, message);
                        transportResult.setRight(true);
                        break;
                    }
                }else {
                    // warn: it's all ok if nullFormat is null
                    recordList.add(null);
                }
            }
        }
        transportResult.setLeft(recordList);
        return transportResult;
    }
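
As a quick standalone check of the replacement added above (not part of the plugin; the class name is just for illustration), the regex removes Windows (\r\n), old Mac (\r) and Unix (\n) line endings:

public class CrlfStripCheck {
    public static void main(String[] args) {
        // a value with an embedded Windows line ending, like the test row inserted earlier
        String rowData = "123\r\n4545";
        String cleaned = rowData.replaceAll("\r\n|\r|\n", "");
        System.out.println(cleaned); // prints 1234545
    }
}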

Edit the pom.xml in the DataX root directory and remove the plugin modules that do not need to be built; the modules left are as follows:

    <modules>
        <module>common</module>
        <module>core</module>
        <module>transformer</module>

        <!-- writer -->
        <module>hdfswriter</module>

        <!-- common support module -->
        <module>plugin-rdbms-util</module>
        <module>plugin-unstructured-storage-util</module>
        <module>hbase20xsqlreader</module>
        <module>hbase20xsqlwriter</module>
        <module>kuduwriter</module>
    </modules>
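
With the module list trimmed, build from the DataX root directory using the command given in the project's userGuide, skipping tests:

mvn -U clean package assembly:assembly -Dmaven.test.skip=true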

The build will certainly complain about missing jars, but it is not as hard as it sounds: whichever jar is missing, download it and put it into the local Maven repository. Jars can be downloaded from https://public.nexus.pentaho.org/
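
A downloaded jar can then be installed into the local repository like this (the file path and coordinates are placeholders; use whatever the build error reports as missing):

mvn install:install-file -Dfile=/path/to/missing.jar -DgroupId=xxx -DartifactId=xxx -Dversion=xxx -Dpackaging=jar
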
Build succeeded:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO]
[INFO] datax-all .......................................... SUCCESS [ 38.236 s]
[INFO] datax-common ....................................... SUCCESS [  4.546 s]
[INFO] datax-transformer .................................. SUCCESS [  2.832 s]
[INFO] datax-core ......................................... SUCCESS [  6.477 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [  2.426 s]
[INFO] hdfswriter ......................................... SUCCESS [ 22.557 s]
[INFO] plugin-rdbms-util .................................. SUCCESS [  2.616 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [  2.667 s]
[INFO] hbase20xsqlwriter .................................. SUCCESS [  2.050 s]
[INFO] kuduwriter ......................................... SUCCESS [  2.231 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:27 min
[INFO] Finished at: 2021-04-13T20:35:42+08:00
[INFO] ------------------------------------------------------------------------

Replace the hdfswriter plugin files on the server with the newly built ones.
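
In my case that meant copying the rebuilt plugin over the one in the DataX install. The paths below are assumptions (the typical assembly output location and my server's layout), so adjust them to your environment and back up the old directory first:

# paths are assumptions - adjust to your build output and DataX install location
scp -r hdfswriter/target/datax/plugin/writer/hdfswriter hadoop@FineReportAppServer:/home/hadoop/datax/datax/plugin/writer/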

4. Re-running the Job

[hadoop@FineReportAppServer bin]$ python datax.py  ~/DataXJsonConfig/DRP/aaa.json
...... (output omitted) ......
2021-04-13 20:36:58.575 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2021-04-13 20:36:58.682 [job-0] INFO  JobContainer - 
	 [total cpu info] => 
		averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
		-1.00%                         | -1.00%                         | -1.00%
                        

	 [total gc info] => 
		 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
		 PS MarkSweep         | 1                  | 1                  | 1                  | 0.043s             | 0.043s             | 0.043s             
		 PS Scavenge          | 1                  | 1                  | 1                  | 0.018s             | 0.018s             | 0.018s             

2021-04-13 20:36:58.683 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-04-13 20:36:58.684 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 14 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-04-13 20:36:58.687 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-04-13 20:36:46
任务结束时刻                    : 2021-04-13 20:36:58
任务总计耗时                    :                 12s
任务平均流量                    :                1B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

Query the data in Hive:

hive (ods)> select * from aaa;
OK
aaa.plucode
12345
1234545
Time taken: 0.077 seconds, Fetched: 2 row(s)

References:
https://blog.csdn.net/weixin_43320617/article/details/109387903
https://blog.csdn.net/u013868665/article/details/79971419
