标签:connector 读取 cdn mysql kafka access statistic table
关于kafka的source部分请参考 上一篇: https://www.cnblogs.com/liufei1983/p/15801848.html
1: 首先下载两个和jdbc和mysql相关的jar包,注意版本,我的flink是1.13.1, 所以flink-connect-jdck_2.11也用1.13.1的版本,否则会报错误。
2: 在MYSQL里建立一个表:
-- `sql-demo`.cdn_access_statistic definition (这个在MYSQL里执行) CREATE TABLE `cdn_access_statistic` ( `province` varchar(100) DEFAULT NULL, `access_count` bigint DEFAULT NULL, `total_download` bigint DEFAULT NULL, `download_speed` bigint DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
在zeppelin里创建SINK job: 因为zeppeline是在docker运行,所以MYSQL的url的地址不能写localhost, 要写宿主机的IP
%flink.ssql DROP table if exists cdn_access_statistic; -- Please create this mysql table first in your mysql instance. Flink won't create mysql table for you. CREATE TABLE cdn_access_statistic ( province VARCHAR, access_count BIGINT, total_download BIGINT, download_speed DOUBLE ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://192.168.3.XXX:3306/sql-demo', 'connector.table' = 'cdn_access_statistic', 'connector.username' = 'sql-demo', 'connector.password' = 'demo-sql', 'connector.write.flush.interval' = '1s' )
3: 确定 kafak的source table和 mysql的sink table都创建了。
4: 从kafka消费数据,存储到mysql. 可以看到mysql 数据库里数据在变化
%flink.ssql insert into cdn_access_statistic select client_ip, request_time,request_time,request_time from cdn_access_log
标签:connector,读取,cdn,mysql,kafka,access,statistic,table 来源: https://www.cnblogs.com/liufei1983/p/15802576.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。