标签:Flume 实战 拦截器 ts public flume Override import event
背景:公司属于教育公司,自研一款线上教育app。由于疫情,导致公司业务扩大,数据量剧增。于是公司打算自研一套数据中台。本人有幸负责公司数据采集这一块项目。
解决的问题:根据埋点数据会产生一条json日志写到服务器指定的目录下。因此我需要采集到数据传入kafka之中,所以Flume组件成了必选项。本次主要介绍flume基于时间戳的拦截器
package com.tuoqing.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class TimestampInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 获取事件的值并转换成UTF_8的格式
byte[] body = event.getBody();
String log = new String(body, StandardCharsets.UTF_8);
// 将获取的字符串转换成对象
JSONObject jsonObject = JSON.parseObject(log);
// 获取对象中key值为"ts"的字段
if (jsonObject.containsKey("ts")){
String ts = jsonObject.getString("ts");
event.getHeaders().put("timestamp",ts);
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TimestampInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
标签:Flume,实战,拦截器,ts,public,flume,Override,import,event 来源: https://blog.csdn.net/morsunlight/article/details/115707344
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。