标签:flink 写入 rowdata elasticsearch sink table scheme es
接上一篇,实现flink对elasicsearch的source/table
flink elasticsearch sink 的 table写,默认会写些'冗余'列进es
es table写,官方支持自定义主键列,和主键列的连接方式(-)
以这几个列连接,计算id ,做为es的_id 写入es
同时也像logstash/nifi 那样支持动态index,配置索引列,写入对的应索引
写入没有问题
问题是这些参于计算的列,也都会一并写入_source,虽然可以通过配置es的mapping,减少这些列的开销,但毕竟看着碍事
有没有办法不写入这些列?
官方没有,就自已想办法定制
结全flink的应用经验,并结合上篇es source的适配经验,很简单就实现了
1 官方的sink table是dynamic table 生成的是rowdata
2 sink 结合scheme 实现对rowdata的解析,转为json字符串,bulk写入es
问题就出现在这一步,scheme有列信息 rowdata 有列数据
把要转为字符串的rowdata及scheme 都去掉相应的列,问题就应该解决了
添加自定义参数名,ignore-fields 构造时加载
遍历旧scheme,过滤掉ignore-fields 列,生成新的scheme,以scheme做序列化
报错
因为rowdata和scheme不匹配,rowdata内的列也需要去掉
rowdata不支持k/v访问,但字段顺序和scheme一致,通过scheme算出ignore-fields在rowdata内的index
过滤掉rowdata的相关列,重新生成rowdata即可
如些问题解决,测试通过,写入es不再会有'冗余'字段
但该方法只是功能满足,实际性能有损失
scheme的加载是一次性的,没有影响
但对每一行数据rowdata,都要过滤字段,生成新的rowdata,开销相对较大,有一定影响
最完美的办法是在序列化的时候,也就是flink的原码基础上做,序列化时对rowdata不必要的字段做过滤
而不是构造一个过滤字段后的rowdata,由flink序列化
实现一个同名类,扔到flink/lib下,和官方类,先后顺序不清楚
必要时自已改动序列化部分,编译flink,部署
https://github.com/cclient/flink-connector-elasticsearch-sink
标签:flink,写入,rowdata,elasticsearch,sink,table,scheme,es 来源: https://www.cnblogs.com/zihunqingxin/p/14957208.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。