ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kettle实战-巧用合并记录组件完成业务对账

2020-11-25 12:32:44  阅读:291  来源: 互联网

标签:11 10 23 kettle 42 对账 2020 日志 巧用


kettle实战-巧用合并记录组件轻松完成业务对账

应用场景:

对于互联网公司而言,无论是支付还是其他产品的交易操作都难免会涉及到对账的环节,这是保证公司与支付平台或者公司与渠道之间交易对等的关键。kettle作为开源的ETL工具为我们提供了强大的组件支持,在实际的开发中数据处理应用到的地方也是非常的广泛,之前在实际的开发中碰巧遇到了这么一个对账的场景,便借此机会运用kettle的合并记录组件完成了这一功能的开发,相当的方便快捷,极大地降低了开发的工作量。

技术

kettle:pdi-ce-8.3.0.0-371

组件简介-合并记录

1.功能介绍
该步骤用于将两个不同来源的数据合并,这两个来源的数据分别为旧数据和新数据,该步骤将旧数据和新数据按照指定的关键字匹配、比较、合并。
2.设置参数
旧数据来源:旧数据来源的步骤
新数据来源:新数据来源的步骤
关键字段:用于定位两个数据源中的同一条记录
比较字段:对于两个数据源中的同一条记录中,指定需要比较的字段
3.运行结果分析
合并后的数据将包括旧数据来源和新数据来源里的所有数据,对于变化的数据,使用新数据代替旧数据,同时在结果里用一个标示字段,来指定新旧数据的比较结果:

第一列第二列
identical旧数据和新数据一样
changed数据发生了变化
new新数据中有而旧数据中没有的记录
deleted旧数据中有而新数据中没有的记录

4.关键点
旧数据和新数据需要事先按照关键字段排序
旧数据和新数据要有相同的字段名称

业务场景

每天下午五点从SFTP上获取渠道交易数据(交易时间:昨日下午三点至今日下午三点,交易类型:转入、转出、退保等),文件名为 交易类型_当前日期.zip(例如:5_20201124.zip),加压缩后包含:交易类型_当前日期.csv(例如:5_20201124.csv)和交易类型_当前日期.csv.md5(例如:5_20201124.csv.md5)文件,解析时对csv文件+key进行MD5获取签名进行核对,然后进行业务对账。

简要流程

SFTP下载渠道侧对账文件——合并记录——结果汇总(落库)——异常数据报表(告警)

整体流程

在这里插入图片描述

流程拆解

第一步:设置变量
设置文件地址,SFTP配置信息,当然常用的配置也可选择配到kettle.properties文件中
在这里插入图片描述

第二步:设置系统时间变量
该步骤主要将系统时间格式化为自己需要使用的格式,例如:文件目录(2020/11/24,或者业务开始时间:2020-11-23 15:00:00,供sql查询使用)
在这里插入图片描述

第三步:下载SFTP上的文件到本地
在这里插入图片描述

第四步:解压缩文件
在这里插入图片描述

第五步:MD5验签(验签代码见下方)
在这里插入图片描述
在这里插入图片描述
第六步:留存渠道交易数据
在这里插入图片描述

第七步:核对数据(关键步骤)
该步骤主要利用合并记录组件来进行数据的核对,需要注意:sql查询时字段的顺序、命名、个数保持、格式保持一致,并且按照相同的排序规则进行排序,然后设置关键字段进行对比将对比结果输出,将验证结果进行处理入库
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

第七步:汇总对账结果
该步骤可以将当前日期对账的结果进行汇总形成报表,或者是直接进行告警(比如企业微信机器人告警等)通知运维进行相关问题的核查及时处理。示例中打印日志的地方往往会被设置为异常告警便于及时发现。
在这里插入图片描述

运行结果分析

模拟数据

渠道端交易数据:
在这里插入图片描述

我方交易数据:
在这里插入图片描述

运行日志:

2020/11/23 23:42:09 - verify - 开始项[检查文件存放目录]
2020/11/23 23:42:09 - verify - 开始项[SFTP 下载]
2020/11/23 23:42:09 - verify - 开始项[解压缩文件]
2020/11/23 23:42:09 - verify - 开始项[验签]
2020/11/23 23:42:09 - 验签 - Using run configuration [Pentaho local]
2020/11/23 23:42:09 - 验签 - Using legacy execution engine
2020/11/23 23:42:09 - Md5 - 为了转换解除补丁开始  [Md5]
2020/11/23 23:42:09 - 获取全局变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:09 - Java 代码.0 - >>>>>MD5:3a9715f0fa2081119665d82ccf8c72b7
2020/11/23 23:42:10 - Java 代码.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:10 - 结束文件输出Y/N .0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[判断验签是否正确]
2020/11/23 23:42:10 - verify - 开始项[同步交易数据(入库)]
2020/11/23 23:42:10 - 同步交易数据(入库) - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 同步交易数据(入库) - Using legacy execution engine
2020/11/23 23:42:10 - synchRefundData - 为了转换解除补丁开始  [synchRefundData]
2020/11/23 23:42:10 - 读取csv文件.0 - Header row skipped in file 'D:/kettle/verify/download/2020/11/23/5_20201123.csv'
2020/11/23 23:42:10 - 读取csv文件.0 - 完成处理 (I=3, O=0, R=0, W=2, U=0, E=0)
2020/11/23 23:42:10 - 获取变量.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 插入 / 更新.0 - 完成处理 (I=2, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[核对数据一致性]
2020/11/23 23:42:10 - 核对数据一致性 - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 核对数据一致性 - Using legacy execution engine
2020/11/23 23:42:10 - verify - 为了转换解除补丁开始  [verify]
2020/11/23 23:42:10 - 京东退保数据.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 公司交易数据.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 处理单位-渠道.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 排序-渠道侧数据.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 处理单位-公司.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 排序-公司数据.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 京东退保数据.0 - 完成处理 (I=2, O=0, R=0, W=2, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 1------------------------------
2020/11/23 23:42:10 - 写日志.0 - 不一致记录
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - policyNo = 86000020201600385734
2020/11/23 23:42:10 - 写日志.0 - refundNo = 2005112940872742984
2020/11/23 23:42:10 - 写日志.0 - amount = 101399.0
2020/11/23 23:42:10 - 写日志.0 - refundTime = 2020/11/23 09:21:12.000000000
2020/11/23 23:42:10 - 写日志.0 - flagfield = changed
2020/11/23 23:42:10 - 写日志.0 - flag = 1
2020/11/23 23:42:10 - 写日志.0 - channelCode = dd
2020/11/23 23:42:10 - 写日志.0 - tradeType = 5
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 公司交易数据.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2020/11/23 23:42:10 - 正向对比.0 - 完成处理 (I=0, O=0, R=5, W=3, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 2------------------------------
2020/11/23 23:42:10 - 写日志.0 - 不一致记录
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - policyNo = 86000020201600385736
2020/11/23 23:42:10 - 写日志.0 - refundNo = 2005112940872742985
2020/11/23 23:42:10 - 写日志.0 - amount = 101399.0
2020/11/23 23:42:10 - 写日志.0 - refundTime = 2020/11/23 09:21:12.000000000
2020/11/23 23:42:10 - 写日志.0 - flagfield = new
2020/11/23 23:42:10 - 写日志.0 - flag = 3
2020/11/23 23:42:10 - 写日志.0 - channelCode = dd
2020/11/23 23:42:10 - 写日志.0 - tradeType = 5
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 字段选择.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 对账状态映射.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 过滤对账一致记录.0 - 完成处理 (I=0, O=0, R=3, W=2, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 插入对账明细.0 - 完成处理 (I=2, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[汇总对账结果]
2020/11/23 23:42:10 - 汇总对账结果 - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 汇总对账结果 - Using legacy execution engine
2020/11/23 23:42:10 - gatherVerifyData - 为了转换解除补丁开始  [gatherVerifyData]
2020/11/23 23:42:10 - 表输入.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 1------------------------------
2020/11/23 23:42:10 - 写日志.0 - 汇总对账信息
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - result = 不一致的交易数量为:1条
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 2------------------------------
2020/11/23 23:42:10 - 写日志.0 - 汇总对账信息
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - result = 保险公司多出的交易数量为:1条
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ====================

代码解析

MD5验签代码片.

import com.aliyun.openservices.shade.org.apache.commons.codec.digest.DigestUtils;
import com.aliyun.openservices.shade.org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.compress.utils.IOUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Calendar;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException{
        //获取到上一个步骤的输入行
        Object[] r = getRow();
        if (r == null) {
        setOutputDone();
        return false;
        }
        r = createOutputRow(r, data.outputRowMeta.size());
        //读取出參数变量值
        String key = getVariable("key", "");
        String fileDir = getVariable("fileDir", "");
        String todayF = get(Fields.In, "todayF").getString(r);
        String today = get(Fields.In, "today").getString(r);
        String tradeType = getVariable("tradeType", "");
        //获取MD5摘要
        String Md5Dir = fileDir  + "/" +todayF + "/" + tradeType +"_"+ today + ".csv.md5";
        //拼接文件地址
        String csvFileDir = fileDir  + "/" + todayF  + "/"+ tradeType +"_"+ today + ".csv";
        File file = new File(csvFileDir);
        FileInputStream fis = null;
        File channelfile = new File(Md5Dir);
        FileInputStream channelfis = null;
        String md5code = null;
        String channelmd5code = null;
        boolean flag = true;
        try {
        fis = new FileInputStream(file);
        byte[] csvFileNameBytes  = IOUtils.toByteArray(fis);
        md5code = DigestUtils.md5Hex(ArrayUtils.addAll(key.getBytes(), csvFileNameBytes));
        //logBasic(">>>>>MD5:"+md5code);
        //读取渠道验签
        channelfis = new FileInputStream(channelfile);
        byte[]bytes=new byte[1024];
        int len =-1;
        while ((len=channelfis.read(bytes))!=-1){
        channelmd5code=new String(bytes);
        }
        if(channelmd5code!=null){
        channelmd5code=channelmd5code.trim();
        }
        if(!md5code.equals(channelmd5code)){
        flag=false;
        }
        } catch (Exception e) {
        e.printStackTrace();
        } finally {
        try {
        fis.close();
        } catch (IOException e) {
        e.printStackTrace();
        }
        try {
        channelfis.close();
        } catch (IOException e) {
        e.printStackTrace();
        }
        }

        //把计算好的值放入到输出记录中
        get(Fields.Out, "flag").setValue(r, flag);
        //输出到下一个节点做处理
        putRow(data.outputRowMeta, r);
        return true;
        }

demo

demo示例

标签:11,10,23,kettle,42,对账,2020,日志,巧用
来源: https://blog.csdn.net/weixin_45598170/article/details/109952983

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有