ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

shell脚本并发数据到kafka topic

2021-01-30 16:33:17  阅读:275  来源: 互联网

标签:shell 10000 root kafka topic node1 data sent


shell脚本并发数据到kafka topic

需求:

每秒发送大量数据到kafka,验证下游系统性能,数据中的时间戳要求为当前时间,可以之间采集系统当前时间替换文件中旧的时间戳,保证每次发送的数据都为最新时间。

利用kafka自带的脚本,将待发数据写入文件中,然后通过读取文件 方式,将数据批量发送到kafka task节点

#在kafka master 节点用户home 目录下创建data 目录
[root@node1 home]# mkdir data
#进入data目录
[root@node1 home]# cd data/
#将准备好的数据放入data目录中
[root@node1 data]# ls
batch1-1000.log  batch2-1000.log  batch-send.sh
#编写batch-send.sh 脚本
[root@node1 data]# vi batch-send.sh

shell content:

#!/bin/bash

#响应Ctrl+C中断
trap 'onCtrlC' INT
function onCtrlC () {
    echo 'Ctrl+C is captured'
    exit 1
}

#kafka及data所在目录
dataPath=/home/data
kafkaPath=/usr/local/kafka
#broker list
brokerlist=192.168.1.10:9092,192.168.1.11:9092,192.168.1.12:9092
#kafka的topic,这里设置发送到2个kafka topic
topic1=test-topic1
topic2=test-topic2
#消息总数,可根据需要配置
totalNum=600
#循环计时参数
batchNum=10

#开始时间
start=$(date +%s)

for ((i=1; i<=${totalNum}; i ++))
do
{
    #取余数
    modVal=$(( ${i} % ${batchNum} ))

    #如果循环计数达到设置参数,就发送一批次消息
    if [ ${modVal} = 0 ] ; then
     #在控制台显示进度,*num,取决于数据文件中的数据量     
      echo “$(( ${i}*100 )) of $(( ${totalNum}*100 )) sent”
     # 获取每次循环发送的当前时间
     current=`date "+%Y-%m-%d %H:%M:%S"`
     # 将时间转为UTC 时间,精确到秒
     timeStamp=`date -d "$current" +%s`
     #将current转换为时间戳,精确到毫秒,10#`date "+%N"是为了避免循环过程中出现error:value too great for base
     currentTimeStamp=$((timeStamp*1000+10#`date "+%N"`/1000000))
     #每次循环发送数据前,都将数据文件中的recvTs 参数改为当前时间,放在前台执行,由于是顺序执行,需要等待文件修改完后,才会下一步执行发送文件数据给kafka,每个文件中1000条数据
      sed -i  's/\("recvTs":\)[0-9]*\(,\)/\1'$currentTimeStamp',/g' ${dataPath}/batch1-1000.log ${dataPath}/batch2-1000.log

      #批量发送消息,并且将控制台返回的提示符重定向到/dev/null,&放到后台并发执行
      cat ${dataPath}/batch1-1000.log | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic1} | > /dev/null &
      cat ${dataPath}/batch1-1000.log | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic1} | > /dev/null &
    fi
}
#每个for循环等待0.1秒,batchNum=10,即每秒执行一次数据发送,
sleep 0.1
done
#脚本结束时间,并统计脚本执行时间,打印到控制台
end=$(date +%s)
echo $(date)
echo $(( $end - $start ))

脚本执行效果

[root@node1 data]# ./batch-send.sh 
Sat Jan 30 16:15:53 CST 2021
“1000 of 10000 sent”
“2000 of 10000 sent”
“3000 of 10000 sent”
“4000 of 10000 sent”
“5000 of 10000 sent”
“6000 of 10000 sent”
“7000 of 10000 sent”
“8000 of 10000 sent”
“9000 of 10000 sent”
“10000 of 10000 sent”
Sat Jan 30 16:16:04 CST 2021
11
[root@node1 data]# 

这可以通过修改文件中的数据量和循环计时参数,实现每秒发送10000 甚至100000 条数据到kafka。

标签:shell,10000,root,kafka,topic,node1,data,sent
来源: https://www.cnblogs.com/orange2016/p/14349450.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有