ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

一次记录 flink job 消费kafka 迁移pulsar踩坑过程

2021-12-31 18:33:35  阅读:271  来源: 互联网

标签:flink kafka topic job offset pulsar log4j


背景简述

业务上,原有的kafka集群迁移pulsar 后续会下线kafak集群,原有的一些消费kafka topic 的任务和进程需要迁移至pulsar 并下线旧的消费kafka任务。目前在迁移期间,上报的消息会双写到kafka pulsar,消费组的offset二者是独立的。

待迁移的flink job 之前flink 版本是 1.9.1(scala 2.12) 消费kafak

FlinkKafkaConsumer<String> myConsumer = KafkaConsumerFactory.get(topic, group);
env.addSource(myConsumer )...

这次迁移 job的版本集群环境需要 升级为 1.13(scala 2.12) 消费pulsar

/*job main 方法 消费topic 迁移到pulsar*/
FlinkPulsarSource<String> myConsumer = PulsarSourceFactory.getSource(topic, group);
env.addSource(myConsumer)...

/**
 * @author: xiejiahao
 * Date: 2021/12/22 11:28
 * Description:
 */
public class PulsarSourceFactory {
    public static final String serviceUrl =
            "pulsar://pulsar-***:6651";
    public static final String adminUrl =
            "http://pulsar-*****:8441";

    public static FlinkPulsarSource<String> getSource(String topic, String group) {

        FlinkPulsarSource<String> flinkPulsarSource =
                new FlinkPulsarSource<>(
                        serviceUrl,
                        adminUrl,
                        PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()),
                        buildProperties(topic, group)).setStartFromSubscription(group);

        return flinkPulsarSource;
    }

    private static Properties buildProperties(String topic, String group) {
        Properties props = new Properties();
        props.put("topic", "persistent://****/" + topic);
        props.put("pulsar.reader.subscriptionRolePrefix", group);
        props.setProperty("auth-plugin-classname", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
        props.setProperty("auth-params", "token:" + AppConfig.get("pulsar_topic_token"));

        // earliest
        // 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        // latest
        // 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        // none
        // topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        String offset = AppConfig.get("kafka.offset", "");
        if (!StringUtils.isBlank(offset)) {
            props.put("auto.offset.reset", offset);
        }
        // props.put("client.id", "h_c_" + ThreadLocalRandom.current().nextLong());
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        return props;
    }
}

开发过程需要注意的点(踩坑记录)

flink支持消费pulsar 必备的依赖

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-pulsar_2.12</artifactId>
   <version>1.13.0</version>
   <scope>${scope.version}</scope>
</dependency>
<dependency>
   <groupId>org.apache.pulsar</groupId>
   <artifactId>pulsar-client-all</artifactId>
   <version>2.8.0</version>
   <scope>${scope.version}</scope>
</dependency>

pom文件里面依赖配置的 ${scope.version} 本地测试时设置为compile 打包提交到集群时 为避免和集群依赖冲突 改为provided 然后再打包上传

上传jar包时,应确认jar包中不存在与flink, pulsar相关的依赖包,避免运行作业时与平台自带的依赖包发生冲突。

当在本地进行测试时,因为job之前的版本是1.9.1 本地测试时需要/org/apache/flink/flink-core/1.13.0/flink-core-1.13.0.jar下载依赖包到本地并进行替换。

依赖冲突解决

这里 我的flink job中需要使用到consul 用来服务发现 codis集群,但是consul内置的okhttp okio版本都要高于flink集群上的版本,因为版本冲突 运行Job时直接报错 NoSuchMethodError

这类问题的解决办法是 使用maven-shade-plugin 对版本冲突的报的类名直接重命名 然后才重新编译

<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-shade-plugin</artifactId>
   <version>3.1.1</version>
   <executions>
      <execution>
         <phase>package</phase>
         <goals>
            <!--使用shade 解决本地和集群包版本 依赖冲突-->
            <goal>shade</goal>
         </goals>
         <configuration>
            <transformers>
               <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.test.App</mainClass>
               </transformer>
            </transformers>

            <relocations>            
               <relocation>
                  <pattern>okio</pattern>
                  <shadedPattern>okio.ad.shade</shadedPattern>
               </relocation>

               <relocation>
                  <pattern>okhttp3</pattern>
                  <shadedPattern>okhttp3.ad.shade</shadedPattern>
               </relocation>
            </relocations>
         </configuration>
      </execution>
   </executions>
</plugin>

在重新打包后, 通过jd-gui 查看jar 可发现pom 依赖的 okio okhttp3 已经重命名,这样可解决job 自身依赖的jar 与平台不兼容问题
在这里插入图片描述

日志框架冲突导致job 运行时无业务日志输出

我的flink job自身使用的日志框架是 common-log, 但是flink集群默认使用的日志框架是log4j 二者发生冲突导致 job 自身的业务日志无法输出, 这里改为slf4j + log4j2

具体操作是

<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-api</artifactId>
   <version>1.7.25</version>
   <scope>${scope.version}</scope>
</dependency>

<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.21</version>
   <scope>${scope.version}</scope>
</dependency>

<dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.17</version>
   <scope>${scope.version}</scope>
</dependency>

并配置log4j.properties

log4j.rootLogger=info,console  
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

这里具体的配置 参考的是:https://www.shuzhiduo.com/A/gAJGqkEodZ/

值得注意的是 log4j 已经报出有安全漏洞问题,当你的项目pom文件中 配置log4j依赖 打包到平台一定要确定scop为provided 以便使用集群上统一的高版本的log4j。

timewindow算子过期不能使用导致sink输出没有数据

这个坑 应该是本次迁移 折腾最久的一个坑了,因为之前日志框架冲突的问题没有解决 sink输出没有数据 也无法定位到是哪一块出现问题。

job 自身逻辑中 对数据的唯一id hashcode keyBy分区之后 timeWindow(60s) 将批量的数据 写入db中进行存储,但是日志观察window函数 一直没有正常日志输出,sink输出的db 也一直没有数据。

这里对flink 环境设置

if (arg.contains("TEST")){
    env.disableOperatorChaining();
}

重新提交 观察数据在各个算子建的流动情况时发现 window算子这里只有记录接收 无记录发送到下游
在这里插入图片描述

初步怀疑是历史代码的timewindow算子 在flink 1.13的版本里面有bug 导致window执行出现异常

解决办法:

这里测试 将keyBy+window 简化成一个flatmap 算子有正常输入输出了
商业技术中台 > likeeboost实时下发数据消费kafka迁移pulsar总结 > image2021-12-31_10-33-9.png
商业技术中台 > likeeboost实时下发数据消费kafka迁移pulsar总结 > image2021-12-31_10-33-40.png

考虑到业务场景 最好是批量写入tidb 减少tidb的连接数, 这里后面改用了countwindow来做window函数 默认的size是100

int tidbCountWindowsSize = AppConfig.get("main.tidb.countWindowsSize", 100);
int tidbCountTriggerTimeout = AppConfig.get("main.tidb.CountTriggerTimeout", 1000);
LogCountWithTimeoutTrigger tidbCountWithTimeoutTrigger = LogCountWithTimeoutTrigger.of(tidbCountWindowsSize,
        tidbCountTriggerTimeout);

DataStream<DBRecord> dbList = groupkeyStream
        .countWindow(tidbCountWindowsSize).trigger(tidbCountWithTimeoutTrigger)
        .process(new OrderCountWindowFunction(tidbCountWindowsSize)).name("order_countWindow_records");

部署后 数据在各个算子的流动情况一切正常
在这里插入图片描述
对比测试环境tidb和线上消费kafka sink输出到tidb 的数据情况,因为测试环境使用的countwindow(默认是100) 数据从redis获取去重的数据更新到tidb 更快,性能要优于之前逻辑。

标签:flink,kafka,topic,job,offset,pulsar,log4j
来源: https://blog.csdn.net/weixin_38073885/article/details/122260706

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有