Kafka - 03 Operations

2022-07-10 14:03:26


1. Reading and writing data

1.1 Console

[root@my-node51 ~]# kafka-console-producer.sh --bootstrap-server 192.168.6.51:9092 --topic ttopic
>t1------
>t2------
>t3------
>

[root@my-node52 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.6.51:9092 --topic ttopic --from-beginning
t1------
t3------
t2------

### Note: the consumer shows t1, t3, t2. Kafka only guarantees ordering within a partition, and ttopic has 3 partitions, so the global order can differ from the send order.

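1.2 Importing/exporting data with Kafka Connect

Writing data from the console and reading it back on the console is a convenient starting point, but you may want to bring in data from other sources or export data from Kafka to other systems. Rather than writing custom integration code, you can use Kafka Connect to import or export data.

Kafka Connect is a tool shipped with Kafka for importing data into Kafka and exporting data out of it. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system.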

connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

[root@my-node51 config]# grep -v "^#" connect-standalone.properties

bootstrap.servers=192.168.6.51:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

### Source: the file to read from
[root@my-node51 config]# cat connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

### Sink: the file to write to
[root@my-node51 config]# cat connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

### Start the process
[root@my-node51 config]# connect-standalone.sh ./connect-standalone.properties ./connect-file-source.properties ./connect-file-sink.properties

### Write data to the source file
[root@my-node51 config]# echo test-file4444 >> test.txt
[root@my-node51 config]# echo test-file5555 >> test.txt
[root@my-node51 config]# echo test-file6666 >> test.txt

### Check the sink file
[root@my-node51 config]# cat test.sink.txt
test-file1111
test-file2222
test-file3333
test-file4444
test-file5555
test-file6666

### Read the data with the console consumer; the records are JSON
[root@my-node52 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.6.51:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"test-file1111"}
{"schema":{"type":"string","optional":false},"payload":"test-file2222"}
{"schema":{"type":"string","optional":false},"payload":"test-file3333"}
{"schema":{"type":"string","optional":false},"payload":"test-file4444"}
{"schema":{"type":"string","optional":false},"payload":"test-file5555"}
{"schema":{"type":"string","optional":false},"payload":"test-file6666"}
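
Because the worker config sets value.converter.schemas.enable=true, JsonConverter wraps every record in an envelope with a "schema" field and a "payload" field, which is what the console consumer prints. A downstream script that only wants the original file lines could unwrap them roughly as follows. This is a minimal Python sketch; the function name extract_payloads is illustrative and not part of Kafka.

```python
import json


def extract_payloads(lines):
    """Return the payload of each JsonConverter envelope line, skipping blanks."""
    payloads = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        envelope = json.loads(line)
        # With schemas.enable=true each record looks like
        # {"schema": {...}, "payload": <value>}
        payloads.append(envelope["payload"])
    return payloads


# Two lines copied from the console consumer output
sample = [
    '{"schema":{"type":"string","optional":false},"payload":"test-file1111"}',
    '{"schema":{"type":"string","optional":false},"payload":"test-file2222"}',
]
print(extract_payloads(sample))
```

Setting schemas.enable=false for the converters should make the envelope disappear, so the topic would carry the bare JSON value instead.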

 

2. Topic management

2.1 Changing a topic's partition count

[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824

### Run the alter operation
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --alter --partitions 5 --topic ttopic
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 5       ReplicationFactor: 1    Configs: segment.bytes=1073741824

### Reducing the partition count fails
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --alter --partitions 1 --topic ttopic
Error while executing topic command : org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 5 partitions, which is higher than the requested 1.
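
Since the partition count can only grow, a script that wraps --alter may want to check the current count first instead of waiting for InvalidPartitionsException. The Python sketch below parses the summary line of --describe and compares it with the requested count. The helper names parse_summary and can_alter_partitions are hypothetical, and the regular expression assumes the output layout shown in this post (Kafka 2.6.0).

```python
import re

# Matches the summary line printed by kafka-topics.sh --describe
SUMMARY_RE = re.compile(r"PartitionCount:\s*(\d+)\s+ReplicationFactor:\s*(\d+)")


def parse_summary(describe_output):
    """Return (partition_count, replication_factor) from --describe output."""
    match = SUMMARY_RE.search(describe_output)
    if match is None:
        raise ValueError("no PartitionCount/ReplicationFactor line found")
    return int(match.group(1)), int(match.group(2))


def can_alter_partitions(describe_output, requested):
    """True only if the requested count is larger than the current one."""
    current, _ = parse_summary(describe_output)
    return requested > current


sample = "Topic: ttopic   PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824"
print(parse_summary(sample))
print(can_alter_partitions(sample, 5), can_alter_partitions(sample, 1))
```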

  

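2.2 Increasing the replication factor

### Increase the replication factor: each partition currently has 1 replica; raise it to 3 (brokers 1,2,3)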
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 5       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: ttopic   Partition: 0    Leader: 3       Replicas: 3     Isr: 3
        Topic: ttopic   Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: ttopic   Partition: 2    Leader: 2       Replicas: 2     Isr: 2
        Topic: ttopic   Partition: 3    Leader: 3       Replicas: 3     Isr: 3
        Topic: ttopic   Partition: 4    Leader: 1       Replicas: 1     Isr: 1

### Write the reassignment file (JSON)
[root@my-node53 kafka-2.6.0]# cat add-topic-replica.json
{
  "version": 1,
  "partitions":[
     { "topic": "ttopic", "partition": 0, "replicas": [1,2,3] },
     { "topic": "ttopic", "partition": 1, "replicas": [1,2,3] },
     { "topic": "ttopic", "partition": 2, "replicas": [1,2,3] },
     { "topic": "ttopic", "partition": 3, "replicas": [1,2,3] },
     { "topic": "ttopic", "partition": 4, "replicas": [1,2,3] }
  ]
}
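
For a topic with many partitions, writing this file by hand is tedious. The Python sketch below generates the same structure; build_reassignment and its rotate parameter are illustrative names, not part of any Kafka tool. The optional rotation is an addition to what the post does: the first broker in each "replicas" list is the preferred leader, so listing [1,2,3] for every partition makes broker 1 the preferred leader everywhere, and rotating the list spreads that role.

```python
import json


def build_reassignment(topic, partition_count, brokers, rotate=False):
    """Build a reassignment document that places every partition on `brokers`."""
    partitions = []
    for p in range(partition_count):
        if rotate:
            # Shift the list so that preferred leaders differ between partitions
            shift = p % len(brokers)
            replicas = brokers[shift:] + brokers[:shift]
        else:
            replicas = list(brokers)
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}


# Same content as add-topic-replica.json above
plan = build_reassignment("ttopic", 5, [1, 2, 3])
print(json.dumps(plan, indent=2))
```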

### Run kafka-reassign-partitions.sh
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092  --reassignment-json-file ./add-topic-replica.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"ttopic","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"ttopic","partition":1,"replicas":[1],"log_dirs":["any"]},
                           {"topic":"ttopic","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"ttopic","partition":3,"replicas":[3],"log_dirs":["any"]},
                           {"topic":"ttopic","partition":4,"replicas":[1],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for ttopic-0,ttopic-1,ttopic-2,ttopic-3,ttopic-4

### Check the result
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 5       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: ttopic   Partition: 0    Leader: 3       Replicas: 1,2,3 Isr: 3,1,2
        Topic: ttopic   Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: ttopic   Partition: 2    Leader: 2       Replicas: 1,2,3 Isr: 2,3,1
        Topic: ttopic   Partition: 3    Leader: 3       Replicas: 1,2,3 Isr: 3,1,2
        Topic: ttopic   Partition: 4    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
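
In the output above every partition's Isr contains all three replicas, so the new replicas have caught up. Right after --execute that may not yet be the case. A small Python sketch for spotting partitions whose ISR is still incomplete follows; the function name under_replicated is hypothetical, and the regular expression assumes the --describe layout shown in this post.

```python
import re

# Matches one per-partition line of kafka-topics.sh --describe
PARTITION_RE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(\S+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)"
)


def under_replicated(describe_output):
    """Return the partition ids whose ISR does not contain every replica."""
    lagging = []
    for match in PARTITION_RE.finditer(describe_output):
        partition = int(match.group(1))
        replicas = set(match.group(3).split(","))
        isr = set(match.group(4).split(","))
        if not replicas <= isr:
            lagging.append(partition)
    return lagging


sample = """Topic: ttopic   PartitionCount: 5       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: ttopic   Partition: 0    Leader: 3       Replicas: 1,2,3 Isr: 3,1,2
        Topic: ttopic   Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
"""
print(under_replicated(sample))
```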

 

2.3 Migrating a topic

### The number of target brokers must be greater than or equal to the replication factor; otherwise the command fails
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092  --topics-to-move-json-file ./move-topic-partition.json --broker-list "4" --generate
Error: Replication factor: 3 larger than available brokers: 1.
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

### Create ttopic3 with 2 partitions and 1 replica
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --create --topic ttopic3 --partitions 2 --replication-factor 1
Created topic ttopic3.

[root@my-node52 kafka-2.6.0]#  ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic3
Topic: ttopic3  PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: ttopic3  Partition: 0    Leader: 2       Replicas: 2     Isr: 2
        Topic: ttopic3  Partition: 1    Leader: 3       Replicas: 3     Isr: 3

### Write the topics-to-move file
[root@my-node53 kafka-2.6.0]# cat move-topic-partition.json
{"topics": [{"topic": "ttopic3"}], "version": 1}

### Generate a reassignment plan from the topic list and the broker list
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092  --topics-to-move-json-file ./move-topic-partition.json --broker-list "4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[3],"log_dirs":["any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[4],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[4],"log_dirs":["any"]}]}

### The reassignment plan (the proposed configuration above, saved to a file)
[root@my-node53 kafka-2.6.0]# cat expand-cluster-reassignment.json
{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[4],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[4],"log_dirs":["any"]}]}
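
Copying the proposed JSON into expand-cluster-reassignment.json by hand works, but it can also be scripted. The Python sketch below splits captured --generate output into the current assignment (worth keeping for rollback) and the proposed one. The function name split_generate_output is hypothetical, and the parser assumes each JSON document sits on the single line after its header, as in the output shown here.

```python
import json

CURRENT_HEADER = "Current partition replica assignment"
PROPOSED_HEADER = "Proposed partition reassignment configuration"


def split_generate_output(output):
    """Return (current, proposed) assignments parsed from --generate output."""
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    found = {}
    for index, line in enumerate(lines):
        if line in (CURRENT_HEADER, PROPOSED_HEADER) and index + 1 < len(lines):
            # Assumption: the JSON document is the next non-empty line
            found[line] = json.loads(lines[index + 1])
    return found[CURRENT_HEADER], found[PROPOSED_HEADER]


sample = """Current partition replica assignment
{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[3],"log_dirs":["any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[4],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[4],"log_dirs":["any"]}]}
"""
current, proposed = split_generate_output(sample)
print(json.dumps(proposed))
```

The proposed document can then be written to the plan file with json.dump, and the current one to a separate rollback file.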

### Execute the plan and migrate the topic with --execute
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092 --reassignment-json-file ./expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[3],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for ttopic3-0,ttopic3-1

### Check whether the reassignment has finished with --verify
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092 --reassignment-json-file ./expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition ttopic3-0 is complete.
Reassignment of partition ttopic3-1 is complete.

Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic ttopic3
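
When the migration moves a lot of data, --verify usually has to be run several times before every partition reports completion. A polling script could decide when to stop with a check like the Python sketch below. The function name reassignment_finished is hypothetical; it relies only on the "Reassignment of partition ... is complete." wording shown above and treats any other status as unfinished.

```python
def reassignment_finished(verify_output):
    """True if every 'Reassignment of partition ...' line reports completion."""
    statuses = [
        line.strip()
        for line in verify_output.splitlines()
        if line.strip().startswith("Reassignment of partition")
    ]
    # No status lines at all is treated as "not finished" rather than success
    return bool(statuses) and all(s.endswith("is complete.") for s in statuses)


sample = """Status of partition reassignment:
Reassignment of partition ttopic3-0 is complete.
Reassignment of partition ttopic3-1 is complete.

Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic ttopic3
"""
print(reassignment_finished(sample))
```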

### Partition info after the migration
[root@my-node52 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic3
Topic: ttopic3  PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: ttopic3  Partition: 0    Leader: 4       Replicas: 4     Isr: 4
        Topic: ttopic3  Partition: 1    Leader: 4       Replicas: 4     Isr: 4

 

### Migrate ttopic and ttopic2 onto the four brokers 1,2,3,4
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092 --topics-to-move-json-file ./move-topic-partition.json --broker-list "1,2,3,4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"ttopic","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"ttopic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},
                           {"topic":"ttopic","partition":2,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"ttopic","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]},
                           {"topic":"ttopic","partition":4,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"ttopic2","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},
                           {"topic":"ttopic2","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"ttopic2","partition":2,"replicas":[2,3],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"ttopic","partition":0,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"ttopic","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},
                           {"topic":"ttopic","partition":2,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"ttopic","partition":3,"replicas":[1,3,4],"log_dirs":["any","any","any"]},
                           {"topic":"ttopic","partition":4,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"ttopic2","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},
                           {"topic":"ttopic2","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"ttopic2","partition":2,"replicas":[1,4],"log_dirs":["any","any"]}]}

[root@my-node52 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 5       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: ttopic   Partition: 0    Leader: 1       Replicas: 2,4,1 Isr: 1,2,4
        Topic: ttopic   Partition: 1    Leader: 1       Replicas: 3,1,2 Isr: 1,2,3
        Topic: ttopic   Partition: 2    Leader: 4       Replicas: 4,2,3 Isr: 2,3,4
        Topic: ttopic   Partition: 3    Leader: 1       Replicas: 1,3,4 Isr: 3,1,4
        Topic: ttopic   Partition: 4    Leader: 1       Replicas: 2,1,3 Isr: 1,2,3

[root@my-node52 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic2
Topic: ttopic2  PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: ttopic2  Partition: 0    Leader: 3       Replicas: 3,2   Isr: 3,2
        Topic: ttopic2  Partition: 1    Leader: 4       Replicas: 4,3   Isr: 3,4
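
Before executing a generated plan it can help to see how much data will actually move. Each broker that is new to a partition has to copy that partition in full, while a partition whose replica set stays the same only changes its replica order. The Python sketch below compares a current and a proposed assignment; replica_moves is a hypothetical helper, and the sample uses ttopic partitions 0 and 1 from the --generate output above.

```python
def replica_moves(current, proposed):
    """Map (topic, partition) -> (brokers added, brokers removed)."""
    before = {
        (p["topic"], p["partition"]): set(p["replicas"])
        for p in current["partitions"]
    }
    moves = {}
    for p in proposed["partitions"]:
        key = (p["topic"], p["partition"])
        old = before.get(key, set())
        new = set(p["replicas"])
        if old != new:
            # Added brokers must copy the partition; removed ones drop it
            moves[key] = (sorted(new - old), sorted(old - new))
    return moves


current = {"version": 1, "partitions": [
    {"topic": "ttopic", "partition": 0, "replicas": [1, 2, 3]},
    {"topic": "ttopic", "partition": 1, "replicas": [1, 2, 3]},
]}
proposed = {"version": 1, "partitions": [
    {"topic": "ttopic", "partition": 0, "replicas": [2, 4, 1]},
    {"topic": "ttopic", "partition": 1, "replicas": [3, 1, 2]},
]}
print(replica_moves(current, proposed))
```

For this sample only partition 0 moves data (broker 4 replaces broker 3); partition 1 keeps the same brokers in a different order.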

 

  

 

 

Source: https://www.cnblogs.com/kingdomer/p/16462539.html
