ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Flink SQL Client综合实战,项目实践

2021-12-26 12:33:02  阅读:264  来源: 互联网

标签:category pv Flink ts connector Client behavior SQL id


CREATE TABLE user_behavior (

user_id BIGINT,

item_id BIGINT,

category_id BIGINT,

behavior STRING,

ts TIMESTAMP(3),

proctime as PROCTIME(), – 处理时间列

WATERMARK FOR ts as ts - INTERVAL ‘5’ SECOND – 在ts上定义watermark,ts成为事件时间列

) WITH (

‘connector.type’ = ‘kafka’, – kafka connector

‘connector.version’ = ‘universal’, – universal 支持 0.11 以上的版本

‘connector.topic’ = ‘user_behavior’, – kafka topic

‘connector.startup-mode’ = ‘earliest-offset’, – 从起始 offset 开始读取

‘connector.properties.zookeeper.connect’ = ‘192.168.50.43:2181’, – zk 地址

‘connector.properties.bootstrap.servers’ = ‘192.168.50.43:9092’, – broker 地址

‘format.type’ = ‘json’ – 数据源格式为 json

);

  1. 执行SELECT * FROM user_behavior;看看原始数据,如果消息正常应该和下图类似:

6.

窗口统计

  1. 下面的SQL是以每十分钟为窗口,统计每个窗口内的总浏览数,TUMBLE_START返回的数据格式是timestamp,这里再调用DATE_FORMAT函数将其格式化成了字符串:

SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL ‘10’ MINUTE), ‘yyyy-MM-dd hh:mm:ss’),

DATE_FORMAT(TUMBLE_END(ts, INTERVAL ‘10’ MINUTE), ‘yyyy-MM-dd hh:mm:ss’),

COUNT(*)

FROM user_behavior

WHERE behavior = ‘pv’

GROUP BY TUMBLE(ts, INTERVAL ‘10’ MINUTE);

  1. 得到数据如下所示:

在这里插入图片描述

数据写入ElasticSearch

  1. 确保elasticsearch已部署好;

  2. 执行以下语句即可创建es表,请按照您自己的es信息调整下面的参数:

CREATE TABLE pv_per_minute (

start_time STRING,

end_time STRING,

pv_cnt BIGINT

) WITH (

‘connector.type’ = ‘elasticsearch’, – 类型

‘connector.version’ = ‘6’, – elasticsearch版本

‘connector.hosts’ = ‘http://192.168.133.173:9200’, – elasticsearch地址

‘connector.index’ = ‘pv_per_minute’, – 索引名,相当于数据库表名

‘connector.document-type’ = ‘user_behavior’, – type,相当于数据库库名

‘connector.bulk-flush.max-actions’ = ‘1’, – 每条数据都刷新

‘format.type’ = ‘json’, – 输出数据格式json

‘update-mode’ = ‘append’

);

  1. 执行以下语句,就会将每分钟的pv总数写入es的pv_per_minute索引:

INSERT INTO pv_per_minute

SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL ‘1’ MINUTE), ‘yyyy-MM-dd hh:mm:ss’) AS start_time,

DATE_FORMAT(TUMBLE_END(ts, INTERVAL ‘1’ MINUTE), ‘yyyy-MM-dd hh:mm:ss’) AS end_time,

COUNT(*) AS pv_cnt

FROM user_behavior

WHERE behavior = ‘pv’

GROUP BY TUMBLE(ts, INTERVAL ‘1’ MINUTE);

  1. 用es-head查看,发现数据已成功写入:

在这里插入图片描述

联表操作

  1. 当前user_behavior表的category_id表示商品类目,例如11120表示计算机书籍,61626表示牛仔裤,本次实战的数据集中,这样的类目共有五千多种;

  2. 如果我们将这五千多种类目分成6个大类,例如11120属于教育类,61626属于服装类,那么应该有个大类和类目的关系表;

  3. 这个大类和类目的关系表在MySQL创建,表名叫category_info,建表语句如下:

CREATE TABLE category_info(

id int(11) unsigned NOT NULL AUTO_INCREMENT,

parent_id bigint ,

category_id bigint ,

PRIMARY KEY ( id )

) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

  1. 表category_info所有数据来自对原始数据中category_id字段的提取,并且随机将它们划分为6个大类,该表的数据请在我的GitHub下载:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/category_info.sql

  2. 请在MySQL上建表category_info,并将上述数据全部写进去;

  3. 在Flink SQL Client执行以下语句创建这个维表,mysql信息请按您自己配置调整:

CREATE TABLE category_info (

parent_id BIGINT, – 商品大类

category_id BIGINT – 商品详细类目

) WITH (

‘connector.type’ = ‘jdbc’,

‘connector.url’ = ‘jdbc:mysql://192.168.50.43:3306/flinkdemo’,

‘connector.table’ = ‘category_info’,

‘connector.driver’ = ‘com.mysql.jdbc.Driver’,

‘connector.username’ = ‘root’,

‘connector.password’ = ‘123456’,

‘connector.lookup.cache.max-rows’ = ‘5000’,

‘connector.lookup.cache.ttl’ = ‘10min’

);

  1. 尝试联表查询:

SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id

FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C

ON U.category_id = C.category_id;

  1. 如下图,联表查询成功,每条记录都能对应大类:

在这里插入图片描述

  1. 再试试联表统计,每个大类的总浏览量:

SELECT C.parent_id, COUNT(*) AS pv_count

FROM user_behavior AS U LEFT JOIN category

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

_info FOR SYSTEM_TIME AS OF U.proctime AS C

ON U.category_id = C.category_id

WHERE behavior = ‘pv’

GROUP BY C.parent_id;

  1. 如下图,数据是动态更新的:

在这里插入图片描述

  1. 执行以下语句,可以在统计时将大类ID转成中文名:

SELECT CASE C.parent_id

WHEN 1 THEN ‘服饰鞋包’

WHEN 2 THEN ‘家装家饰’

WHEN 3 THEN ‘家电’

WHEN 4 THEN ‘美妆’

WHEN 5 THEN ‘母婴’

WHEN 6 THEN ‘3C数码’

ELSE ‘其他’

END AS category_name,

COUNT(*) AS pv_count

标签:category,pv,Flink,ts,connector,Client,behavior,SQL,id
来源: https://blog.csdn.net/m0_64384302/article/details/122153785

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有