ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flink学习

2022-03-21 19:58:38  阅读:163  来源: 互联网

标签:String flink 学习 source usr local 防盗链


实时即未来

1. Flink介绍

1.1. 发展历史

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0W6AIRw9-1647863509976)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317201723092.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YRcE2NNq-1647863509978)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317201811115.png)]

1.2. 官方介绍

Apache Flink - 数据流上的有状态计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jXZh15k3-1647863509979)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317201932826.png)]

1.3. 组件栈

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tKdhrmyL-1647863509981)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317202149651.png)]

1.4. 应用场景

所有的流式处理

2. Flink的安装部署

2.1. local 本地模式

2.1.1. 原理

  1. Flink程序由JobClient进行提交
  2. JobClient将作业提交给JobManager
  3. JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager
  4. TaskManager启动一一个线程以开始执行。TaskManager会向JobManager报告状态更改,如开始执行,正在进行或已完成。
  5. 作业执行完成后,结果将发送回客户端(obClient)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bZ7xomA6-1647863509989)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317202931510.png)]

2.1.2. 操作

  1. 下载安装包
    https://archive.apache.org/dist/flink/

  2. 上传文件flink-1.12.0-bin-scala_2.12.tgz到指定位置

  3. 解压

    tar -zxvf flink-1.12.0-bin-scala_ _2.12.tgz
    
  4. 如果出现权限问题,需要修改权限

    chown -R root:root /export/server/flink-1.12.0
    
  5. 改名或创建软链接

    mv flink-1.12.0 flink
    In -s /export/server/flink-1.12.0 /export/server/flink
    

2.1.3. 测试

  1. 准备文件/usr/local/soft/flink/data/words.txt

    vim /usr/local/soft/flink/data/words.txt
    
    hello me you her
    hello me you
    hello me
    hello
    

    数据随便放一点进去,只是为了一个测试

  2. 启动flink本地“集群”

    /usr/local/soft/flink/flink/bin/start-cluster.sh
    
  3. 使用jps查看以下两个进程

    -TaskManagerRunner
    
    -StandaloneSessionClusterEntrypoint
    
  4. 访问Flink的Web UI
    Ip:8081/#/overview

  5. 执行官方示例

    /usr/local/soft/flink/flink/bin/flink run /usr/local/soft/flink/flink/examples/batch/WordCount.jar --input /usr/local/soft/flink/data/words.txt --output /usr/local/soft/flink/data/out
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uYc1LTPl-1647863509992)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317212440606.png)]

  6. 停止flink
    /usr/local/soft/flink/flink/bin/stop-cluster.sh
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GSBsCwN8-1647863509994)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317212701681.png)]

​ 启动shell交互窗口(目前所有的scala 2.12版本的安装包暂时都不支持Scala shell)

		/usr/local/soft/flink/flink/bin/start-scala-shell.sh local

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Mb1r2Mal-1647863509997)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317213220899.png)]

​ 执行如下命令


​ 退出shell

:quit

2.2. Standalone独立集群模式

2.2.1. 原理

  1. client客户端提交任务给JobManager
  2. JobManager负责申请任务运行所需要的资源并管理任务和资源
  3. JobManager分发任务给TaskManager执行
  4. TaskManager定期向JobManager汇报状态

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QSJF8YqJ-1647863509998)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317213341593.png)]

2.2.2. 操作

1.集群规划: .

  • 服务器:master(Master + Slave): JobManager + TaskManager
    服务器:node1(Slave): TaskManager+ TaskManager
  • 服务器:node2(Slave): TaskManager
  1. 修改配置文件

​ 2.1修改 flink-conf.yaml

vim /usr/local/soft/flink/flink/conf/flink-conf.yaml

jobmanager.rpc.address: master
taskmanager.numberofTaskSlots: 2
web.submit.enable: true

#历史服务器
jobmanager.archive.fs.dir: hdfs://master:8082/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://master:8082/flink/completed-jobs/

​ 2.2 修改masters

vim /usr/local/soft/flink/flink/conf/masters

master:8081

​ 2.3修改slaves

master
node1
node2

​ 2.4添加HADOOP_CONF_DIR环境变量

vim /etc/profile
export HADOOP_CONF_DIR=/usr/local/soft/hadoop/hadoop-2.7.7/etc/hadoop

​ 2.5分发

scp -r /usr/local/soft/flink/flink node1:/usr/local/soft/flink/

scp -r /usr/local/soft/flink/flink node2:/usr/local/soft/flink/

2.2.3.测试

  1. 启动集群,在master上面直接执行下面的命令
/usr/local/soft/flink/flink/bin/start-cluster.sh
或者单独启动
  1. 启动历史服务器
/usr/local/soft/flink/flink/bin/historyserver.sh start
  1. 测试案例

    /usr/local/soft/flink/flink/bin/flink run /usr/local/soft/flink/flink/examples/batch/WordCount.jar
    
  2. 停止集群

/usr/local/soft/flink/flink/bin/stop-cluster.sh

2.3. Standalone–HA高可用集群模

2.3.1 原理

从之前的架构中我们可以很明显的发现JobManager有明显的单点问题(SPOF, single point of failure)。
JobManager肩负着任务调度以及资源分配,一旦JobManager出现意外,其后果可想而知。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ku5O1ipG-1647863510002)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220318002639644.png)]

2.3.2 操作

1.集群规划: .

  • 服务器:master(Master + Slave): JobManager + TaskManager
    服务器:node1(Slave): TaskManager+ TaskManager
  • 服务器:node2(Slave): TaskManager
  1. 启动ZooKeeper

    zkServer.sh status
    zkServer.sh stop
    zkServer.sh start
    
  2. 启动HDFS

    hadoop/sbin/start-dfs.sh
    
  3. 停止Flink集群

    flink/bin/stop-cluster.sh
    
  4. 修改flink-conf.yaml

    vim /usr/local/soft/flink/flink/conf/flink-conf.yaml
    

    增加如下配置

    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs://master:8081/flink-checkpoints
    high-availability: zookeeper
    high-availability.storageDir: hdfs://master:8020/flink/ha/
    high-availability.zookeeper.quorum: master:2181,node1:2181,node2:2181
    
  5. 在conf文件中修改master

master:8081
node1:8081
  1. 同步,分发
scp -r conf/ node2:/usr/local/soft/flink/flink

scp -r conf/ node1:/usr/local/soft/flink/flink

2.3.3. 测试

2.4. Flink-On-Yarn

2.4.1. 原理

  1. 原理
    为什么使用flink on yarn ?
    在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:

    -1.Yarn的资源可以按需使用,提高集群的资源利用率

    -2.Yarn的任务有优先级,根据优先级运行作业

    -3.基于Yarn调度系统,能够自动化地处理各个角色的Failover(容错)
    O JobManager进程和TaskManager进程都由Yarn NodeManager监控
    O如果JobManager进程异常退出,则Yarn ResourceManager会重新调度JobManager到其他机器
    O如果TaskManager进程异常退出,JobManager 会收到消息并重新向Yarn ResourceManager申请资源,重新启
    动TaskManager

  2. Flink如何和Yarn进行交互

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-58ZqsazZ-1647863510003)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320102650362.png)]

  1. Flink ON Yarn模式

3.1 session模式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1yasnvyc-1647863510004)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320102959250.png)]

3.2 Per-Job模式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-M6pHC8GU-1647863510005)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320103326813.png)]

2.4.2. 操作

1、配置HADOOP_CONF_DIR

vim /etc/profile
	export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/


2、将hadoop依赖jar上传到flink lib目录

flink-shaded-hadoop-2-uber-2.6.5-10.0

flink和spark一样都是粗粒度资源申请

flink启动方式

1、yarn-session   在yarn里面启动一个flink集群 jobManager(ApplicationMaster)
  yarn-session是所有任务共享同一个jobmanager
	先启动hadoop 
	yarn-session.sh -jm 1024m -tm 1096m
提交任务  任务提交的是偶根据并行度动态申请taskmanager
	1、在web页面提交任务

	2、同flink命令提交任务
	flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar 

	3、rpc方式提交任务

关闭yarn-session
 yarn application -kill application_1647657435495_0001

2、直接提交任务到yarn 每一个任务都会有一个

jobManager
	 flink run -m yarn-cluster  -yjm 1024m -ytm 1096m -c com.shujia.flink.core.Demo1WordCount flink-1.0.jar


杀掉yarn上的任务
yarn application -kill application_1599820991153_0005

查看日志
yarn logs -applicationId application_1647657435495_0002
yarn-session先在yarn中启动一个jobMansager ,所有的任务共享一个jobmanager (提交任务更快,任务之间共享jobmanager , 相互有影响)
直接提交任务模型,为每一个任务启动一个joibmanager (每一个任务独立jobmanager , 任务运行稳定)

3. Flink入门介绍

3.1. 前置说明

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AW9rs1Aq-1647863510007)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320104118153.png)]

注意:入门案例使用DataSet后续就不再使用,而是统一使用流批一体的DataStream

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GTPmmeYS-1647863510009)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320124018188.png)]

3.1.1编程模型

Flink应用程序结构主要包含三部分,Source/Transformation/Sink,如图三部分

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CFQFS65T-1647863510015)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320124233273.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pEz917YC-1647863510018)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320124402897.png)]

3.2. 准备环境

  1. 创建maven项目

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mwGhdoU1-1647863510019)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320131126488.png)]

  2. pom依赖文件

   <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.40</version>
        </dependency>

    </dependencies>


    <build>

        <plugins>


            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>


        </plugins>
    </build>
</project>

3.3.代码实现小案例,WordCount

  1. 示例代码

    不会写就先照葫芦画瓢,这里是官网的代码示例

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WKlFxIFD-1647863510022)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320131349179.png)]

  2. 自己实现

    package blockhouse
    
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala._
    
    object DemoWordCount {
      def main(args: Array[String]): Unit = {
        /**
         * 1.创建flink的运行环境
         * flink程序的入口
         */
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        //设置并行度
        env.setParallelism(2)
        /**
         * 2.读取数据
         * SataStream,相当于spark中的DStream
         */
        //这里使用socket来读取数据
        val dataDS: DataStream[String] = env.socketTextStream("master", 9999)
    
        //打印一下结果
        //dataDS.print()
    
        //把数据展开
        val wordsDS: DataStream[String] = dataDS.flatMap(line => line.split(","))
    
        //装成kv格式
        val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
    
        //按照单词进行分组,和groupBy比较类似,kv => kv._1制定一个字段进行分组
        //指定完成后,这里变成了KeyedStream流
        val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
    
        //统计数量,对value进行求和,这里的value就是1,指定下标进行聚合。
        val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
    
        //打印最终的结果
        countDS.print()
    
        /**
         * 启动flink程序
         */
        //这里的jobname是在启动flink的时候指定的,但是spark中是在启动环境的时候启动的
        env.execute("workcount")
      }
    }
    
    
  3. spark和flink的本质区别

    主要是在suffer端不太一样

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iGAxQqwq-1647863510025)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320143540274.png)]

数据从上游的task发送到下游task的时候有缓冲

  • 时间超过200毫秒
  • 数据量达到32kb

并不是一条数据发送一次,这样的话性能会很差。上面两个条件满足一个,就会把数据发送到下游

3.4. Source:数据源

Flink在流处理和批处理上的source大概有4类

  1. 基于本地集合的source–>有界流

  2. 基于文件的source–>有界流

  3. 基于网络套接字的source–>无界流

  4. 自定义的source。自定义的source常见的有Apache kafka、Amazon Kinesis Streams、RabbitMQ、 Twitter Streaming API、Apache Nifi 等,当然你也可以定义自己的source。–>无界流

3.4.1. Source数据源的使用

  1. 这个是通过fromCollection(List"“))这个方法来创建一个有界流的数据这个
 listDS.flatMap(_.split(","))
      .map((_,1))
      .keyBy(_._1)
      .sum(1)
      .print()
求WordCount
package blockhouse.source


import org.apache.flink.streaming.api.scala._

object DemoListSource {
  def main(args: Array[String]): Unit = {
    /**
     * 根据集合来创建source
     * 首先创建spark环境
     */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * 基于本地集合构建DS --- 有界流
     */
    val listDS: DataStream[String] = env.fromCollection(List("java,java","spark","hadoop","javba","hadoop,java,java,hadoop"))

    listDS.flatMap(_.split(","))
      .map((_,1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute("WordCount")

  }
}

  1. 这个是通过readTextFile方法来读取本地文件的有界流数据

    下面的处理仍然是求WordCount

    package blockhouse.source
    
    
    import org.apache.flink.streaming.api.scala._
    
    object DemoFIleDemo {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val fileDS: DataStream[String] = env.readTextFile("data/students.txt")
    
        fileDS.map(stu => {
          val clazz: String = stu.split(",")(4)
          (clazz,1)
        })
          .keyBy(_._1)
          .sum(1)
          .print()
    
        env.execute("DemoFIleDemo")
      }
    }
    
    
  2. 自定义的Source,这里需要继承SourceFunction方法,重写run方法和cancel方法

package blockhouse.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object DemoSourceFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)
    val myDS: DataStream[Int] = env.addSource(new MySource)

    myDS.print()

    env.execute()
  }

}

class MySource extends SourceFunction[Int]{
  override def run(ctx: SourceFunction.SourceContext[Int]): Unit ={

    /**
     * run: 用于生成数据的方法,只执行一次,想要执行加个死循环即可
     * ctx: 用于将数据发送到下游的对象
     *
     */
    var i = 0
    while(i < 100) {
      i += 1
      ctx.collect(i)
    }
  }

  //任务被取消的时候执行,一般用于回收资源
  override def cancel(): Unit = {

  }
}

  1. 自定义source的好处很明显,我们可以自定义的读取任何来源的数据,这里简单的介绍一下读取mysql的数据。
package blockhouse.source

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object DemoMysqlSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val result: DataStream[(Int, String, String, String)] = env.addSource(new MysqlSource)
    result.print()

    env.execute("MysqlSource")
  }

}

/**
 * SourceFunction: 基础的SourceFunction, 单线程的source
 * RichSourceFunction: 比SourceFunction多了open和close方法 -单线程的source
 * ParallelSourceFunction : 多并行的source
 * RichParallelSourceFunction 比SourceFunction多了open和close方法  -多并行的source
 */

class MysqlSource extends RichSourceFunction[(Int,String,String,String)]{

  var connDS: Connection = _
  override def open(parameters: Configuration): Unit = {
    //使用jdbc读取mysql
    Class.forName("com.mysql.jdbc.Driver")

    //建立连接
   connDS = DriverManager.getConnection("jdbc:mysql//192.168.3.67:3306/kayleigh", "root", "123456")

  }

  override def close(): Unit ={
    connDS.close()

  }

  override def run(ctx: SourceFunction.SourceContext[(Int,String,String,String)]): Unit = {

    //查询数据
    val sta1: PreparedStatement = connDS.prepareStatement("select * from runoob_tbl")

    //执行查询
    val resultSet: ResultSet = sta1.executeQuery()
    while(resultSet.next()){
      val id: Int = resultSet.getInt("runoob_id")
      val title: String = resultSet.getString("runoob_title")
      val author: String = resultSet.getString("runoob_author")
      val date: String = resultSet.getString("submission_date")

      //将数据发送到下游
      ctx.collect((id,title,author,date))
    }


  }

  override def cancel(): Unit = {

  }
}

4. DataStream常用算子

上面讲的是Source数据源层,这里来介绍TransFormation,数据转换的各种操作。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Zl4gMbgy-1647863510027)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320214809664.png)]

在Flink应用程序中,无论你的应用程序是批程序,还是流程序,都是上图这种模型,有数据源(source),有数据下游(sink) ,我们写的应用程序多是对数据源过来的数据做一系列操作, 总结如下。
● Source:数据源,Flink在流处理和批处理.上的source大概有4类:基于本地集合的source、 基于文件的
source、基于网络套接字的source、自定义的source。自定义的source常见的有Apache kafka、Amazon
Kinesis Streams、RabbitMQ、 Twitter Streaming API、Apache NiFi等,当然你也可以义自己的
source。

●Transformation:数据转换的各种操作,有Map / FlatMap/ Filter 1 KeyBy / Reduce / Fold / Aggregations /Window / WindowAll / Union / Window join / Split / Select/ Project等,操作很多,可以将数据转换计算成
你想要的数据。

●Sink:接收器,Sink是指Flink将转换计算后的数据发送的地点,你可能需要存储下来。Flink常见的Sink大概有如下几类:写入文件、打印出来、写入Socket、自定义的Sink。 自定义的sink常见的有Apache kafka、RabbitMQ、 MySQL、ElasticSearch、 Apache Cassandra、Hadoop FileSystem 等,同理你也可以
定义自己的Sink.

4.1. Map

Map 算子的输入流是 DataStream,经过 Map 算子后返回的数据格式是 SingleOutputStreamOperator 类型,获取一个元素并生成一个元素,举个例子:

SingleOutputStreamOperator<Employee> map = employeeStream.map(new MapFunction<Employee, Employee>() {
    @Override
    public Employee map(Employee employee) throws Exception {
        employee.salary = employee.salary + 5000;
        return employee;
    }
});
map.print();

新的一年给每个员工的工资加 5000。

4.2. FlatMap

FlatMap 算子的输入流是 DataStream,经过 FlatMap 算子后返回的数据格式是 SingleOutputStreamOperator 类型,获取一个元素并生成零个、一个或多个元素,举个例子:

SingleOutputStreamOperator<Employee> flatMap = employeeStream.flatMap(new FlatMapFunction<Employee, Employee>() {
    @Override
    public void flatMap(Employee employee, Collector<Employee> out) throws Exception {
        if (employee.salary >= 40000) {
            out.collect(employee);
        }
    }
});
flatMap.print();

将工资大于 40000 的找出来。

4.3. Filter

SingleOutputStreamOperator  filter = ds.filter(new FilterFunction<Employee>() {
    @Override
    public boolean filter(Employee employee) throws Exception {
        if (employee.salary >= 40000) {
            return true;
        }
        return false;
    }
});
filter.print();

对每个元素都进行判断,返回为 true 的元素,如果为 false 则丢弃数据,上面找出工资大于 40000 的员工其实也可以用 Filter 来做:

4.4. KeyBy

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E8DeEuhd-1647863510030)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320222244115.png)]

KeyBy 在逻辑上是基于 key 对流进行分区,相同的 Key 会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。举个例子:

KeyedStream<ProductEvent, Integer> keyBy = productStream.keyBy(new KeySelector<ProductEvent, Integer>() {
    @Override
    public Integer getKey(ProductEvent product) throws Exception {
        return product.shopId;
    }
});
keyBy.print();

根据商品的店铺 id 来进行分区。

4.5. Reduce

Reduce 返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。常用的方法有 average、sum、min、max、count,使用 Reduce 方法都可实现。

SingleOutputStreamOperator<Employee> reduce = employeeStream.keyBy(new KeySelector<Employee, Integer>() {
    @Override
    public Integer getKey(Employee employee) throws Exception {
        return employee.shopId;
    }
}).reduce(new ReduceFunction<Employee>() {
    @Override
    public Employee reduce(Employee employee1, Employee employee2) throws Exception {
        employee1.salary = (employee1.salary + employee2.salary) / 2;
        return employee1;
    }
});
reduce.print();

上面先将数据流进行 keyby 操作,因为执行 Reduce 操作只能是 KeyedStream,然后将员工的工资做了一个求平均值的操作。

4.6. Aggregations

DataStream API 支持各种聚合,例如 min、max、sum 等。 这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。

KeyedStream.sum(0) 
KeyedStream.sum("key") 
KeyedStream.min(0) 
KeyedStream.min("key") 
KeyedStream.max(0) 
KeyedStream.max("key") 
KeyedStream.minBy(0) 
KeyedStream.minBy("key") 
KeyedStream.maxBy(0) 
KeyedStream.maxBy("key")


max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。

标签:String,flink,学习,source,usr,local,防盗链
来源: https://blog.csdn.net/Kayleigh520/article/details/123644302

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有