ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

FLINK基础(117): DS数据类型

2021-08-26 22:32:08  阅读:188  来源: 互联网

标签:FLINK Java Tuple Flink 数据类型 Tuple2 117 public


0 简介

Flink程序所处理的流中的事件一般是对象类型。操作符接收对象输出对象。所以Flink的内部机制需要能够处理事件的类型。在网络中传输数据,或者将数据写入到状态后端、检查点和保存点中,都需要我们对数据进行序列化和反序列化。为了高效的进行此类操作,Flink需要流中事件类型的详细信息。Flink使用了Type Information的概念来表达数据类型,这样就能针对不同的数据类型产生特定的序列化器,反序列化器和比较操作符。

Flink也能够通过分析输入数据和输出数据来自动获取数据的类型信息以及序列化器和反序列化器。尽管如此,在一些特定的情况下,例如匿名函数或者使用泛型的情况下,我们需要明确的提供数据的类型信息,来提高我们程序的性能。

在这一节中,我们将讨论Flink支持的类型,以及如何为数据类型创建相应的类型信息,还有就是在Flink无法推断函数返回类型的情况下,如何帮助Flink的类型系统去做类型推断。

1 支持的数据类型

Flink支持Java和Scala提供的所有普通数据类型。最常用的数据类型可以做以下分类:

  • Primitives(原始数据类型)
  • Java和Scala的Tuples(元组)
  • Scala的样例类
  • POJO类型
  • 一些特殊的类型

接下来让我们一探究竟。

Primitives

Java和Scala提供的所有原始数据类型都支持,例如Int(Java的Integer),String,Double等等。下面举一个例子:

DataStream[Long] numbers = env.fromElements(1L, 2L, 3L, 4L);
numbers.map(n -> n + 1);

Tuples

元组是一种组合数据类型,由固定数量的元素组成。

Flink为Java的Tuple提供了高效的实现。Flink实现的Java Tuple最多可以有25个元素,根据元素数量的不同,Tuple都被实现成了不同的类:Tuple1,Tuple2,一直到Tuple25。Tuple类是强类型。

复制代码
DataStream<Tuple2<String, Integer>> persons = env
  .fromElements(
    Tuple2.of("Adam", 17),
    Tuple2.of("Sarah", 23)
  );

persons.filter(p -> p.f1 > 18);
复制代码

Tuple的元素可以通过它们的public属性访问——f0,f1,f2等等。或者使用getField(int pos)方法来访问,元素下标从0开始:

import org.apache.flink.api.java.tuple.Tuple2

Tuple2<String, Integer> personTuple = Tuple2.of("Alex", 42);
Integer age = personTuple.getField(1); // age = 42

不同于Scala的Tuple,Java的Tuple是可变数据结构,所以Tuple中的元素可以重新进行赋值。重复利用Java的Tuple可以减轻垃圾收集的压力。举个例子:

personTuple.f1 = 42; // set the 2nd field to 42
personTuple.setField(43, 1); // set the 2nd field to 43
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

wordCounts.keyBy(value -> value.f0);

POJO

POJO类的定义:

  • 公有类
  • 无参数的公有构造器
  • 所有的字段都是公有的,可以通过getters和setters访问。
  • 所有字段的数据类型都必须是Flink支持的数据类型。

举个例子:

复制代码
public class Person {
  public String name;
  public int age;

  public Person() {}

  public Person(String name, int age) {
    this.name = name;
    this.age = age;
  }
}

DataStream<Person> persons = env.fromElements(
  new Person("Alex", 42),
  new Person("Wendy", 23)
);
复制代码

 

public class WordWithCount {

    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));

wordCounts.keyBy(value -> value.word);

 

其他数据类型

  • Array, ArrayList, HashMap, Enum
  • Hadoop Writable types

2 为数据类型创建类型信息

Flink类型系统的核心类是TypeInformation。它为系统在产生序列化器和比较操作符时,提供了必要的类型信息。例如,如果我们想使用某个key来做联结查询或者分组操作,TypeInformation可以让Flink做更严格的类型检查。

Flink针对Java和Scala分别提供了类来产生类型信息。在Java中,类是

org.apache.flink.api.common.typeinfo.Types

举个例子:

复制代码
TypeInformation<Integer> intType = Types.INT;

TypeInformation<Tuple2<Long, String>> tupleType = Types
  .TUPLE(Types.LONG, Types.STRING);

TypeInformation<Person> personType = Types
  .POJO(Person.class);
复制代码

 

标签:FLINK,Java,Tuple,Flink,数据类型,Tuple2,117,public
来源: https://www.cnblogs.com/qiu-hua/p/15191886.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有