ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Flink Sql With 1.14 Queries 查询-概览(译)

2022-02-23 22:03:36  阅读:248  来源: 互联网

标签:1.14 Table SQL Flink product env Sql table SELECT


查询 #

SELECT语句和VALUES语句是sqlQuery()TableEnvironment. 该方法将 SELECT 语句(或 VALUES 语句)的结果作为Table. ATable可用于后续 SQL 和 Table API 查询转换为 DataStream写入 TableSink。SQL 和 Table API 查询可以无缝混合,并进行整体优化并转换为单个程序。

为了在 SQL 查询中访问表,它必须在 TableEnvironment 中注册。可以从TableSourceTableCREATE TABLE 语句DataStream注册表。或者,用户也可以在 TableEnvironment 中注册目录以指定数据源的位置。

为方便起见,Table.toString()自动在其唯一名称下注册表TableEnvironment并返回该名称。因此,Table可以将对象直接内联到 SQL 查询中,如下面的示例所示。

注意:包含不受支持的 SQL 功能的查询会导致TableException. 以下部分列出了批处理表和流表上支持的 SQL 功能。

指定查询 #

以下示例显示如何在已注册表和内联表上指定 SQL 查询。

Java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// SQL query with an inlined (unregistered) table
Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
Table result = tableEnv.sqlQuery(
  "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// SQL query with a registered table
// register the DataStream as view "Orders"
tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));
// run a SQL query on the Table and retrieve the result as a new Table
Table result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// create and register a TableSink
final Schema schema = Schema.newBuilder()
    .column("product", DataTypes.STRING())
    .column("amount", DataTypes.INT())
    .build();

final TableDescriptor sinkDescriptor = TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .format(FormatDescriptor.forFormat("csv")
        .option("field-delimiter", ",")
        .build())
    .build();

tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor);

// run an INSERT SQL on the Table and emit the result to the TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

Python

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# SQL query with an inlined (unregistered) table
# elements data type: BIGINT, STRING, BIGINT
table = table_env.from_elements(..., ['user', 'product', 'amount'])
result = table_env \
    .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)

# create and register a TableSink
schema = Schema.new_builder()
    .column("product", DataTypes.STRING())
    .column("amount", DataTypes.INT())
    .build()

sink_descriptor = TableDescriptor.for_connector("filesystem")
    .schema(schema)
    .format(FormatDescriptor.for_format("csv")
        .option("field-delimiter", ",")
        .build())
    .build()

t_env.create_temporary_table("RubberOrders", sink_descriptor)

# run an INSERT SQL on the Table and emit the result to the TableSink
table_env \
    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

回到顶部

执行查询 #

可以通过该TableEnvironment.executeSql()方法执行SELECT语句或VALUES语句将内容收集到本地。该方法将 SELECT 语句(或 VALUES 语句)的结果作为TableResult. 类似于 SELECT 语句,Table可以使用该Table.execute()方法执行对象,将查询的内容收集到本地客户端。 TableResult.collect()方法返回一个可关闭的行迭代器。除非已收集所有结果数据,否则选择作业不会完成。我们应该主动关闭作业,以避免通过该CloseableIterator#close()方法泄漏资源。TableResult.print()我们也可以通过该方法将选择结果打印到客户端控制台。中的结果数据TableResult只能访问一次。因此,collect()不得print()互相调用。

TableResult.collect()并且TableResult.print()在不同的检查点设置下具有略微不同的行为(要为流式作业启用检查点,请参阅检查点配置)。

  • 对于没有检查点的批处理作业或流式作业,TableResult.collect()TableResult.print()没有完全一次也没有至少一次保证。查询结果一旦生成,客户端就可以立即访问,但是当作业失败并重新启动时会抛出异常。
  • 用于具有一次性检查点的流式作业,TableResult.collect()TableResult.print()保证端到端的一次性记录交付。只有在相应的检查点完成后,客户端才能访问结果。
  • 用于具有至少一次检查点的流式作业,TableResult.collect()TableResult.print()保证端到端的至少一次记录交付。查询结果一旦生成,客户端就可以立即访问,但可能会多次传递相同的结果。

Java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

// execute SELECT statement
TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
// use try-with-resources statement to make sure the iterator will be closed automatically
try (CloseableIterator<Row> it = tableResult1.collect()) {
    while(it.hasNext()) {
        Row row = it.next();
        // handle row
    }
}

// execute Table
TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
tableResult2.print();

Python

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env, settings)
# enable checkpointing
table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")

table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

# execute SELECT statement
table_result1 = table_env.execute_sql("SELECT * FROM Orders")
table_result1.print()

# execute Table
table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
table_result2.print()


回到顶部

语法 #

Flink 使用支持标准 ANSI SQL 的Apache Calcite解析 SQL。

以下 BNF 语法描述了批处理和流式查询中支持的 SQL 功能的超集。操作部分显示了受支持功能的示例,并指出了哪些功能仅支持批处理或流式查询。

语法↕

Flink SQL 使用类似于 Java 的标识符(表、属性、函数名)的词法策略:

  • 无论是否引用标识符,都会保留标识符的大小写。
  • 之后,标识符会区分大小写。
  • 与 Java 不同,反引号允许标识符包含非字母数字字符(例如)。“SELECT a AS my field FROM t”

字符串文字必须用单引号括起来(例如,SELECT 'Hello World')。复制单引号以进行转义(例如,SELECT 'It''s me')。

Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
|      EXPR$0 |  EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set

字符串文字中支持 Unicode 字符。如果需要显式 unicode 代码点,请使用以下语法:

  • 使用反斜杠 ( \) 作为转义字符(默认):SELECT U&'\263A'
  • 使用自定义转义字符:SELECT U&'#263A' UESCAPE '#'

回到顶部

操作 #

from flink website url:Flink SQL 查询 概览 | Apache Flink

----------------------------------------------------------- 禁止转载 --------------------------------------------------------

标签:1.14,Table,SQL,Flink,product,env,Sql,table,SELECT
来源: https://blog.csdn.net/qq_44326412/article/details/123100127

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有