标签:tmp word env field 本地 table DataTypes 安装 PYFLINK
来源:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/
一 安装环境与安装
您需要一台具有以下功能的计算机:
- Java 8 or 11
- Python 3.6, 3.7 or 3.8
使用Python Table API需要安装PyFlink,它已经被发布到 PyPi,您可以通过如下方式安装PyFlink:
$ python -m pip install apache-flink
安装PyFlink后,您便可以编写Python Table API作业了。
二 编写一个Flink Python Table API程序
编写Flink Python Table API程序的第一步是创建TableEnvironment
。这是Python Table API作业的入口类。
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build() t_env = TableEnvironment.create(settings)
接下来,我们将介绍如何创建源表和结果表。
# write all the data to one file t_env.get_config().get_configuration().set_string("parallelism.default", "1") t_env.connect(FileSystem().path('/tmp/input')) \ .with_format(OldCsv() .field('word', DataTypes.STRING())) \ .with_schema(Schema() .field('word', DataTypes.STRING())) \ .create_temporary_table('mySource') t_env.connect(FileSystem().path('/tmp/output')) \ .with_format(OldCsv() .field_delimiter('\t') .field('word', DataTypes.STRING()) .field('count', DataTypes.BIGINT())) \ .with_schema(Schema() .field('word', DataTypes.STRING()) .field('count', DataTypes.BIGINT())) \ .create_temporary_table('mySink')
You can also use the TableEnvironment.sql_update() method to register a source/sink table defined in DDL:
my_source_ddl = """ create table mySource ( word VARCHAR ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '/tmp/input' ) """ my_sink_ddl = """ create table mySink ( word VARCHAR, `count` BIGINT ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '/tmp/output' ) """ t_env.sql_update(my_source_ddl) t_env.sql_update(my_sink_ddl)
上面的程序展示了如何创建及在ExecutionEnvironment
中注册表名分别为mySource
和mySink
的表。 其中,源表mySource
有一列: word,该表代表了从输入文件/tmp/input
中读取的单词; 结果表mySink
有两列: word和count,该表会将计算结果输出到文件/tmp/output
中,字段之间使用\t
作为分隔符。
接下来,我们介绍如何创建一个作业:该作业读取表mySource
中的数据,进行一些变换,然后将结果写入表mySink
。
最后,需要做的就是启动Flink Python Table API作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当execute_insert(sink_name)
被调用的时候, 作业才会被真正提交到集群或者本地进行执行。
from pyflink.table.expressions import lit tab = t_env.from_path('mySource') tab.group_by(tab.word) \ .select(tab.word, lit(1).count) \ .execute_insert('mySink').wait()
该教程的完整代码如下:
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings from pyflink.table.descriptors import Schema, OldCsv, FileSystem from pyflink.table.expressions import lit settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build() t_env = TableEnvironment.create(settings) # write all the data to one file t_env.get_config().get_configuration().set_string("parallelism.default", "1") t_env.connect(FileSystem().path('/tmp/input')) \ .with_format(OldCsv() .field('word', DataTypes.STRING())) \ .with_schema(Schema() .field('word', DataTypes.STRING())) \ .create_temporary_table('mySource') t_env.connect(FileSystem().path('/tmp/output')) \ .with_format(OldCsv() .field_delimiter('\t') .field('word', DataTypes.STRING()) .field('count', DataTypes.BIGINT())) \ .with_schema(Schema() .field('word', DataTypes.STRING()) .field('count', DataTypes.BIGINT())) \ .create_temporary_table('mySink') tab = t_env.from_path('mySource') tab.group_by(tab.word) \ .select(tab.word, lit(1).count) \ .execute_insert('mySink').wait()
三 执行一个Flink Python Table API程序
首先,你需要在文件 “/tmp/input” 中准备好输入数据。你可以选择通过如下命令准备输入数据:
$ echo -e "flink\npyflink\nflink" > /tmp/input
接下来,可以在命令行中运行作业(假设作业名为WordCount.py)(注意:如果输出结果文件“/tmp/output”已经存在,你需要先删除文件,否则程序将无法正确运行起来):
$ python WordCount.py
上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例。
最后,你可以通过如下命令查看你的运行结果:
$ cat /tmp/output flink 2 pyflink 1
标签:tmp,word,env,field,本地,table,DataTypes,安装,PYFLINK 来源: https://www.cnblogs.com/qiu-hua/p/14869332.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。