Flink APIs (where data comes from and where it goes)

2. WordCount in Flink

package com.wt.flink.core
import org.apache.flink.streaming.api.scala._
object Demo1WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * 1. Create the Flink environment
     *
     */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // BATCH mode only works on bounded streams; it throws an error on an unbounded stream
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    // Set the parallelism of the Flink job
    // By default it depends on the number of CPU cores of the machine
    env.setParallelism(2)

    // Timeout for flushing buffered data from upstream to downstream
    // The default is 200 milliseconds
    env.setBufferTimeout(200)

    /**
     * 2. Read the data
     *
     */
    val lines: DataStream[String] = env.socketTextStream("master", 8888)

    /**
     * 3. Count the words
     */
    // Split each line into words
    val words: DataStream[String] = lines.flatMap(line => line.split(","))
    // Convert to (word, 1) key-value pairs
    val kvDS: DataStream[(String, Int)] = words.map(word => (word, 1))
    // Group by word
    val groupByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
    // Sum the counts
    val countDS: DataStream[(String, Int)] = groupByDS.sum(1)

    /**
     * 4. Print the result
     */

    countDS.print()
    /**
     * 5. Start the Flink job
     *
     */
    env.execute()
  }
}
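To try this word count locally, first open a socket server on the configured host and port, for example with nc -lk 8888 on the master machine, then type comma-separated words; the running counts are printed continuously.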

3. Transformation
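The transformations used throughout these examples are flatMap, map, filter, keyBy and sum. As a minimal sketch chaining them together (the object name Demo2Transformation and the socket source here are assumptions for illustration):

package com.wt.flink.core
import org.apache.flink.streaming.api.scala._

object Demo2Transformation {
  def main(args: Array[String]): Unit = {
    // illustrative sketch: a chain of common DataStream transformations
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // read an unbounded stream from a socket
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // flatMap: one line in, many words out
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    // filter: keep only non-empty words
    val filteredDS: DataStream[String] = wordsDS.filter(_.nonEmpty)

    // map: wrap each word into a (word, 1) pair
    val kvDS: DataStream[(String, Int)] = filteredDS.map((_, 1))

    // keyBy + sum: group by word and aggregate the counts
    kvDS
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()
  }
}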

4. Source: where the data comes from

  1. ListSource

    package com.wt.flink.scurce
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala._
    
    object Demo1ListSource {
      def main(args: Array[String]): Unit = {
    
        // Create the Flink environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        /**
         * Flink execution modes
         *
         * RuntimeExecutionMode.BATCH: batch mode, only usable on bounded streams, outputs the final result
         *
         * RuntimeExecutionMode.STREAMING: streaming mode, usable on both bounded and unbounded streams, outputs continuous results
         *
         */
        env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    
        /**
         * Build a source from a local collection  -  bounded stream
         *
         * When the source is a bounded stream, the Flink job finishes once all the data has been processed
         *
         */
        val lineDS: DataStream[String] = env.fromCollection(List("java,spark", "java,hadoop", "java"))
    
        lineDS
          .flatMap(_.split(","))
          .map((_,1))
          .keyBy(_._1)
          .sum(1)
          .print()
    
        env.execute()
      }
    }
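    Besides fromCollection, the execution environment also offers fromElements for small ad-hoc test data; for example, env.fromElements("java,spark", "java") would build the same kind of bounded stream.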
    
  2. FileSource

    package com.wt.flink.scurce
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala._
    
    object Demo2FileSource {
      def main(args: Array[String]): Unit = {
        // Create the Flink environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        // Batch mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    
        /**
         * Build a source from a file -- bounded stream
         *
         */
        val studentsDS: DataStream[String] = env.readTextFile("data/students.txt")
    
        val clazzDS: DataStream[(String, Int)] = studentsDS.map(stu => (stu.split(",")(4), 1))
    
        val clazzNumDS: DataStream[(String, Int)] = clazzDS.keyBy(kv => kv._1).sum(1)
    
        clazzNumDS.print()
        env.execute()
      }
    }
    
  3. SocketSource

    package com.wt.flink.scurce
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala._
    
    object Demo3SocketSource {
      def main(args: Array[String]): Unit = {
        // Create the Flink environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        // Streaming mode, for an unbounded stream
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    
        /**
         * Build a source from a socket -- unbounded stream
         *
         * An unbounded stream can only use streaming mode, not batch mode
         */
        val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    
        linesDS
          .flatMap(_.split(","))  // split each line on commas
          .map((_,1))             // turn each word into a (word, 1) key-value pair
          .keyBy(_._1)            // group by key
          .sum(1)                 // sum the values
          .print()
    
        env.execute()
      }
    }
    
  4. MySqlSource

    package com.wt.flink.scurce
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
    
    object Demo4MySqlSource {
      def main(args: Array[String]): Unit = {
        // Create the Flink environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        // Read data from MySQL using a custom source
        val mysqlDS: DataStream[String] = env.addSource(new MySqlSource())
    
        mysqlDS
          .map(stu => (stu.split("\t")(4), 1))
          .keyBy(_._1)
          .sum(1)
          .print()
    
        env.execute()
    
      }
        /**
         * Custom source: implement the SourceFunction interface
         *
         */
    
        class MySqlSource extends SourceFunction[String] {
          /**
           * run: reads the external data; it is called only once
           *
           * @param ctx : context object used to send the data downstream
           */
          override def run(ctx: SourceFunction.SourceContext[String]):Unit = {
            /**
             * Read the MySQL data via JDBC and send every row downstream
             *
             */
            // Create the connection
            Class.forName("com.mysql.jdbc.Driver")
            val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata17", "root", "123456")
    
            // The query SQL
            val stat: PreparedStatement = conn.prepareStatement("select * from students")
    
            // Execute the query
            val result: ResultSet = stat.executeQuery()
    
            // Parse the result set
            while(result.next()){
              val id: Long = result.getLong("id")
              val name: String = result.getString("name")
              val age: Long = result.getLong("age")
              val gender: String = result.getString("gender")
              val clazz: String = result.getString("clazz")
    
              // Send each row downstream
              ctx.collect(s"$id\t$name\t$age\t$gender\t$clazz")
            }
    
            // Close the connection
            stat.close()
            conn.close()
          }
          // Called when the job is cancelled; usually used to release resources
          override def cancel(): Unit = {}
      }
    }
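    A source written against the plain SourceFunction interface runs with parallelism 1; to read in parallel, the function would need to implement ParallelSourceFunction (or RichParallelSourceFunction, which also adds open and close methods for managing resources). For a JDBC source like this one, a single-parallelism source is usually fine.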
    

5. Sink: where the data goes

  1. FileSink

    package com.wt.flink.sink
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.configuration.MemorySize
    import org.apache.flink.connector.file.sink.FileSink
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
    import org.apache.flink.streaming.api.scala._
    
    object Demo1FileSink {
      def main(args: Array[String]): Unit = {
        // Create the Flink environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    
        // Read the data
        val studentDS: DataStream[String] = env.readTextFile("data/students.txt")
    
        // Count the number of students in each class
        val kvDS: DataStream[(String, Int)] = studentDS.map(stu => (stu.split(",")(4), 1))
    
        val countDS: DataStream[(String, Int)] = kvDS.keyBy(_._1).sum(1)
    
        // Save the result to a file
        // Old API
        //countDS.writeAsText("data/flink/clazz_num")
    
        // New API
        val sink: FileSink[(String, Int)] = FileSink
          .forRowFormat(new Path("data/flink/clazz_num"), new SimpleStringEncoder[(String, Int)]("UTF-8"))
          .withRollingPolicy(
            DefaultRollingPolicy.builder()
              // roll over after the part file contains at least this much time of data
              //.withRolloverInterval(Duration.ofSeconds(10))
              // roll over after this long without new data
              //.withInactivityInterval(Duration.ofSeconds(10))
              // roll over once the part file reaches this size
              .withMaxPartSize(MemorySize.ofMebiBytes(1))
              .build())
          .build()
    
        // Use the file sink
        countDS.sinkTo(sink)
        env.execute()
      }
    }
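    FileSink lives in the file connector module rather than the core streaming API, so that dependency needs to be on the classpath; an sbt sketch (artifact name assumed to be flink-connector-files, version placeholder to be replaced with the Flink version in use):

    // sbt build definition - the version string below is a placeholder
    libraryDependencies += "org.apache.flink" % "flink-connector-files" % "<flink-version>"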
    
  2. Print

    package com.wt.flink.sink
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    
    object Demo2Print {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val lineDS: DataStream[String] = env.socketTextStream("master", 8888)
    
        lineDS.print()
    
        env.execute()
      }
    }
    
  3. SinkFunction: a custom sink

    package com.wt.flink.sink
    import org.apache.flink.streaming.api.functions.sink.SinkFunction
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    
    object Demo3SinkFunction {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
        
        // Use a custom sink
        linesDS.addSink(new SinkFunction[String]{
          /**
           * invoke is called once for every record
           *
           * @param value   : one record
           * @param context : the context object
           */
    
          override def invoke(value: String, context: SinkFunction.Context): Unit = {
            println(value)
          }
        })
        env.execute()
      }
    }
    
  4. MySqlSink

    package com.wt.flink.sink
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    object Demo4MysqlSink {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val lineDS: DataStream[String] = env.socketTextStream("master", 8888)
    
        // Count the words
        val countDS: DataStream[(String, Int)] = lineDS
          .flatMap(_.split(","))
          .map((_, 1))
          .keyBy(_._1)
          .sum(1)
    
        // Save the result to MySQL
        countDS.addSink(new MySlqSink)
        
        env.execute()
    
      }
      /**
       * Custom sink that saves the data to MySQL
       * SinkFunction: the basic sink interface
       * RichSinkFunction: additionally provides open and close methods
       *
       */
      class MySlqSink extends RichSinkFunction[(String,Int)]{
    
        var con: Connection = _
        var stat: PreparedStatement = _
    
        /**
         * open: called before invoke, only once per task
         * usually used to initialise the database connection
         *
         */
        override def open(parameters:Configuration):Unit ={
          // 1. Load the driver
          Class.forName("com.mysql.jdbc.Driver")
          // Create the connection
          con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata17", "root", "123456")
          // The insert SQL
          // replace: inserts if the key does not exist, otherwise replaces; the table needs a primary key
          stat = con.prepareStatement("replace into word_num(word,num) values(?,?)")
        }
    
        /**
         * Called when the job shuts down; usually used to release resources
         */
        override def close():Unit={
          // Close the connection
          stat.close()
          con.close()
        }
    
        /**
         * Called once for every record
         * Saves the record to MySQL via JDBC
         *
         * @param kv      : one record
         * @param context : the context object
         */
        override def invoke(kv: (String, Int), context: SinkFunction.Context): Unit = {
          // Set the statement parameters
          stat.setString(1, kv._1)
          stat.setInt(2, kv._2)
          // Execute the insert
          stat.execute()
        }
      }
    }
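    The sink above assumes a word_num table in which word is the primary key; without a primary or unique key, replace into behaves like a plain insert and duplicate rows accumulate. A minimal assumed schema would be something like: create table word_num(word varchar(200) primary key, num int).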
    

Source: https://www.cnblogs.com/atao-BigData/p/16513567.html
