ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

RocketMQ(一) - NameServer 启动源码分析

2022-02-19 03:00:45  阅读:212  来源: 互联网

标签:namesrvConfig private controller 源码 nettyServerConfig NamesrvController new Name


RocketMQ(一) - NameServer 启动源码分析

NameServer 的定义以及用处,本篇文章就不做介绍了,此文章主要分析其源码。

1. 入口

NamesrvStartup 是的NameServer服务的启动类。 其入口是 main0( ) 方法。

    public static NamesrvController main0(String[] args) {

        try {

            // 创建 namesrv 控制器 (nameServer核心类)
            NamesrvController controller = createNamesrvController(args);

            // 启动nameSrv
            start(controller);

            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }

上述代码非常简单, 其功能就是:

  1. 创建 NamesrvController 对象 createNamesrvController(args)
  2. 启动 NameServer start(controller);

下面分别 根据这两个方法 来做入口分析。

2. NamesrvController

在分析如何创建 NamesrvController 对象之前, 先来介绍下该类

上面是 NamesrvController 类的简图, 可以结合此图来看下面的属性分析:

属性:

    //  日志
	private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
	
	// NameSrv的配置信息类   
    private final NamesrvConfig namesrvConfig;
    
	// NameSrv底层的核心服务器 - Netty配置类
    private final NettyServerConfig nettyServerConfig;


    // 调度线程池, 执行定时任务, 两件事 : 1.检查存活的broker状态 2.打印配置
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));

    // 管理 NameServer的 kv配置
    private final KVConfigManager kvConfigManager;

    // 管理 路由信息的对象   ***重要***
    private final RouteInfoManager routeInfoManager;

    // Netty网络层封装对象  ***重要***
    private RemotingServer remotingServer;

    // ChannelEventListener接口类   用于监听channel 状态,
    // 当channel状态发生改变时 例如: close idle ... 等状态时 ,该service会监听并处理
    private BrokerHousekeepingService brokerHousekeepingService;

    // 业务线程池,netty线程主要任务 是 解析报文, 将报文解析成 RemotingCommand对象, 然后就将该对象交给业务线程池继续处理
    private ExecutorService remotingExecutor;
	
	// 总配置 (其中包含 NameSrv配置 和  NettyServer配置)
    private Configuration configuration;
    private FileWatchService fileWatchService;

构造方法:

    // 参数1: namesrvConfig
    // 参数2: 网络层配置 nettyServerConfig
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;

        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();

        this.brokerHousekeepingService = new BrokerHousekeepingService(this);

        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

上述代码不用多说, 就是简单的 创建对象赋值操作。 由于本篇重点分析的是 NameServer的启动流程,因此我们重点关注 nameSrvConfig , 至于其他的对象会在之后的文章中介绍。

3.NamesrvConfig

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    //  从环境变量中 获取 ROCKETMQ_HOME 值
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));

    // kvConfig.json  路径  kv的存储文件路径
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

    // namesrv.properties 配置文件路径
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;

    // 顺序消息功能 是否开启  默认关闭
    private boolean orderMessageEnable = false;

 	// ....  省略 get/set 方法   
}

实际上该类也没什么好说的, 核心就是保存一些 NameServer的配置文件路径 和一些关键值。

4. 创建控制器

介绍完了 NameSrvController ,重回到入口的 createNamesrvController(args),看看是如何 创建 NameSrvController 对象的。

    /**
     * 创建 namesrv 控制器
     * @param args  启动参数
     */
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        Options options = ServerUtil.buildCommandlineOptions(new Options());

        // 启动时的参数信息, 由commandLine 管理
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }


        // nameSrvConfig  nameSrv配置
        final NamesrvConfig namesrvConfig = new NamesrvConfig();

        // netty 服务器配置
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();


        // namesrv 服务器 监听端口 修改为 9876
        nettyServerConfig.setListenPort(9876);


        if (commandLine.hasOption('c')) {
            //  读取 -c 选项值
            String file = commandLine.getOptionValue('c');
            if (file != null) {

                // 读取 config 文件数据 到 properties 内
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);

                // 如果 config 配置文件 内的配置 涉及到 namesrvConfig 或者 nettyServerConfig 字段进行复写
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                // 将读取的 配置文件 路径 保存到 字段
                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        // 将启动时 命令行 设置的 kv 复写到 namesrvConfig 内
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // 创建日志对象。
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);


        // 创建 控制器
        // 参数1: namesrvConfig
        // 参数2: 网络层配置 nettyServerConfig
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

上述代码虽然很长, 实际上就是 读取系统环境变量 和 启动参数 来设置 NameSrvConfigNettyServeConfig 配置类, 并通过这俩类构造出 NamesrvController对象。

5. 启动控制器

下面再来看下 是如何启动 NamesrvController 的。

    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        // 初始化方法...
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // 注册 JVM级别的 Hook, 平滑关机的逻辑。 当JVM被关闭时,主动调用 controller.shutdown()方法平滑关机
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        // 服务器网络层 启动
        controller.start();

        return controller;
    }

上述代码主要 做了 三件事:

  1. 初始化
  2. 注册 JVM Hook 回调平滑关机。
  3. 启动

重点需要关注 第 1点 初始化 , 而第3点的逻辑实际上就是 NettyServer的启动入口了。

6. 初始化控制器

    /**
     *  初始化 NameSrv 控制器
     * @return
     */
    public boolean initialize() {

        // 加载本地 kv 配置
        this.kvConfigManager.load();

        // 创建 网络服务器 对象
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // 创建业务线程池  默认线程数 8
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        // 注册协议处理器 (缺省协议处理器)
        this.registerProcessor();

        // 定时任务1: 每10s中检查 broker 存活状态, 将idle状态的broker移除
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);


        // 定时任务2: 每10min中, 打印一遍 kv配置
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        
        // ... 省略 SSL证书的操作 (不在重点关注范围内)

        return true;
    }

上述代码主要做了如下几件事:

  1. 加载NameServer配置信息到 KV配置管理中心
  2. 创建NettyServer 对象
  3. 创建 Netty的 业务线程池 remotingExecutor
  4. 缺省协议处理器 DefaultRequestProcessor 注册到NettyServer对象中
  5. 分别开启 两个 定时任务:
  6. 检查 broker 存活状态
  7. 打印 kv配置信息

标签:namesrvConfig,private,controller,源码,nettyServerConfig,NamesrvController,new,Name
来源: https://www.cnblogs.com/s686zhou/p/15911797.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有