ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

FlinkSQL实践记录2

2022-01-23 12:01:38  阅读:585  来源: 互联网

标签:count cnt name 记录 FlinkSQL 实践 sink mysql table


1. 背景

昨天《FlinkSQL实践记录1》对FlinkSql做了简单的使用insert into .. select ..,今天对聚合运算做一些实践。

2. 代码实践

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

        tableEnv.executeSql(mysql_sql);

        // 插入数据
        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                       // "order by name "
        );
        System.out.println(tableResult.getJobClient().get().getJobStatus());

2.1 mysql表不加primary主键

# 注意需要使用bigint, int类型会报错
create table count_info (
name varchar(100),
cnt bigint ) ;

当上游数据不断产生时,会将实时产生的新结果插入mysql

2.2 mysql表添加primary主键

create table count_info (
name varchar(100),
cnt bigint,
primary key(NAME)
) ;

当上游数据不断产生时,会将实时产生的新结果更新至mysql

新生产一批数据后

3. 遇到的问题及解决办法

3.1 sink table缺失主键

Exception in thread "main" java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

解决办法: mysql_sink增加PRIMARY KEY (name) NOT ENFORCED

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

3.2 不能排序

Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.

解决办法:去掉order by

        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                      //  "order by name "
        );

4. 不过瘾

接下来对join关联做些实践

标签:count,cnt,name,记录,FlinkSQL,实践,sink,mysql,table
来源: https://www.cnblogs.com/route/p/15836029.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有