ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink Sink到File(文件)

2021-01-14 19:33:42  阅读:846  来源: 互联网

标签:val tableEnv Flink 模式 Sink File new Table id


知识点

表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以 支持不同的文件格式、存储数据库和消息队列。 

具体实现,输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入 注册过的 TableSink 中。同时表的输出跟更新模式有关

更新模式(Update Mode)
    对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行 转换。与外部系统交换的消息类型,由更新模式(update mode)指定。
    Flink Table API 中的更新模式有以下三种:
    
    1)追加模式(Append Mode) 
        在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。 
    2)撤回模式(Retract Mode) 
        在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。  
        插入(Insert)会被编码为添加消息;  
        删除(Delete)则编码为撤回消息;  
        更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行) 的添加消息。 
        在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。 
    3)Upsert(更新插入)模式 
        在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 
        这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息外部连接器需要知道这个唯一 key 的属性。 
        插入(Insert)和更新(Update)都被编码为 Upsert 消息;  
        删除(Delete)编码为 Delete 信息。 
        这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率 会更高。

 

1、代码案例

package guigu.table.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

/**
 * @program: demo
 * @description: ${description}
 * @author: yang
 * @create: 2021-01-14 18:48
 */
object FileSink {
  def main(args: Array[String]): Unit = {

    //1、环境准备
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //2、读取数据,创建表视图
    val inputFile = "E:\\java\\demo\\src\\main\\resources\\file\\data5.csv"
    tableEnv.connect(new FileSystem().path(inputFile))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("temperature",DataTypes.DOUBLE())
        .field("timestamp",DataTypes.BIGINT())
      )
      .createTemporaryTable("inputTable")

    //3、table api转换
    val tableApi: Table = tableEnv.from("inputTable")
    val apiResult: Table = tableApi.select("id,temperature").where("id = 'sensor_1'")
    val sqlResult: Table = tableEnv.sqlQuery("select id,temperature from inputTable where id = 'sensor_1'")
    //字符串模板
    val sqlModelResult: Table = tableEnv.sqlQuery(
      """
        |select id,temperature
        |from inputTable
        |where id = 'sensor_1'
      """.stripMargin)

    //4、创建输出表视图
    val outputFile = "E:\\java\\demo\\src\\main\\resources\\file\\outputFile.csv"
    tableEnv.connect(new FileSystem().path(outputFile))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("temperature",DataTypes.DOUBLE())
        )
      .createTemporaryTable("outputTable")

    //5、执行
    sqlModelResult.insertInto("outputTable")

    tableEnv.execute("Flink Sink Flie Test")

  }
}

 

标签:val,tableEnv,Flink,模式,Sink,File,new,Table,id
来源: https://www.cnblogs.com/ywjfx/p/14278937.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有