ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flink基础_day01

2022-02-17 21:32:24  阅读:153  来源: 互联网

标签:String day01 flink 基础 api import apache org


FlinK 1.12批流一体

flink 1.13.0 流处理应用更加简单高效

  • 第1代——Hadoop MapReduce

首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。

n 批处理

n Mapper、Reducer

  • 第2代——DAG框架(Tez) + MapReduce

由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。

n 批处理

n 1个Tez = MR(1) + MR(2) + ... + MR(n)

n 相比MR效率有所提升

  • 第3代——Spark

接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的 Job。

n 批处理、流处理、SQL高层API支持

n 自带DAG

内存迭代计算、性能较之前大幅提升

  • 第4代——Flink

随着第三代计算引擎的出现,促进了上层应用快速发展,例如各种迭代计算的性能以及对流计算和 SQ- 等的支持。Flink 的诞生就被归在了第四代。这应该主要表现在 Flink 对流计算的支持,以及更一步的实时性上面。当然 Flink 也可以支持 Batch 的任务,以及 DAG 的运算。

n 批处理、流处理、SQL高层API支持

n 自带DAG

n 流式计算性能更高、可靠性更高

应用场景

  • 事件驱动

  • 数据分析

  • 数据管道

架构

image-20220217115157409

  • JobManager处理器:

也称之为Master,用于协调分布式执行,它们用来调度task,协调检查点,协调失败时恢复等。Flink运行时至少存在一个master处理器,如果配置高可用模式则会存在多个master处理器,它们其中有一个是leader,而其他的都是standby。

  • TaskManager处理器:

也称之为Worker,用于执行一个dataflow的task(或者特殊的subtask)、数据缓冲和data stream的交换,Flink运行时至少会存在一个worker处理器。

  • Slot 任务执行槽位:

物理概念,一个TM(TaskManager)内会划分出多个Slot,1个Slot内最多可以运行1个Task(Subtask)或一组由Task(Subtask)组成的任务链。

多个Slot之间会共享平分当前TM的内存空间

Slot是对一个TM的资源进行固定分配的工具,每个Slot在TM启动后,可以获得固定的资源

比如1个TM是一个JVM进程,如果有6个Slot,那么这6个Slot平分这一个JVM进程的资源

但是因为在同一个进程内,所以线程之间共享TCP连接、内存数据等,效率更高(Slot之间交流方便)

  • Task:

任务,每一个Flink的Job会根据情况(并行度、算子类型)将一个整体的Job划分为多个Task

  • Subtask:

子任务,一个Task可以由多个Subtask组成,一个Task有多少个Subtask取决于这个Task的并行度

也就是,每一个Subtask就是当前Task任务并行的一个线程

如,当前Task并行度为8,那么这个Task会有8个Subtask(8个线程并行执行这个Task)

  • 并行度:

并行度就是一个Task可以分成多少个Subtask并行执行的一个参数

这个参数是动态的,可以在任务执行前进行分配,而非Slot分配,TM启动就固定了

一个Task可以获得的最大并行度取决于整个Flink环境的可用Slot数量,也就是如果有8个Slot,那么最大并行度也就是8,设置的再大也没有意义

如下图:

- 一个Job分为了3个Task来运行,分别是TaskA TaskB TaskC

- 其中TaskA设置为了6个并行度,也就是TaskA可以有6个Subtask,如图可见,TaskA的6个Subtask各自在一个Slot内执行

- 其中在Slot的时候说过,Slot可以运行由Task(或Subtask)组成的任务链,如图可见,最左边的Slot运行了TaskA TaskB TaskC 3个Task各自的1个Subtask组成的一个Subtask执行链

image-20220217123204977

Slot是物理的概念,是静态的概念,一旦flink启动以后,tm就制定了slot数量,不能改变

parallelism是动态的概念,可以设置并行度的优先级,可以设置算子级别的或者应用程序全局的并行度、递交作业时设置并行度、使用flink部署环境默认配置文件中指定的并行度

优先级从前往后,越来越低

并行度的设置

并行度是一个动态的概念,可以在多个地方设置并行度:

- 配置文件默认并行度:conf/flink-conf.yaml的parallelism.default

- 启动Flink任务,动态提交参数:比如:bin/flink run -p 3

- 在代码中设置全局并行度:env.setParallelism(3); 

- 针对每个算子进行单独设置:sum(1).setParallelism(3)

优先级:算子 > 代码全局 > 命令行参数 > 配置文件

fink 编程模型

image-20220217124152126

  • 最顶层:SQL/Table API 提供了操作关系表、执行SQL语句分析的API库,供我们方便的开发SQL相关程序

  • 中层:流和批处理API层,提供了一系列流和批处理的API和算子供我们对数据进行处理和分析

  • 最底层:运行时层,提供了对Flink底层关键技术的操纵,如对Event、state、time、window等进行精细化控制的操作API

Libraries支持

  • 支持机器学习(FlinkML)

  • 支持图分析(Gelly)

  • 支持关系数据处理(Table)

  • 支持复杂事件处理(CEP) 风控领域用得特别多

构建工程

  • maven

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.itcast</groupId>
        <artifactId>itcast_flinkbase51</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <!-- 指定仓库位置,依次为aliyun、apache和cloudera仓库 -->
        <repositories>
    <!--        <repository>-->
    <!--            <id>aliyun</id>-->
    <!--            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>-->
    <!--        </repository>-->
            <repository>
                <id>apache</id>
                <url>https://repository.apache.org/content/repositories/snapshots/</url>
            </repository>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
    
        <properties>
            <encoding>UTF-8</encoding>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <java.version>1.8</java.version>
            <scala.version>2.11</scala.version>
            <flink.version>1.13.1</flink.version>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
        <dependencies>
            <!--依赖Scala语言-->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.12.11</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- web ui的依赖 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- Apache Flink 的依赖 -->
            <!-- 这些依赖项,不应该打包到JAR文件中. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 用于通过自定义功能,格式等扩展表生态系统的通用模块-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!-- blink执行计划,1.11+默认的-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--<dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>-->
    
            <!-- flink连接器-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-sql-connector-kafka_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!-- <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-filesystem_2.12</artifactId>
               <version>${flink.version}</version>
           </dependency>-->
            <!--<dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>-->
            <!--<dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-parquet_2.12</artifactId>
                  <version>${flink.version}</version>
             </dependency>-->
            <!--<dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.9.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.10.0</version>
            </dependency>-->
    
    
            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-streaming-java_2.11</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>flink-runtime_2.11</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>flink-core</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>flink-java</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hive_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-metastore</artifactId>
                <version>2.1.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>hadoop-hdfs</artifactId>
                        <groupId>org.apache.hadoop</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>2.1.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-shaded-hadoop-2-uber</artifactId>
                <version>2.7.5-10.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
                <!--<version>8.0.20</version>-->
            </dependency>
    
            <!-- 高性能异步组件:Vertx-->
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-core</artifactId>
                <version>3.9.0</version>
            </dependency>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-jdbc-client</artifactId>
                <version>3.9.0</version>
            </dependency>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-redis-client</artifactId>
                <version>3.9.0</version>
            </dependency>
    
            <!-- 日志 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
                <scope>runtime</scope>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.44</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.2</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- 参考:https://blog.csdn.net/f641385712/article/details/84109098-->
            <!--<dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-collections4</artifactId>
                <version>4.4</version>
            </dependency>-->
            <!--<dependency>
                <groupId>org.apache.thrift</groupId>
                <artifactId>libfb303</artifactId>
                <version>0.9.3</version>
                <type>pom</type>
                <scope>provided</scope>
             </dependency>-->
            <!--<dependency>
               <groupId>com.google.guava</groupId>
               <artifactId>guava</artifactId>
               <version>28.2-jre</version>
           </dependency>-->
    
        </dependencies>
    
        <build>
            <sourceDirectory>src/main/java</sourceDirectory>
            <plugins>
                <!-- 编译插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                    </configuration>
                </plugin>
                <!-- 打包插件(会包含所有依赖) -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <!--
                                            zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <!-- 设置jar包的入口类(可选) -->
                                        <mainClass></mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    

单词统计

单词计数-(1.12之前的写法)流处理

package cn.itcast.day01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * todo flink1.12之前的写法
 * 编写Flink程序,接收socket的单词数据,并以空格进行单词拆分打印。
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        /**
         * 实现步骤:
         * 1)获取flink流处理的运行环境
         * 2)接入数据源,读取文件获取数据
         * 3)数据处理
         *   3.1:使用flatMap对单词进行拆分
         *   3.2:对拆分后的单词进行记一次数
         *   3.3:使用分组算子对key进行分组
         *   3.4:对分组后的key进行聚合操作
         * 4)构建sink,输出结果
         */
        //todo 1)获取flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2)接入数据源,读取文件获取数据
        DataStreamSource<String> lines = env.socketTextStream("node01", 9999);

        //todo 3)数据处理
        //  3.1:使用flatMap对单词进行拆分
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                //返回数据
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        //  3.2:对拆分后的单词进行记一次数
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        //  3.3:使用分组算子对key进行分组
        //wordAndOne.keyBy(0);
//        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//            @Override
//            public String getKey(Tuple2<String, Integer> value) throws Exception {
//                return value.f0;
//            }
//        });
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);

        //  3.4:对分组后的key进行聚合操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);

        //todo 4)构建sink,输出结果
        sumed.print();

        //todo 5)启动运行
        env.execute();
    }
}

单词计数-(1.12之前的写法) 批处理

package cn.itcast.day01;

import groovy.lang.Tuple;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * todo flink1.12之前的写法
 * 编写Flink程序,读取文件中的字符串,并以空格进行单词拆分打印。
 */
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        /**
         * 实现步骤:
         * 1)获取flink批处理的运行环境
         * 2)接入数据源,读取文件获取数据
         * 3)数据处理
         *   3.1:使用flatMap对单词进行拆分
         *   3.2:对拆分后的单词进行记一次数
         *   3.3:使用分组算子对key进行分组
         *   3.4:对分组后的key进行聚合操作
         * 4)构建sink,输出结果
         */

        //todo 1)获取flink批处理的运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //todo 2)接入数据源,读取文件获取数据
        DataSource<String> lines = env.readTextFile("./data/input/wordcount.txt");

        //todo 3)数据处理
        //  3.1:使用flatMap对单词进行拆分
        FlatMapOperator<String, String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                //返回数据
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        //  3.2:对拆分后的单词进行记一次数
        MapOperator<String, Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        //  3.3:使用分组算子对key进行分组
        UnsortedGrouping<Tuple2<String, Integer>> grouped = wordAndOne.groupBy(0);

        //  3.4:对分组后的key进行聚合操作
        AggregateOperator<Tuple2<String, Integer>> sumed  = grouped.sum(1);

        //todo 4)构建sink,输出结果
        sumed.print();
    }
}

单词统计(1.12之后)批流一体

package cn.itcast.day01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * todo flink1.12以后的实现方案
 * todo 使用批流一体API编程模型实现单词计数
 *
 * 在flink中批是流的一个特例,也就意味着不管实现批还是流处理,肯定按照流的api实现批处理
 * DataStream
 * StreamExecutionEnvironment
 */
public class UnifyWordCount {
        public static void main(String[] args) throws Exception {
            /**
             * 实现步骤:
             * 1)获取flink流处理的运行环境
             * 2)接入数据源,读取文件获取数据
             * 3)数据处理
             *   3.1:使用flatMap对单词进行拆分
             *   3.2:对拆分后的单词进行记一次数
             *   3.3:使用分组算子对key进行分组
             *   3.4:对分组后的key进行聚合操作
             * 4)构建sink,输出结果
             */
            //todo 1)获取flink流处理的运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.setRuntimeMode(RuntimeExecutionMode.BATCH);      //使用dataStream实现批处理
            //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);  //使用dataStream实现流处理(如果数据源是一个有界数据流则依然是一个批处理)
            //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//使用DataStream的时候根据数据源自动选择是批还是流
            //todo 2)接入数据源,读取文件获取数据
            //DataStreamSource<String> lines = env.readTextFile("./data/input/wordcount.txt");
            DataStreamSource<String> lines = env.socketTextStream("node01", 9999);

            //todo 3)数据处理
            //  3.1:使用flatMap对单词进行拆分
            SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) throws Exception {
                    String[] words = line.split(" ");
                    //返回数据
                    for (String word : words) {
                        out.collect(word);
                    }
                }
            });

            //  3.2:对拆分后的单词进行记一次数
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    return Tuple2.of(word, 1);
                }
            });

            //  3.3:使用分组算子对key进行分组
            //wordAndOne.keyBy(0);
//        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//            @Override
//            public String getKey(Tuple2<String, Integer> value) throws Exception {
//                return value.f0;
//            }
//        });
            KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);

            //  3.4:对分组后的key进行聚合操作
            SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);

            //todo 4)构建sink,输出结果
            sumed.print();

            //todo 5)启动运行
            env.execute();
        }
}

提交部署

1上传作业jar包

2指定递交参数

img

3查看任务运行概述

img

4查看任务运行结果

img

标签:String,day01,flink,基础,api,import,apache,org
来源: https://www.cnblogs.com/deepJL/p/15906481.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有