ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

【大数据面试】Flink 02 基本操作:入门案例、Env、Source、Transform、数据类型、UDF、Sink

2022-02-07 22:35:44  阅读:245  来源: 互联网

标签:02 DataStream String val dataStream 数据类型 flatMap env 基本操作


二、基本操作

1、入门案例

(1)批处理wordcount--DataSet

val env = ExecutionEnvironment.getExecutionEnvironment

 // 从文件中读取数据

 val inputPath = "D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt"

 val inputDS: DataSet[String] = env.readTextFile(inputPath)

 // 分词之后,对单词进行groupby分组,然后用sum进行聚合

 val wordCountDS: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)

 // 打印输出

 wordCountDS.print()

(2)流处理wordcount--DataStream

object StreamWordCount {

  def main(args: Array[String]): Unit = {

    // 从外部命令中获取参数

    val params: ParameterTool =  ParameterTool.fromArgs(args)

    val host: String = params.get("host")

    val port: Int = params.getInt("port")

    // 创建流处理环境

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 接收socket文本流

    val textDstream: DataStream[String] = env.socketTextStream(host, port)

    // flatMap和Map需要引用的隐式转换

    import org.apache.flink.api.scala._

    val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)

    dataStream.print().setParallelism(1)

    // 启动executor,执行任务

    env.execute("Socket stream word count")

  }

}

2、Environment创建

getExecutionEnvironment,客户端,当前执行程序的上下文

createLocalEnvironment:返回本地执行环境

createRemoteEnvironment:集群执行环境,需要指定ip、端口及jar包

3、Source读取

(1)从集合读取数据:env.fromCollection(List(SensorReading("sensor_1 ",15477181 99,35.8),SensorReading("sensor_6",1547718201,15.4))

(2)从文件读取数据:valstream2=env.readTextFile("YOUR_FILE_PATH")

(3)以kafka消息队列的数据作为来源:valstream3=env.addSource(newFlinkKafkaConsumer 011[String]("sensor",newSimpleStringSchema(),properties))

(4)自定义Source:valstream4=env.addSource(newMySensorSource()

4、Transform算子

Map:DataStream → DataStream,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。

flatMap:flatMap(List(1,2,3))(i⇒List(i,i))变成112233自动加逗号

Filter:过滤掉指定条件的数据。

KeyBy:按照指定的key进行分组,流拆分成不相交的分区。

Reduce:合并当前的元素和上次聚合的结果,用来进行结果汇总合并。

Window:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)

滚动聚合算子(RollingAggregation):sum()、min()、max()、minBy()、maxBy()针对每个支流聚合Split和Select:拆分和获取指定的流

Connect(放在同一个流中)和CoMap(组合成一个流)

Union:产生一个包含所有DataStream元素的新DataStream

Connect只能操作两个流,Union可以操作多个。

5、常见的数据类型

env.fromElements(XXXX)

(1)基础数据类型

(2)Java和Scala元组(Tuples)

(3)Scala样例类(caseclasses)

(4)Java简单对象(POJOs)

(5)其它(Arrays,Lists,Maps,Enums,等等)

6、UDF-更细粒度的控制流

函数类(Function Classes):实现MapFunction,FilterFunction,ProcessFunction接口

匿名函数(Lambda Functions)

富函数(Rich Functions):函数类的接口,所有Flink函数类都有其Rich版本,自带一系列生命周期方法(开关、得到上下文),可以实现复杂功能

7、sink操作

(1)使用

没有spark中的forEach方法,需要通过stream.addSink(newMySink(xxxx))完成任务最终输出

(2)举例

kafka:union.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "test", new Simple StringSchema()))

redis:dataStream.addSink(newRedisSink[SensorReading](conf,newMyRedisMapper))

Elasticsearch:dataStream.addSink( esSinkBuilder.build() )

自定义sink:dataStream.addSink(newMyJdbcSink())

标签:02,DataStream,String,val,dataStream,数据类型,flatMap,env,基本操作
来源: https://www.cnblogs.com/liujinhui/p/15869593.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有