ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flink-format_小练习

2022-07-31 13:01:31  阅读:153  来源: 互联网

标签:city 练习 STRING format -- flink json code


2、format

1、json

json格式表结构按照字段名和类型进行映射

  • 增加依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.15.0</version>
</dependency>
  • 读取json格式的数据
-- source 表 
CREATE TABLE student_file_json (
    id STRINg,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
)  WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'data/students.json',  -- 必选:指定路径
  'format' = 'json' ,                    -- 必选:文件系统连接器指定 format
  'json.ignore-parse-errors' = 'true'
)
-- sink 表
CREATE TABLE print_table 
WITH ('connector' = 'print')
LIKE student_file_json (EXCLUDING ALL)

--执行sql
insert into print_table
select * from student_file_json

  • 将数据保存为json格式
-- source 表 
CREATE TABLE student_file_json (
    id STRINg,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
)  WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'data/students.json',  -- 必选:指定路径
  'format' = 'json' ,                    -- 必选:文件系统连接器指定 format
  'json.ignore-parse-errors' = 'true'
)


-- kafka sink 
CREATE TABLE student_kafka_sink (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',-- 只支持追加的流
  'topic' = 'student_flink_json',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'format' = 'json'
)

-- 执行sql
insert into student_kafka_sink
select * from student_file_json

3、练习

-- 1、使用flink sql 统计每个城市总的车流量
-- 2、source 使用文件source cars_sample.json
-- 3、将统计好的结果保存到mysql中,mysql中只保留最新的结果
  • 代码

{"car":"皖A9A7N2",
"city_code":"340500",
"county_code":"340522",
"card":117988031603010,
"camera_id":"00001",
"orientation":"西南",
"road_id":34052055,
"time":1614711895,
"speed":36.38}

-- 1、创建卡口过车source表
CREATE TABLE cars (
    car STRING,
    city_code STRING,
    county_code STRING,
    card BIGINT,
    camera_id STRING,
    orientation STRING,
    road_id BIGINT,
    `time` STRING,
    speed DOUBLE
)  WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'data/cars_sample.json',  -- 必选:指定路径
  'format' = 'json'                     -- 必选:文件系统连接器指定 format
)

-- 2、创建 mysql sink表

CREATE TABLE city_flow (
  city_code STRING,
  flow BIGINT,
  PRIMARY KEY (city_code) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'city_flow', -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
)
-- 3、在数据库中创建表
CREATE TABLE `city_flow` (
  `city_code` varchar(255) NOT NULL,
  `flow` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`city_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 4、统计数据的sql
insert into city_flow
select 
city_code,
count(distinct car) as flow
from 
cars
group by city_code

标签:city,练习,STRING,format,--,flink,json,code
来源: https://www.cnblogs.com/atao-BigData/p/16536898.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有