ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka数据定时导入hive便于后续做数据清洗

2021-04-15 10:06:10  阅读:114  来源: 互联网

标签:camus jar hadoop hive kafka 导入 etl


文章目录

    • 问题背景
    • 解决过程
    • 注意事项


问题背景

kafka数据定时导入到hive,后续做数据清洗:
flume,confulent都需要单独部署服务,比较繁琐。调查其他可选方案,参考以下文章:参考资料
综合比较,camus 简单,比较方便接入。主要分两步:
1、采用mapreduce过程处理数据从kafka导入hadoop
2、hadoop数据接入hive管理。

解决过程

1、下载源码,本地构建jar包。
参考文章
camus源码
2、查看camus.properties配置文件,支持的功能选项
期间需要自定义input,output encoder,
需要配置Reader,Writer类,具体参考源码实现。
3、修改camus.properties配置项,最终结果如下:

camus.job.name=Camus-Job-Test
etl.destination.path=/tmp/escheduler/root/resources/topics
etl.execution.base.path=/tmp/escheduler/root/resources/camus/exec
etl.execution.history.path=/tmp/escheduler/root/resources/camus/exec/history
# 新增的自定义decoder
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.StringMessageDecoder
# 修改写入hadoop的writer
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
kafka.client.name=camus
# broker
kafka.brokers=....
# topic
kafka.whitelist.topics=topic_exec_servicecheck_prod_calConfigId_177
log4j.configuration=false
# 禁用压缩,deflate,snappy
mapred.output.compress=false
etl.output.codec=deflate
etl.deflate.level=6
etl.default.timezone=Asia/Shanghai

4、上传jar,properties文件,执行如下命令:实现kafka数据到hadoop的功能:

cd /home/app/transform/libs/
hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties

数据导入到hadoop,
5、数据从hadoop到hive,执行如下脚本:

date_string=$(date '+%Y/%m/%d/%H') 
partion=$(date '+%Y-%m-%d_%H')
topic='topic_exec_servicecheck_prod_calConfigId_177'
table_name='dwd.test_exec_servicecheck'
filePath="/tmp/escheduler/root/resources/topics/$topic/hourly/"$date_string"/"
hive<<EOF
create table if not exists $table_name(
   date TIMESTAMP,
   node STRING,
   status STRING
)
PARTITIONED BY(dt STRING)
row format delimited 
fields terminated by '|'  
STORED AS TEXTFILE;
load data inpath '$filePath' into table $table_name partition (dt='$partion');
EOF

6、配置定时调度,按小时执行。

注意事项

附自定义decoder

public class StringMessageDecoder  extends MessageDecoder<Message, String> {
	private static final org.apache.log4j.Logger log = Logger.getLogger(JsonStringMessageDecoder.class);

	@Override
	public CamusWrapper<String> decode(Message message) {
		//log.info(message.getTopic());
		return new CamusWrapper<String>(new String(message.getPayload()));
	}
}

hive input/output 支持自定义数据格式,这个是很有意义的,通常来说文本文件,分隔符分割一行,纯文本解析,最简单,但是可读性,可维护性差。
支持json格式数据写入,json处理相关jar文件 放到${HIVE_HOME}/lib目录。

标签:camus,jar,hadoop,hive,kafka,导入,etl
来源: https://blog.51cto.com/u_3075318/2707320

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有