ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Kafka:主题管理

2021-02-09 19:02:05  阅读:129  来源: 互联网

标签:管理 -- topics 主题 kafka topic sh Kafka


主题的管理

主题的管理包括创建主题、查看主题信息、修改主题、删除主题等操作,可以通过kafka的kafka-topics.sh脚本来执行这些操作,这个脚本在$KAFKA_HOME/bin/目录下。

该脚本正文仅有一行:

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

实际上调用的是kafka.admin.TopicCommand类来执行主题管理的操作。

创建主题

如果broker端配置了auto.create.topics.enable设置为true。那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认为1)、副本因子为default.replication.factor(默认为1)的主题。此外,当一个消费者从未知主题读取消息时,或当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数num.partitionsdefault.replication.factor的值创建一个相应的主题。但是不建议将auto.create.topics.enable设置为true,这回增加主题的管理和维护的难度。

使用kafka-topics.sh脚本一个分区数为4,副本因子为1的主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-create --partitions 4 --replication-factor 1

执行完成后,kafka会在log.dir参数所配置目录下创建对应的主题分区。

ls -al logs | grep topic-create

image-20210209092810943

创建了四个文件夹,对应topic-create主题的四个分区编号,命名方式概括为<topic>-<partition>。这里的文件夹对应的不是分区,分区是一个逻辑概念而没有物理存在。

主题、分区、副本和日志关系如下图:

image-20210209093714473

主题和分区都是提供给上层用户的抽象,而在副本层面或更确切地说是Log层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的broker中,这样才能提供有效的数据冗余。

此外我们还可以从zookeeper客户端中获取。当创建一个主题时,会在/brokers/topics/目录下创建一个同名节点,该节点记录了该主题的分区副本分配方案:

[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/topic-create

image-20210209094436703

图中的"1":[0]表示分区1分配一个副本,在brokerId为0的broker节点中。

使用--describe查看分区副本的分配细节:

bin/kafka-topics.sh --zookeeper 192.168.1.51:2181 --describe --topic topic-create

image-20210209095059600

此外还可以通过replica-assignmemt参数来手动指定分区副本的分配方案:这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用逗号","隔开,分区内多个副本用":"隔开,并且在使用replica-assignment参数创建主题时不需要原本必备的partitions和replication-factor这两个参数。

--replica-assignmemt <String:

broker_id_for_part1_replica1 : broker_id_for_part1_replica2 : ...,

broker_id_for_part2_replica1 : broker_id_for_part2_replica2 : ...,

...

>

下面演示创建一个与主题topic-create相同的分配方案的主题topic-create-assign:

bin/kafka-topics.sh --zookeeper 192.168.1.51:2181 --create --topic topic-create-assign --replica-assignment 0,0,0,0
  • 注意同一个分区内的副本不能有重复,比如指定了0:0,1:1这种,就会报出异常。

  • 分区之间指定的副本数不同,比如0:1,0,1:0这种,也会报出异常。

  • 这种0:1,,0:1,1:0,企图跳过一个分区的行为也是不允许的。

创建主题时,我们还可以通过config参数来设置所要创建主题的相关参数,以覆盖默认配置

--config <String:name1=value1> --config <String:name2=value2>

默认情况下,如果创建一个已存在的主题,会直接抛出异常。但是可以添加一个参数--if-not-exists,那么发生命名冲突后将不做任何处理(不报错,也不创建主题),如果没有发生命名冲突,那么和不带--if-not-exists参数的行为一样正常创建主题。

注意:使用kafka-topics.sh创建主题时还会检测是否包含"."或""字符。kafka的内部做埋点时会根据主题的名称来命名metrics的名称,并且会将"."改成下划线"",如果先命名了topic_1.2,再次创建topic.1_2主题就会抛出InvalidTopicException异常,如下图

image-20210209104711942

kafka从0.10.x版本开始支持broker的机架信息,通过broker端broker.rack参数配置,如果指定了机架信息,则在分区副本分配时会尽可能让分区副本分配到不同的机架上。

如果一个集群中有部分broker指定了机架信息,并且其余broker没有指定broker机架信息,会直接抛出异常。这时想要创建主题,要么集群所有broker都加上机架信息或者都去掉机架信息,要么使用--disable-rack-aware参数来忽略机架信息。

代码创建主题(可能缺少一部分依赖,自己补全就行):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.5</version>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.5</version>
</dependency>

java的代码:

    public static void main(String[] args) {
        String[] options = new String[]{
                "--zookeeper", "192.168.1.51:2181",
                "--create",
                "--topic", "topicCommand-test",
                "--partitions", "2",
                "--replication-factor", "1"
        };
        TopicCommand.main(options);
    }

image-20210209111128113

查看主题

kafka-topics.sh脚本有5中指令类型:create、list、describe、alter、delete。

list和describe可以用来方便查看主题信息。

通过list指令可以查看当前所有可用的主题

bin/kafka-topics.sh --zookeeper localhost:2181 -list

通过describe指令查看单个(多个,全部)主题信息

#单个
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicName
#多个
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicName1,topicName
#全部
bin/kafka-topics.sh --zookeeper localhost:2181 --describe 

使用describe指令查看主题信息时还可以额外指定参数增加一些附加功能

  • --topics-with-overrides 找出所有包含覆盖配置的主题
  • --under-replicated-partitions 可以找出所有包含失效副本的分区
  • --unavailable-partitions 可以查看主题中没有leader副本的分区

修改主题

当主题被创建之后,依然允许我们对其做一定修改,例如修改分区个数、修改配置等。使用alter指令。

增加主题的分区数:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic wj --partitions 3

image-20210209161510889

修改成功的警告信息:当主题中的消息包含key时,根据key计算分区的行为就会收到影响。所以增加分区需要三思而后行,建议一开始就设置好分区数。

如果修改的主题不存在,可以使用if-exists参数来忽略异常。

kafka不支持减少分区数,会直接抛出异常。

alter指令配合--config参数可以达到修改主题配置值

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic wj --config max.message.bytes=20000

image-20210209162505742

我们可以通过--delete-config参数来删除之前覆盖的配置,使其恢复原有的默认值

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic wj --delete-config max.message.bytes

image-20210209162723230

配置管理

kafka-configs.sh脚本是专门用来对配置进行操作的(推荐使用,不推荐使用kafka-topics.sh,已过时)。kafka-configs.sh脚本包含变更配置alter和查看配置describe两种指令类型。此外,该脚本还支持操作broker、用户和客户端的配置。

该脚本使用--entity-type参数来指定操作配置的类型,并使用--entity-name来指定操作配置的名称。例如查看主题wj配置:

bin/kafka-configs.sh --bootstrap-server 192.168.1.51:9092 --describe --entity-type topics --entity-name wj

entity-type只可以配置4个值:topics,brokers,clients,users

entity-type和entity-name的对应关系如下表:

entity-type entity-name
topics 指定主题的名称
brokers 指定brokerId值,broker中的broker.id值
clients 指定clientId值,即Product或Consumer的client.id值
users 指定用户名

使用alter指令变更配置时,需要配合--add-config--delete-config这两个参数一起使用。--add-config用来实现配置的增改,即覆盖原有配置;--delete-config用于删除配置,恢复默认值。

bin/kafka-configs.sh --bootstrap-server 192.168.1.51:9092 --alter --entity-type topics --entity-name wj --add-config k1=v1,k2=v2
bin/kafka-configs.sh --bootstrap-server 192.168.1.51:9092 --alter --entity-type topics --entity-name wj --delete-config k1,k2

删除主题

kafka-topics.sh脚本中的delete指令可以用于删除主题。

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicName

image-20210209165752123

我们直接执行删除指令,控制台提示必须将delete.topic.enable设置为true才能删除主题,这个参数默认值就是true,如果为false,那么删除主题的操作将会被忽略。

如果删除的主体是kafka的内部主题,删除会直接报错。

删除一个不存在主体也会报错。同alter指令一样,设置--if-exists参数可以忽略异常。

删除主体是一个不可逆的过程,一旦删除,与其相关的所以消息都会被删除

KafkaAdminClient

前面我们用TopicCommand创建了一个主体,当然我们可以用它实现主体的删除、修改、查看等操作,实质上与使用kafka-config.sh脚本的方式无异,交互性非常差。

所以kafka提供了KafkaAdminClient作为替代方案,继承于AdminClient,方法都是见名知意的。

下面演示少部分用法:

1.创建一个主题

String brokerList = "192.168.1.51:9092";
String topic = "topic-admin-test";

Properties prop = new Properties();
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
//构建KafkaAdminClient实例
AdminClient client = AdminClient.create(prop);
//设定所创建主题的具体信息
NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
//创建主题
CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
try {
    //等待服务端返回
    result.all().get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
client.close();//释放资源

2.查看主题配置

private static String brokerList = "192.168.1.51:9092";
private static String topic = "topic-admin-test";

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties prop = new Properties();
    prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    prop.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    //构建KafkaAdminClient实例
    AdminClient client = AdminClient.create(prop);
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
    Config config = result.all().get().get(resource);
    System.out.println(config);
    client.close();
}

最终的输出结果不会只列出被覆盖的配置信息,而是会列出主题中所有的配置信息。

3.增加分区

    private static String brokerList = "192.168.1.51:9092";
    private static String topic = "topic-admin-test";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties prop = new Properties();
        prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        prop.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        //构建KafkaAdminClient实例
        AdminClient client = AdminClient.create(prop);
        NewPartitions newPartitions = NewPartitions.increaseTo(5);
        Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
        newPartitionsMap.put(topic, newPartitions);
        CreatePartitionsResult result = client.createPartitions(newPartitionsMap);
        result.all().get();
        client.close();
    }

标签:管理,--,topics,主题,kafka,topic,sh,Kafka
来源: https://www.cnblogs.com/wwjj4811/p/14393906.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有