ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flink-优化(MiniBatch_Local-Global,反压)

2022-08-01 22:34:24  阅读:222  来源: 互联网

标签:count flink word -- Global batch 反压 set table


8、优化

1、MiniBatch 聚合

flink默认是每一条数据都会取更新状态

MiniBatch :缓存一批数据一起更新状态,优点:增加吞吐量,缺点:增加延迟-

  • 开启MiniBatch

    -- sql中开启
    -- 开启
    set table.exec.mini-batch.enabled=true; 
    -- 最大缓存时间
    set table.exec.mini-batch.allow-latency='5 s'; 
    -- 批次大小
    set table.exec.mini-batch.size=1000;
    

2、Local-Global 聚合

开启预聚合需要先开启MiniBatch

set table.exec.mini-batch.enabled=true; 
-- 最大缓存时间
set table.exec.mini-batch.allow-latency='5 s'; 
-- 批次大小
set table.exec.mini-batch.size=1000;
-- 开启预聚合
set table.optimizer.agg-phase-strategy=TWO_PHASE;
  • 示例

    -- 删除表
    drop table words;
    -- source 表
    CREATE TABLE words (
        word STRING
    ) WITH (
    'connector' = 'datagen',
     'rows-per-second' = '1000000', -- 每秒生成的数据行数据
     'fields.word.length' = '2' --字段长度限制
    );
    
    

    上游生产数据的速度时50万每秒,下游消费数据的速度10万每秒 ---- 反压

    -- 删除表
    drop table blackhole_table;
    -- 黑洞
    CREATE TABLE blackhole_table (
    	word STRING,
        c BIGINT
    )
    WITH ('connector' = 'blackhole')
    -- 执行查询
    insert into blackhole_table
    select word,count(1) as c from 
    words
    group by word
    

    开启minibatch和预聚合

    预聚合之后上游发生到数据下游数据量会减少,可以解决反压

    flink内部已经欸有发生反压了

    set table.exec.mini-batch.enabled=true; 
    set table.exec.mini-batch.allow-latency='5 s'; 
    set table.exec.mini-batch.size=1000;
    set table.optimizer.agg-phase-strategy=TWO_PHASE;
    
    • 将数据保存到mysql,写入数据的速度只能达到1600/s - 反压
    --mysql sink
    CREATE TABLE word_count (
    	word STRING,
        c BIGINT,
        PRIMARY KEY (word) NOT ENFORCED 
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://master:3306/bigdata',
       'table-name' = 'word_count',
       'username' = 'root',
       'password' = '123456'
    )
    
    insert into word_count
    select word,count(1) as c from 
    words
    group by word
    
    • 数据写入mysql,增加批次大小,和提高并行度,可以解决反压
    drop table word_count;
    CREATE TABLE word_count (
    	word STRING,
        c BIGINT,
        PRIMARY KEY (word) NOT ENFORCED 
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://master:3306/bigdata',
       'table-name' = 'word_count',
       'username' = 'root',
       'password' = '123456',
       'sink.buffer-flush.max-rows'='1000' ,-- 每批次最大值,会增加延迟
       'sink.parallelism' ='3' --提高写入数据并行度,增加成本
    );
    
    insert into word_count
    select word,count(1) as c from 
    words
    group by word;
    
    • flink将写入hbase
    -- hbase sink
    drop table hbase_word_count;
    CREATE TABLE hbase_word_count (
     word STRING,
     info ROW<c BIGINT>,
     PRIMARY KEY (word) NOT ENFORCED
    ) WITH (
     'connector' = 'hbase-1.4',
     'table-name' = 'word_count',
     'zookeeper.quorum' = 'master:2181,node1:2181,node2:2181',
     'sink.parallelism' = '3', -- 写入数据并行度
      'sink.buffer-flush.max-rows' = '3000'  -- 写入数据批次大小
    );
    
    --先再habse中创建表
    create  'word_count','info'
    
    --- 将数据写入hbase
    insert into hbase_word_count
    select word,ROW(c) as info from (
        select word,count(1) as c from 
        words
        group by word
    ) as a
    

9、反压

上游生产数据速度比下游消费数据速度要大,flink就会发生反压,反压会从下游向上游传播,直到sourcetask会降低拉取数据速度,避免flink任务执行报错

  • flink内部反压

    • 增加flink任务的并行度

      增加并行度相当于就是增加资源,成本会增加

      -- flink sql
      SET 'parallelism.default' = '2';
      
    • 开启MiniBatch和预聚合

      开启之后会增加延迟

      set table.exec.mini-batch.enabled=true; 
      -- 最大缓存时间
      set table.exec.mini-batch.allow-latency='5 s'; 
      -- 批次大小
      set table.exec.mini-batch.size=1000;
      -- 开启预聚合
      set table.optimizer.agg-phase-strategy=TWO_PHASE;
      
  • 将数据保持到外部系统

    • mysql

      -- 每批次最大值,会增加延迟
      'sink.buffer-flush.max-rows'='1000' 
      --提高写入数据并行度,增加成本
      'sink.parallelism' ='3' 
      
  • Hbase

    • flink查询hbase可以开启异步IO,lookup.async=true,只支持hbase2.2以上版本
    -- 每批次最大值,会增加延迟
    'sink.buffer-flush.max-rows'='1000' 
    --提高写入数据并行度,增加成本
    'sink.parallelism' ='3' 
    

标签:count,flink,word,--,Global,batch,反压,set,table
来源: https://www.cnblogs.com/atao-BigData/p/16542089.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有