ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

快速实现KAFKA单机、集群部署

2021-09-19 09:33:28  阅读:143  来源: 互联网

标签:bin 单机 -- 9092 kafka topic 集群 usr KAFKA


一、概述

1、关键作用:解耦、削峰填谷、异步处理

2、常见的MQ消费分类:至多一次消费、没限制

3、kafka名词解释

① topic:一个消息只能进入一个topic中;

② partition:每个topic会根据分区数划分多个分区,每个分区相互独立。消息有key时使用hash分发到对应分区,无key使用轮询分散到各个分区;

③ 分区数:决定每个topic有多少个partition;

④ 副本因子:决定每个partition在kafka集群中有多少个副本,副本在不同的节点上,防止单点故障;

⑤ broker:kafka集群中的机器,每一个机器就是一个broker,每个broker负责若干个分区。关系:同一个分区只能给一个broker使用,但一个broker可以对应多个分区;

⑥ offset:每个分区都是顺序的,分区间序列互不干扰;分区内有序,全局不能确保有序;

⑦ consumer group:由多个consumer组成,同一个组不重复消费;

4、日志分区:日志默认自动保存7天,可以指定存放日志的目录:

log.retention.hours=168(七天)

5、kafka快的原因:kafka重要特点是:顺序写入和零拷贝。

(1)本地磁盘I/O

正常IO

① 用户发送一次系统调用read,用户在结果返回之前一直处于阻塞状态;

② CPU执行IO读取指令,操作系统将数据读取到磁盘控制缓冲区,并给CPU发送中断信号,cpu处理中断,将磁盘缓冲区拷贝到内核缓冲区;

③ IO读取动作需要执行多次,直到已经读到足够的数据,每次操作都会给cpu发送一个中断信号。

④ 最后cpu再将数据返回给用户;

DMA

① 用户发送一次系统调用read,用户在结果返回之前一直处于阻塞状态;

② CPU向DMA发送IO读取指令,操作系统将数据读取到磁盘控制缓冲区,并给中断DMA,DMA将磁盘缓冲区拷贝到内存;

③ IO读取动作需要执行多次,直到已经读到足够的数据,每次操作都会给DMA发送一个中断信号,而不是中断CPU。

④ 最后DMA中断CPU通知读取完成,再将数据返回给用户;

(2)网络I/O:

正常网络IO

需要进行用户空间和内核空间的切换,需要从用户空间切换到内核空间,将数据从磁盘加载内存,并拷贝到用户空间应用程序的内存中,再由用户空间的应用拷贝数据到内核空间的socket缓冲区中,再由网络发送出去。

零拷贝网络IO

应用程序接收到一个socket请求,内核空间先从磁盘将数据加载到内存,然后不需要再拷贝回用户空间的应用程序,直接拷贝到socket缓存区,并由网络发送出去,这个过程数据不需要重新经过用户空间。

二、kafka搭建 && Topic管理

1、安装java环境
查看jdk是否安装

rpm -qa | grep jdk

安装jdk

rpm -ivh jdk-8u191-linux-x64.rpm

增加了java目录

ll /usr/java

卸载jdk

rpm -e `rpm -qa | grep jdk`

查看java版本

java -version

sum公司提供的命令,查看java进程

jps

配置java环境变量

vi /etc/profile

编辑:

JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH

保存后,加载配置:

source /etc/profile

测试环境变量配置情况:

echo $JAVA_HOME

2、配置主机名与IP的绑定关系

vi /etc/hosts
192.168.93.129 CentOSA

3、关闭防火墙

service iptables status
service iptables start
service iptables stop
chkconfig iptables off
chkconfig --list | grep iptables

查看防火墙状态

systemctl status firewalld.service

关闭运行的防火墙

systemctl stop firewalld.service 

永久关闭防火墙

systemctl disable firewalld.service

zk集群搭建是,部分节点启动失败并报错java.net.NoRouteToHostException: No route to host (Host unreachable),原因就是其中某些机器没有关闭防火墙成功。

4、安装zk

tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
cd /usr/zookeeper-3.4.6
cp conf/zoo_sample.cfg conf/zoo.cfg
vi conf/zoo.cfg

修改数据目录为 /root/zkdata
启动:

cd /usr/zookeeper-3.4.6/
/usr/zookeeper-3.4.6/bin/zkServer.sh start /usr/zookeeper-3.4.6/conf/zoo.cfg
停止:
/usr/zookeeper-3.4.6/bin/zkServer.sh stop
状态:
/usr/zookeeper-3.4.6/bin/zkServer.sh status

通过jps查看java进程,显示:

QuorumPeerMain
Jps

查看zk的安装情况:

/usr/zookeeper-3.4.6/bin/zkServer.sh status /usr/zookeeper-3.4.6/conf/zoo.cfg //安装成功后显示Mode:standalone

5、安装kafka:
解压

tar -zxf kafka_2.11-2.2.0.tgz -C /usr/
cd /usr/kafka_2.11-2.2.0

vi /usr/kafka_2.11-2.2.0config/server.properties

broker.id=0
listeners=PLAINTEXT://CentOSA:9092
log.dir=/usr/kafka_logs
zookeeper.connect=CentOSA:2181

kafka启动:

/usr/kafka_2.11-2.2.0/bin/kafka-server-start.sh -daemon /usr/kafka_2.11-2.2.0/config/server.properties

通过jps查看kafka进程

创建topic

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server 主机名:9092 -create --topic topic名 --partitions 分区数量 --replication-factor 分区因子数值
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092 -create --topic topic01 --partitions 3 --replication-factor 1

查看新建的topic:

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092 --list

消费:阻塞住

/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --topic topic名 --group group1
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092 --topic topic01 --group group1

生产者:

/usr/kafka_2.11-2.2.0/bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic topic名
eg:
/usr/kafka_2.11-2.2.0/bin/kafka-console-producer.sh --broker-list CentOSA:9092 --topic topic01

测试:生产者端发送内容,消费者可以接收到对应的内容。

三、kafka集群搭建

1、前置准备
每个节点都配置好主机域名:

vi /etc/hosts
192.168.233.199 CentOSA
192.168.233.12 CentOSB
192.168.233.13 CentOSC

测试:互相能通过主机名ping通

可以用到scp命令:

scp .bashrc 接收主机名:~/
eg:
scp apache-flume-1.9.0-bin.tar.gz CentOSC:/var/local
scp jdk-8u191-linux-x64.rpm CentOSC:/var/local
scp kafka_2.11-2.2.0.tgz CentOSC:/var/local
scp zookeeper-3.4.6.tar.gz CentOSC:/var/local
scp kafka-eagle-bin-1.4.0.tar.gz CentOSC:/var/local

按照上面单个节点部署好环境
xshell的工具里面有个“发送键输入到所有会话”的功能很方便快捷。

2、同步时钟:

yum install -y ntp
ntpdate ntp1.aliyun.com

同步时钟:

clock -w

3、配置zk集群
先安装zk,后修改配置文件:
修改data目录为:/root/zkdata
新增zk集群:

server.1=CentOSA:2888:3888
server.2=CentOSB:2888:3888
server.3=CentOSC:2888:3888

复制配置到其他节点:

scp -r /usr/zookeeper-3.4.6 CentOSB:/usr/
scp -r /usr/zookeeper-3.4.6 CentOSC:/usr/

在每个节点建立数据目录

给每个节点创建zk的id号:
A节点:

echo 1 > /root/zkdata/myid

B节点:

echo 2 > /root/zkdata/myid

C节点:

echo 3 > /root/zkdata/myid

每个节点都启动zk:

/usr/zookeeper-3.4.6/bin/zkServer.sh start /usr/zookeeper-3.4.6/conf/zoo.cfg
/usr/zookeeper-3.4.6/bin/zkServer.sh restart /usr/zookeeper-3.4.6/conf/zoo.cfg

4、安装kafka集群
先安装单机安装,后配置zk集群:

zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181

拷贝

scp -r kafka_xxx CentOSB:/usr/
scp -r kafka_xxx CentOSC:/usr/

但需要分别修改对应的配置文件:server.properties

CentOSB:
broker.id=1
listeners=PLAINTEXT://CentOSB:9092

CentOSC:
broker.id=2
listeners=PLAINTEXT://CentOSC:9092

5、使用kafka
集群创建topic

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 -create --topic topic02 --partitions 3 --replication-factor 3

查看集群topic列表:

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --list

查看topic详情

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --describe --topic topic01

修改分区:只增不减

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --alter --topic topic01 --partition 3 

删除topic

/usr/kafka_2.11-2.2.0/bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --delete --topic topic01

消费时三个节点都需要写上:

/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic名 --group 分组名

eg:
/usr/kafka_2.11-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --group group3

四、kafka基础API

五、kafka监控kafkaEagle

解压:

tar -zxf kafka-eagle-bin-1.4.0.tar.gz
cd kafka-eagle-bin-1.4.0
tar -zxf kafka-eagle-web-1.4.0-bin.tar.gz -C /usr/
cd /usr/
mv kafka-eagle-web-1.4.0 kafka-eagle

配置环境变量

KE_HOME=/usr/kafka-eagle并追加到$PATH:
PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin
exports $KE_HOME

测试

echo $KE_HOME

配置config文件:

vi /usr/kafka-eagle/conf/system-config.properties

目前只有一个集群,故将默认配置中的集群别名保留一个:

kafka.eagle.zk.cluster.alias-cluster1

配置zk集群列表:

cluster1.zk.list=CentOSA:2181,CentOSB:2181,CentOSC:2181

是否启动监控图表,默认是不启动的

kafka.eagle.metrics.charts=true

如果设置为true,则kafka需要修改启动文件kafka-server-start.sh,找到对应位置,并增加export JMX_PORT=“7788”:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="7788"
fi

修改数据库连接,注释sqlite配置,采用mysql链接,并配置mysql链接参数(需要让数据库允许远程访问):

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.233.199:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=

执行权限

chmod u+x /usr/kafka-eagle/bin/ke.sh

启动

/usr/kafka-eagle/bin/ke.sh start

会自动创建数据库并提供对应的web访问网址

/usr/kafka-eagle/bin/ke.sh stop

六、flume集成

tar -zxf apache-flume-1.9.0-bin.tar.gz -C /usr/
cd /usr/apache-flume-1.9.0-bin/

配置config

vi conf/kafka.properties

启动flume

./bin/flume-ng agent -c conf/ -n a1  -f conf/kafka.properties -D 启动参数

标签:bin,单机,--,9092,kafka,topic,集群,usr,KAFKA
来源: https://blog.csdn.net/Pencil1312/article/details/120377223

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有