ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink-任务需要申请的资源和并行度有关,和task的数量没有关系

2022-07-24 20:03:36  阅读:192  来源: 互联网

标签:设置 task String val Flink 并行度 setParallelism env


Flink-任务需要申请的资源和并行度有关,和task的数量没有关系

1. 非Flink的需要5个task,如下:

但是在Flink中采用共享模式

在代码中设置并行度为2

package com.wt.flink.core
import org.apache.flink.streaming.api.scala._

object Demo3Parallelism {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    //设置并行度
    env.setParallelism(2)

    linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()
  }
}

在UI界面中为:

但是只会占两个槽

当输入数据时

代码中的并行度优先级大于在UI界面中设置的

这里我们设置5个并行度

提交之后,还是不变

2. 这里我们重新修改代码,打包提交,代码如下:

package com.wt.flink.core
import org.apache.flink.streaming.api.scala._

object Demo3Parallelism {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    /**
     * 设置flink任务的并行度
     * 1、在代码中同意设置并行度
     * 2、可以在体检任务的时候设置并行度   代码中设置并行度的优先级高于提交热任务是设置的并行度
     * 2、每一个算子可以单独设置并行度,keyBy除外  -- 优先级时最高的
     *
     *
     * flink 任务需要申请的资源和并行度有关,和task的数量没有关系
     *
     */
    //设置并行度
    //env.setParallelism(2)

    val linesDS: DataStream[String] = env
      .socketTextStream("master", 8888)
      .setParallelism(1)
      .name("读取socket中的数据") //每一个节点可以设置一个名字
      .uid("1") //为一个标识
      .shuffle //将前面拆分成两部分,并行度一样时也会拆分


    val wordsDS: DataStream[String] = linesDS
      .flatMap(_.split(","))
      .setParallelism(2)
      .name("将一行数据转换成多行")
      .uid("2")

    val kvDS: DataStream[(String, Int)] = wordsDS
      .map((_, 1))
      .setParallelism(3)
      .name("转换成kv格式")
      .uid("3")

    val keyByDs: KeyedStream[(String, Int), String] = kvDS
      .keyBy(_._1)


    val countDS: DataStream[(String, Int)] = keyByDs
      .sum(1)
      .setParallelism(4)
      .name("分组聚合")
      .uid("4")

    countDS.print()
      .setParallelism(1)
      .name("打印结果")
      .uid("5")

    env.execute()
  }
}

输入数据,查看结果

标签:设置,task,String,val,Flink,并行度,setParallelism,env
来源: https://www.cnblogs.com/atao-BigData/p/16515315.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有