ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink Sink:接收器

2022-03-20 13:36:24  阅读:173  来源: 互联网

标签:接收器 Flink String flink Sink org apache import sink


flink代码分为三部分:

1、Source----数据源,读取数据

2、Transformation----转换,对数据进行处理,也就是算子

3、Sink----将数据发出去

Flink 将转换计算后的数据发送的地点 。
Flink 常见的 Sink 大概有如下几类:

1、写入文件
2、打印出来
3、写入 socket
4、自定义的 sink 。

自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

1、写入文件

这个东西不要硬记,要学会看官网

package com.shujia.flink.sink

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Demo1FileSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    //读取数据
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    /**
      * 写入文件
      */
      
    val sink: StreamingFileSink[String] = StreamingFileSink
      //指定保存路径和数据的编码格式
      .forRowFormat(new Path("data/flink/out"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          //滚动生成文件的策略
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
          //至少包含15分钟的数据
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
          //最近5分钟没有收到新的记录
          .withMaxPartSize(1024 * 1024 * 1024)//文件大小达到1G(写入最后一条记录之后)
          .build())
      .build()

    linesDS.addSink(sink)

    env.execute()
  }
}

//执行结果会生成一个out目录,该目录下会生成结果的文件

4、自定义的 sink

自定义sink就是实现SinkFunction

package com.shujia.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Demo2SinkFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    //使用自定义的sink
    studentDS.addSink(new MysqlSink)

    env.execute()
  }
}

/**
  * 自定义sink
  * SinkFunction : 普通sink
  * RichSinkFunction: 多了 open 和 close 方法
  */

class MysqlSink extends RichSinkFunction[String] {
  /**
    * open 在invoke之前执行,每一个task中只执行一次
    * 所以使用RichSinkFunction,
    * RichSinkFunction 比 SinkFunction多了open()、close()方法
    * 可以将加载驱动、创建链接放入open()内
    */ 
  var con: Connection = _

  override def open(parameters: Configuration): Unit = {
    println("open")
    //1、加载驱动
    Class.forName("com.mysql.jdbc.Driver")
    //创建链接
    //?useUnicode=true&characterEncoding=utf-8 -- 写数据了所以要指定编码格式
    con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=utf-8", "root", "123456")
  }

   /**
    * close 在invoke之后执行,每一个task中只 _?_ 执行
    */
  override def close(): Unit = {
    println("close")
    con.close()
  }

  /**
    * invoke:每一条数据执行一次
    * @param stu     : 一条数据
    * @param context : 上下文对象
    */
    
  override def invoke(stu: String, context: SinkFunction.Context[_]): Unit = {

    //切分数据
    val split: Array[String] = stu.split(",")
    
    //创建PreparedStatement
    val stat: PreparedStatement = con.prepareStatement("insert into student (id,name,age,gender,clazz) values(?,?,?,?,?)")

    //设置参数
    stat.setString(1, split(0))
    stat.setString(2, split(1))
    stat.setInt(3, split(2).toInt)
    stat.setString(4, split(3))
    stat.setString(5, split(4))

    //执行插入
    stat.execute()

  }
}

自定义 sink ,统计单词的数量,并把结果写入MySQL中

replace 如果主键不存在就插入,如果主键存在就替换,和 insert 用法相同

package com.shujia.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object Demo3WcSInkMysql {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val countDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    countDS.addSink(new RichSinkFunction[(String, Int)] {

      var con: Connection = _

      override def open(parameters: Configuration): Unit = {
        //1、加载驱动
        Class.forName("com.mysql.jdbc.Driver")
        //创建链接
        con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=utf-8", "root", "123456")
      }

      override def close(): Unit = con.close()

      override def invoke(kv: (String, Int), context: SinkFunction.Context[_]): Unit = {
        //replace 如果主键不存在就插入,如果主键存在就替换
        val stat: PreparedStatement = con.prepareStatement("replace into word_count(word,count) values(?,?)")

        stat.setString(1, kv._1)
        stat.setInt(2, kv._2)

        stat.execute()

      }
    })

    env.execute()
  }
}

标签:接收器,Flink,String,flink,Sink,org,apache,import,sink
来源: https://www.cnblogs.com/saowei/p/16029685.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有