ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink初探wordCout

2019-07-14 14:50:52  阅读:170  来源: 互联网

标签:WordWithCount Flink word wordCout flink 初探 apache org public


知识点

Flink介绍
    1、无界数据-->数据不断产生
    2、有界数据-->最终不再改变的数据
    3、有界数据集是无界数据集的一个特例
    4、有界数据集在flink内部是以一种终态数据集进行处理的
    5、在flink中,有界和无界的差别非常小
    6、使用同一套流计算引擎上的API操作两种数据类型
    
流计算:
        数据不断产生,就一直处于计算状态
批处理:
        完成一定时间段的计算任务后,就释放资源
        
Flink特性:
    结果精准,即使是无序数据或者延迟到达的数据
    有状态以及容错。
        有状态,表示一直保存计算结果,以便往后传递计算值
        
    实现精准一次计算的应用状态
    大规模计算,几千台节点上运算,高吞吐和低延迟的特点
    Flink通过检查点机制实现精准一次的计算保证,在故障时可以体现出来
    flink支持流计算以及窗口化操作
    flink支持灵活的基础时间的窗口计算
    flink容错是轻量级的,保证零数据丢失。

1、下载并安装

官网安装步骤:https://ci.apache.org/projects/flink/flink-docs-release-1.8/tutorials/local_setup.html

2、本人安装版本

1、flink-1.7.2-bin-hadoop24-scala_2.11.tgz
2、tar -xzvf flink-1.7.2-bin-hadoop24-scala_2.11.tgz
3、mv flink-1.7.2 /usr/local/flink

3、运行flink

./bin/start-cluster.sh 

4、web ui查看flink界面

http://ip:8081

5、查看日志信息

查看flink启动日志信息 log/flink-root-standalonesession-0-localhost.localdomain.log
查看job任务启动信息    log/flink-root-taskexecutor-0-localhost.localdomain.log
查看job任务输出信息 tail -100f flink-root-taskexecutor-0-localhost.localdomain.out

6、编写wordcout程序,这个可以查看官网

  a)pom.xml , 注意将<scope>provided</scope>注释,否则找不到dataset类

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ywj</groupId>
    <artifactId>flink.test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.7.2</version>
            <!--<scope>provided</scope>-->
        </dependency>

    </dependencies>
</project>
View Code

  b)SocketWindowWordCount.java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // 定义连接端口
        final int port=8888;
        // 得到执行环境对象
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 连接套接字socket之后,读取输入数据text
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // 解析数据:分组,窗口,聚合,计数
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        //value.split("\\s") 将value按照空格或者制表符等进行切割
                        for (String word : value.split("\\s")) {//value为输入的一行数据
                            out.collect(new WordWithCount(word, 1L)); //封装成WordWithCount对象
                        }
                    }
                })
                .keyBy("word") //根据key进行分组
                .timeWindow(Time.seconds(5),Time.seconds(1))//Time.seconds(5), Time.seconds(1)
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count); //计数
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

  c)Exception in thread "main" java.lang.VerifyError: Uninitialized object exists on backward branch 96,这种错误

请升级JDK版本,本人使用的是jdk8-211版本

7、打包运行

a) mvn package    打成jar包,放入centos中
b)查看flinks时候运行 netstat -ano | grep 8081
c)  nc -l 8888 -v
d) ./bin/flink run -c SocketWindowWordCount /home/ywj/flink.test-1.0-SNAPSHOT.jar 
e) 查看输出信息 tail -100f flink-root-taskexecutor-0-localhost.localdomain.out 

############如果在windows进行测试,可以在win10中使用netcat#######

netcat使用

netcat测试
    linux (centos)
        1、nc 192.168.227.128 5000 客户端
        2、 nc -l 5000 -v  服务端
        
    windows(win10)
        1、nc -L -p 8888 服务端
        2、nc localhost 8888 客户端

windows调试wordcout

windows10测试:
        7、cmd启动执行  nc -L -p 8888
        8、运行flink代码

 

标签:WordWithCount,Flink,word,wordCout,flink,初探,apache,org,public
来源: https://www.cnblogs.com/ywjfx/p/11184228.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有