ZooKeeper Source Code Study Notes

2021-05-16 17:03:46


Key points:

1. Using TimerTask as a timer

static class PurgeTask extends TimerTask {

    private File logsDir;          // transaction log directory
    private File snapsDir;         // snapshot directory
    private int snapRetainCount;   // number of snapshots to retain

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }

}
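
The snippet above only defines the task. As a minimal sketch of how such a task could be scheduled with java.util.Timer, the example below runs a stand-in task at a fixed rate. CountingTask, runTimes and the 50 ms period are hypothetical names and values chosen for illustration, not ZooKeeper code. It also suggests why run() catches Exception: an exception that escapes a TimerTask terminates the Timer thread, so later runs would never happen.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class PurgeTimerSketch {

    /** Hypothetical stand-in for PurgeTask: counts runs instead of deleting files. */
    static final class CountingTask extends TimerTask {
        private final CountDownLatch remainingRuns;

        CountingTask(CountDownLatch remainingRuns) {
            this.remainingRuns = remainingRuns;
        }

        @Override
        public void run() {
            try {
                remainingRuns.countDown();
            } catch (Exception e) {
                // Swallow and report, so one failed run does not kill the Timer thread.
                System.err.println("Error occurred while purging: " + e);
            }
        }
    }

    /** Schedules the task at a fixed rate; true if it ran `runs` times within 5 seconds. */
    static boolean runTimes(int runs, long periodMillis) {
        CountDownLatch remainingRuns = new CountDownLatch(runs);
        Timer timer = new Timer("PurgeTask", true); // true = daemon thread
        try {
            // First run immediately (delay 0), then once every periodMillis.
            timer.scheduleAtFixedRate(new CountingTask(remainingRuns), 0, periodMillis);
            return remainingRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel(); // stop the timer thread once the demo is done
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran 3 times: " + runTimes(3, 50));
    }
}
```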

 

2. Reading the config file

Uses the java.util.Properties class.
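
A minimal sketch of reading key=value settings with Properties. The keys tickTime, dataDir and clientPort are common zoo.cfg settings; the values, the ConfigReadSketch class and its helper methods are made up for illustration. The text is parsed from a string so the example runs without a file on disk.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ConfigReadSketch {

    /** Parses key=value text; for a real file, pass a FileReader to Properties.load instead. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try (StringReader in = new StringReader(text)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Reads an integer setting, falling back to a default when the key is absent. */
    static int intValue(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = parse("tickTime=2000\ndataDir=/tmp/zookeeper\nclientPort=2181\n");
        System.out.println("tickTime   = " + intValue(props, "tickTime", 3000));
        System.out.println("dataDir    = " + props.getProperty("dataDir"));
        System.out.println("clientPort = " + intValue(props, "clientPort", -1));
    }
}
```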

3. ZooKeeper cluster startup: QuorumPeerMain.java

Cluster (quorum) mode startup: runFromConfig(config);

Standalone mode startup: ZooKeeperServerMain.main(args);

 

4. ServerCnxnFactory

https://blog.csdn.net/jpf254/article/details/80787018

4.1. Creation

cnxnFactory = ServerCnxnFactory.createFactory();

cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
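
The notes only show the createFactory() call. As a hedged sketch of how a pluggable factory of this kind can work, the example below picks an implementation class by name from a system property and instantiates it reflectively, defaulting to an NIO-style implementation. That the real createFactory() works this way is an assumption here. The property name sketch.serverCnxnFactory and every class name below are hypothetical stand-ins.

```java
final class CnxnFactorySketch {

    /** Hypothetical property name; the real constant would live in ServerCnxnFactory. */
    static final String FACTORY_PROPERTY = "sketch.serverCnxnFactory";

    /** Minimal stand-in for the abstract ServerCnxnFactory. */
    abstract static class ConnectionFactory {
        abstract String transport();
    }

    static class NioConnectionFactory extends ConnectionFactory {
        @Override
        String transport() {
            return "nio";
        }
    }

    static class NettyConnectionFactory extends ConnectionFactory {
        @Override
        String transport() {
            return "netty";
        }
    }

    /** Reads the class name from the system property, then delegates. */
    static ConnectionFactory createFactory() {
        return createFactory(System.getProperty(FACTORY_PROPERTY));
    }

    /** Instantiates the named class via its no-arg constructor; null selects the NIO default. */
    static ConnectionFactory createFactory(String className) {
        String name = (className != null) ? className : NioConnectionFactory.class.getName();
        try {
            return (ConnectionFactory) Class.forName(name).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Couldn't instantiate " + name, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(createFactory().transport());
        System.out.println(createFactory(NettyConnectionFactory.class.getName()).transport());
    }
}
```

Selecting the class by name keeps callers independent of the transport: the calling code stays the same whichever implementation is configured.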

4.2. Startup

startServerCnxnFactory();

 

5. ZooKeeperThread.java

public ZooKeeperThread(String threadName) {
        super(threadName);
        setUncaughtExceptionHandler(uncaughtExceptionalHandler);
}
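
The constructor above installs a per-thread uncaught exception handler, so an exception that kills the thread is reported instead of vanishing. A minimal sketch of the same pattern follows. LoggingThread and its recording handler are hypothetical: the handler stores the exception so the demo can inspect it, where a real handler would presumably log it.

```java
import java.util.concurrent.atomic.AtomicReference;

final class LoggingThread extends Thread {

    /** Last exception that escaped run(); null if the thread ended normally. */
    private final AtomicReference<Throwable> lastUncaught = new AtomicReference<>();

    LoggingThread(String threadName, Runnable body) {
        super(body, threadName);
        // Invoked on the dying thread when an exception escapes run().
        setUncaughtExceptionHandler((thread, e) -> {
            lastUncaught.set(e);
            System.err.println("Thread " + thread.getName() + " died: " + e);
        });
    }

    Throwable lastUncaught() {
        return lastUncaught.get();
    }

    /** Runs a body that always fails and returns what the handler captured. */
    static Throwable demo() {
        LoggingThread t = new LoggingThread("demo-thread", () -> {
            throw new IllegalStateException("boom");
        });
        t.start();
        try {
            t.join(); // the handler has finished by the time join() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return t.lastUncaught();
    }

    public static void main(String[] args) {
        System.out.println("captured: " + demo());
    }
}
```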

6. QuorumPeer

This class manages the quorum protocol.

 

 

 

7. startLeaderElection()

// Builds this peer's vote and determines the election algorithm

The actual election takes place in lookForLeader() in FastLeaderElection.java.

Vote comparison rules:

/*
 * We return true if one of the following three cases hold:
 * 1- New epoch is higher
 * 2- New epoch is the same as current epoch, but new zxid is higher
 * 3- New epoch is the same as current epoch, new zxid is the same as current zxid, but server id is higher.
 */
/**
     * Check if a pair (server id, zxid) succeeds our current vote.
     * Compares two votes to decide which one ranks higher.
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug(
                "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
                newId,
                curId,
                Long.toHexString(newZxid),
                Long.toHexString(curZxid));

        if (self.getQuorumVerifier().getWeight(newId) == 0) {
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same as current zxid, but server id is higher.
         */

        return ((newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                || ((newZxid == curZxid)
                && (newId > curId)))));
    }
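
To make the three rules concrete, here is a self-contained sketch of the same comparison with a few worked cases. It deliberately leaves out the logging and the quorum-verifier weight check, which need a running peer. VoteOrderSketch and the sample ids, zxids and epochs are made up for illustration.

```java
final class VoteOrderSketch {

    /** Same ordering as above: compare epoch first, then zxid, then server id. */
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                || ((newZxid == curZxid) && (newId > curId))));
    }

    public static void main(String[] args) {
        // Rule 1: a higher epoch wins even with a lower zxid and a lower id.
        System.out.println(totalOrderPredicate(1, 0x5, 2, 3, 0x9, 1));  // true
        // Rule 2: same epoch, higher zxid wins.
        System.out.println(totalOrderPredicate(1, 0x10, 1, 3, 0x9, 1)); // true
        // Rule 3: same epoch and zxid, higher server id wins.
        System.out.println(totalOrderPredicate(3, 0x9, 1, 1, 0x9, 1));  // true
        // An identical vote does not replace the current one.
        System.out.println(totalOrderPredicate(1, 0x9, 1, 1, 0x9, 1));  // false
    }
}
```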

 

 

The main loop inside QuorumPeer's public void run() {}:

        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                    case LOOKING:
                        LOG.info("LOOKING");
                        ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                        if (Boolean.getBoolean("readonlymode.enabled")) {
                            LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                            // Create read-only server but don't start it immediately
                            final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                            // Instead of starting roZk immediately, wait some grace
                            // period before we decide we're partitioned.
                            //
                            // Thread is used here because otherwise it would require
                            // changes in each of election strategy classes which is
                            // unnecessary code coupling.
                            Thread roZkMgr = new Thread() {
                                public void run() {
                                    try {
                                        // lower-bound grace period to 2 secs
                                        sleep(Math.max(2000, tickTime));
                                        if (ServerState.LOOKING.equals(getPeerState())) {
                                            roZk.startup();
                                        }
                                    } catch (InterruptedException e) {
                                        LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                    } catch (Exception e) {
                                        LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                    }
                                }
                            };
                            try {
                                roZkMgr.start();
                                reconfigFlagClear();
                                if (shuttingDownLE) {
                                    shuttingDownLE = false;
                                    startLeaderElection();
                                }
                                setCurrentVote(makeLEStrategy().lookForLeader());
                            } catch (Exception e) {
                                LOG.warn("Unexpected exception", e);
                                setPeerState(ServerState.LOOKING);
                            } finally {
                                // If the thread is in the grace period, interrupt
                                // to come out of waiting.
                                roZkMgr.interrupt();
                                roZk.shutdown();
                            }
                        } else {
                            try {
                                reconfigFlagClear();
                                if (shuttingDownLE) {
                                    shuttingDownLE = false;
                                    startLeaderElection();
                                }
                                setCurrentVote(makeLEStrategy().lookForLeader());
                            } catch (Exception e) {
                                LOG.warn("Unexpected exception", e);
                                setPeerState(ServerState.LOOKING);
                            }
                        }
                        break;
                    case OBSERVING:
                        try {
                            LOG.info("OBSERVING");
                            setObserver(makeObserver(logFactory));
                            observer.observeLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                        } finally {
                            observer.shutdown();
                            setObserver(null);
                            updateServerState();

                            // Add delay jitter before we switch to LOOKING
                            // state to reduce the load of ObserverMaster
                            if (isRunning()) {
                                Observer.waitForObserverElectionDelay();
                            }
                        }
                        break;
                    case FOLLOWING:
                        try {
                            LOG.info("FOLLOWING");
                            setFollower(makeFollower(logFactory));
                            follower.followLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                        } finally {
                            follower.shutdown();
                            setFollower(null);
                            updateServerState();
                        }
                        break;
                    case LEADING:
                        LOG.info("LEADING");
                        try {
                            setLeader(makeLeader(logFactory));
                            leader.lead();
                            setLeader(null);
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                        } finally {
                            if (leader != null) {
                                leader.shutdown("Forcing shutdown");
                                setLeader(null);
                            }
                            updateServerState();
                        }
                        break;
                }
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            MBeanRegistry instance = MBeanRegistry.getInstance();
            instance.unregister(jmxQuorumBean);
            instance.unregister(jmxLocalPeerBean);

            for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
                instance.unregister(remotePeerBean);
            }

            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
            jmxRemotePeerBean = null;
        }
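
Stripped of details, the loop above is a small state machine: in LOOKING the peer runs an election, and in each of the other states it blocks in a role method until that role ends, then cleans up in finally. The sketch below shows only that shape. The scripted list of election outcomes is a hypothetical stand-in for lookForLeader(), and always falling back to LOOKING after a role ends is a simplification of updateServerState().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class PeerLoopSketch {

    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    /**
     * Runs the loop against a scripted list of election outcomes and
     * returns the sequence of states visited.
     */
    static List<ServerState> run(List<ServerState> electionOutcomes) {
        List<ServerState> visited = new ArrayList<>();
        Iterator<ServerState> outcomes = electionOutcomes.iterator();
        ServerState state = ServerState.LOOKING;
        boolean running = true;

        while (running) {
            visited.add(state);
            switch (state) {
                case LOOKING:
                    if (outcomes.hasNext()) {
                        state = outcomes.next(); // the election decides the next role
                    } else {
                        running = false;         // script exhausted: end the sketch
                    }
                    break;
                case FOLLOWING:
                case LEADING:
                case OBSERVING:
                    try {
                        // The real code blocks here (followLeader(), lead(),
                        // observeLeader()) until the role ends or throws.
                    } finally {
                        // Simplification: always fall back to a new election.
                        state = ServerState.LOOKING;
                    }
                    break;
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<ServerState> script = new ArrayList<>();
        script.add(ServerState.FOLLOWING);
        script.add(ServerState.LEADING);
        // prints [LOOKING, FOLLOWING, LOOKING, LEADING, LOOKING]
        System.out.println(run(script));
    }
}
```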

 

 

8. leader.lead();

 

 

 

Source: https://blog.csdn.net/B_e_a_u_tiful1205/article/details/115765447
