The following code shows how to use toChangelogStream
for different scenarios.
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.time.Instant;

import static org.apache.flink.table.api.Expressions.*;

// create Table with event-time
tableEnv.executeSql(
    "CREATE TABLE GeneratedTable "
        + "("
        + "  name STRING,"
        + "  score INT,"
        + "  event_time TIMESTAMP_LTZ(3),"
        + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
        + ")"
        + "WITH ('connector'='datagen')");

Table table = tableEnv.from("GeneratedTable");

// === EXAMPLE 1 ===

// convert to DataStream in the simplest and most general way possible (no event-time)

Table simpleTable = tableEnv
    .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
    .as("name", "score")
    .groupBy($("name"))
    .select($("name"), $("score").sum());

tableEnv
    .toChangelogStream(simpleTable)
    .executeAndCollect()
    .forEachRemaining(System.out::println);

// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]

// === EXAMPLE 2 ===

// convert to DataStream in the simplest and most general way possible (with event-time)

DataStream<Row> dataStream = tableEnv.toChangelogStream(table);

// since `event_time` is a single time attribute in the schema, it is set as the
// stream record's timestamp by default; however, at the same time, it remains part of the Row

dataStream.process(
    new ProcessFunction<Row, Void>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Void> out) {
            // prints: [name, score, event_time]
            System.out.println(row.getFieldNames(true));

            // timestamp exists twice
            assert ctx.timestamp() == row.<Instant>getFieldAs("event_time").toEpochMilli();
        }
    });
env.execute();

// === EXAMPLE 3 ===

// convert to DataStream but write out the time attribute as a metadata column,
// which means it is not part of the physical schema anymore

DataStream<Row> dataStream = tableEnv.toChangelogStream(
    table,
    Schema.newBuilder()
        .column("name", "STRING")
        .column("score", "INT")
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .build());

// the stream record's timestamp is defined by the metadata; it is not part of the Row

dataStream.process(
    new ProcessFunction<Row, Void>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Void> out) {
            // prints: [name, score]
            System.out.println(row.getFieldNames(true));

            // timestamp exists once
            System.out.println(ctx.timestamp());
        }
    });
env.execute();

// === EXAMPLE 4 ===

// for advanced users, it is also possible to use more internal data structures for efficiency

// note that this is only mentioned here for completeness because using internal data structures
// adds complexity and additional type handling

// however, converting a TIMESTAMP_LTZ column to `Long` or a STRING to `byte[]` might be
// convenient; also, structured types can be represented as `Row` if needed

DataStream<Row> dataStream = tableEnv.toChangelogStream(
    table,
    Schema.newBuilder()
        .column("name", DataTypes.STRING().bridgedTo(StringData.class))
        .column("score", DataTypes.INT())
        .column("event_time", DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
        .build());

// leads to a stream of Row(name: StringData, score: Integer, event_time: Long)
```
For more information about which conversions are supported for data types in Example 4, see the Table API’s Data Types page.
The behavior of toChangelogStream(Table).executeAndCollect() is equal to calling Table.execute().collect(). However, toChangelogStream(Table) might be more useful for tests because it allows access to the produced watermarks in a subsequent ProcessFunction in the DataStream API.
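As a rough sketch of that testing idea: inside a ProcessFunction, the current watermark can be read through the timer service via `ctx.timerService().currentWatermark()`. The class name `WatermarkInspector` and the helper `isLate` below are illustrative, not part of the Flink API; this is a minimal sketch, assuming the stream carries a record timestamp as in the event-time examples above.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Hypothetical helper class (not from the Flink docs): observes the watermarks
// produced by the table's WATERMARK strategy after toChangelogStream.
public class WatermarkInspector extends ProcessFunction<Row, Row> {

    // pure helper: an element counts as "late" if its timestamp
    // is at or below the current watermark
    static boolean isLate(long elementTimestamp, long currentWatermark) {
        return elementTimestamp <= currentWatermark;
    }

    @Override
    public void processElement(Row row, Context ctx, Collector<Row> out) {
        // currentWatermark() reflects the watermarks generated upstream,
        // e.g. `event_time - INTERVAL '10' SECOND` from the table definition
        long watermark = ctx.timerService().currentWatermark();
        if (ctx.timestamp() != null && isLate(ctx.timestamp(), watermark)) {
            System.out.println("late element: " + row);
        }
        out.collect(row);
    }
}
```

A test could then apply this with `tableEnv.toChangelogStream(table).process(new WatermarkInspector())` and inspect the output.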
Source: https://www.cnblogs.com/qiu-hua/p/15204234.html