Introduction to DataStream Sinks and RichSinkFunction

2021-08-05


Source: https://blog.csdn.net/zhuzuwei/article/details/107142494

1. Install nc (netcat)

yum -y install nmap-ncat
2. Start it listening (8888 is the port number)

nc -lk 8888
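
Once nc is listening, type comma-separated words into that terminal; the Flink job below splits each line on ',' (these sample lines are only illustrative):

spark,flink,flink
hello,flink

3. Run the following Flink job. It counts words from the socket and emits the results through three sinks: print(), the built-in writeAsText() file sink, and a custom Redis sink registered with addSink().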

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class AddSinkTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source: read text lines from the socket opened with nc
        DataStreamSource<String> lines = env.socketTextStream("10.66.31.133", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> words = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(",");
                for (int i = 0; i < words.length; i++) {
                    collector.collect(Tuple2.of(words[i], 1));
                }
            }
        });

        // word count: key by the word (tuple field 0) and sum the counts (field 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = words.keyBy(0).sum(1);

        summed.print();

        // built-in sink: write the counts to a local text file, overwriting any existing file
        summed.writeAsText("C:\\Users\\admin\\Desktop\\flinkTest\\sinkout1.txt", FileSystem.WriteMode.OVERWRITE);

        SingleOutputStreamOperator<Tuple3<String, String, Integer>> words2 = lines.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                String[] words = s.split(",");
                for (int i = 0; i < words.length; i++) {
                    collector.collect(Tuple3.of("wordscount", words[i], 1));
                }
            }
        });

        // key by the word (field 1) and sum field 2; field 0 ("wordscount") becomes the Redis hash key
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> summed2 = words2.keyBy(1).sum(2);

        // load the Redis connection settings from a properties file
        String configPath = "C:\\Users\\admin\\Desktop\\flinkTest\\config.txt";
        ParameterTool parameters = ParameterTool.fromPropertiesFile(configPath);
        // make them available to every operator as global job parameters
        env.getConfig().setGlobalJobParameters(parameters);

        // custom sink: write each (hash key, word, count) record to Redis
        summed2.addSink(new MyRedisSinkFunction());

        env.execute("AddSinkTest");
    }

}
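
ParameterTool.fromPropertiesFile reads a standard Java properties file. A minimal sketch of config.txt with the keys that MyRedisSinkFunction (below) looks up; the host and password values are placeholders, and only redis.host is mandatory:

redis.host=10.66.31.133
redis.password=yourRedisPassword
redis.port=6379
redis.timeout=5000
redis.db=0

The custom sink extends RichSinkFunction: open() builds the Jedis connection from the global job parameters, invoke() is called once per record and writes it into a Redis hash, and close() releases the connection.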

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
 
public class MyRedisSinkFunction extends RichSinkFunction<Tuple3<String, String, Integer>>{
    private transient Jedis jedis;
 
    @Override
    public void open(Configuration config) {
        // read the Redis connection settings from the global job parameters set in AddSinkTest
        ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = parameters.getRequired("redis.host");
        String password = parameters.get("redis.password", "");
        Integer port = parameters.getInt("redis.port", 6379);
        Integer timeout = parameters.getInt("redis.timeout", 5000);
        Integer db = parameters.getInt("redis.db", 0);
        jedis = new Jedis(host, port, timeout);
        // only authenticate when a password is configured; auth("") would fail
        if (!password.isEmpty()) {
            jedis.auth(password);
        }
        jedis.select(db);
    }
 
    @Override
    public void invoke(Tuple3<String, String, Integer> value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // store the count: hash key = f0 ("wordscount"), field = f1 (the word), value = f2 (the count)
        jedis.hset(value.f0, value.f1, String.valueOf(value.f2));
    }
 
    @Override
    public void close() throws Exception {
        // guard against open() having failed before the connection was created
        if (jedis != null) {
            jedis.close();
        }
    }
}
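
After the job has processed some input, you can verify what the sink wrote by reading the hash back. A minimal standalone check with Jedis is sketched below; the connection values are placeholders and should match config.txt (running HGETALL wordscount in redis-cli shows the same data):

import java.util.Map;

import redis.clients.jedis.Jedis;

public class RedisSinkCheck {

    public static void main(String[] args) {
        // host/port/db are placeholders; use the same values as in config.txt
        Jedis jedis = new Jedis("10.66.31.133", 6379, 5000);
        try {
            // jedis.auth("yourRedisPassword"); // only if the Redis instance requires a password
            jedis.select(0);
            // MyRedisSinkFunction writes every word count into the hash "wordscount"
            Map<String, String> counts = jedis.hgetAll("wordscount");
            for (Map.Entry<String, String> entry : counts.entrySet()) {
                System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
        } finally {
            jedis.close();
        }
    }
}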

Source: https://blog.csdn.net/sasa527/article/details/119418955
