ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Flink读写Redis(一)-写入Redis

2020-10-26 21:35:20  阅读:679  来源: 互联网

标签:flink Redis 读写 Flink redis streaming import apache org


项目pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jike.flink</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <flink.version>1.10.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- flink 11中需要手动添加
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/flink-connector-redis_2.11-1.1.5.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.0</version>
            <scope>compile</scope>
        </dependency>

    </dependencies>

</project>

实现flink写入redis

实现wordcount功能,并将结果实时写入redis,这里使用了第三方依赖flink-connector-redis_2.11,该依赖提供了RedisSink可以直接使用,具体代码如下:

代码

首先定义数据源处理实现类LineSplitter,该类将一行数据分词,输出<单词,1>元祖

package com.jike.flink.examples.redis;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        String[] tokens = s.toLowerCase().split("\\W+");
        for(String token : tokens){
            if(token.length() > 0){
                collector.collect(new Tuple2<String,Integer>(token,1));
            }
        }
    }
}

然后定义数据写入Redis的配置类,这里面将统计后的所有信息词频写入一个哈希表,哈希表的key为"flink",作为测试使用,哈希表中每个元素key为单词,value为词频

package com.jike.flink.examples.redis;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkRedisMapper implements RedisMapper<Tuple2<String,Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        //hset
        return new RedisCommandDescription(RedisCommand.HSET,"flink");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) {
        return stringIntegerTuple2.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) {
        return stringIntegerTuple2.f1.toString();
    }
}

最后编写主程序类,该类中使用了socketTextStream数据源,通过前面定义LineSplitter完成解析,然后根据单词进行分组统计,最后写入redis


package com.jike.flink.examples.redis;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;

public class Sink2Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("实际IP",12345);
        DataStream<Tuple2<String,Integer>> counts = dataStreamSource.flatMap(new LineSplitter()).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        }).sum(1);
         //控制台打印
        counts.print().setParallelism(1);
        //定义redis服务器信息
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("redis服务器ip").setPort(redis服务端口).setPassword("redis服务密码").build();
        counts.addSink(new RedisSink<>(conf,new SinkRedisMapper()));
        executionEnvironment.execute();
    }
}

运行效果

通过nc -l 12345,命令模拟数据源,并输入一些数据

IDEA中查看打印记录

查看redis

可以发现数据已写入redis

总结

flink-connector-redis_2.11中提供了RedisSink类,该类实现了RichSinkFunction,可以直接使用,如果有特殊需求,可以自定义Sink类,继承RichSinkFunction,实现特殊处理。flink-connector-redis_2.11的源码比较简洁,下一篇打算分析学习下。

标签:flink,Redis,读写,Flink,redis,streaming,import,apache,org
来源: https://www.cnblogs.com/darange/p/13881460.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有