ZooKeeper Source Code, Chapter 2, Section 2.5: Follower and Leader State Synchronization

2022-06-05 00:02:10  Source: Internet


2.5 Follower and Leader State Synchronization Source Code

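After the election finishes, every node updates its own state according to its role: the elected node switches its state to Leader, and all other nodes switch to Follower.

Leader entry point for the state update: leader.lead()

Follower entry point for the state update: follower.followLeader()

Notes:

(1) The follower must let the leader know its state: epoch, zxid, and sid. To do so:

the follower must find out which node is the leader;

it opens a connection to the leader;

it sends its own information to the leader;

the leader, on receiving that information, must return the corresponding information to the follower.

(2) Once the leader knows the follower's state, it decides which kind of data synchronization is needed: DIFF, TRUNC, or SNAP.

(3) The data synchronization is performed.

(4) Once the leader has received ACKs from more than half of the followers, it enters its normal working state and cluster startup is complete.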
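
The "more than half" condition in step (4) can be sketched as a simple majority check. This is a minimal illustration, not ZooKeeper's own quorum verifier: `QuorumSketch` and `hasQuorum` are hypothetical names, and the sketch assumes the leader's own ACK is counted together with the followers' ACKs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of a simple-majority check; not a ZooKeeper class.
final class QuorumSketch {
    // True when strictly more than half of the voting members have acked.
    static boolean hasQuorum(Set<Long> ackedSids, int votingMembers) {
        return ackedSids.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        Set<Long> acks = new HashSet<>();
        acks.add(1L); // the leader's own ack (assumed to count in this sketch)
        System.out.println("1 of 3 acked: " + hasQuorum(acks, 3));
        acks.add(2L); // first follower ack
        System.out.println("2 of 3 acked: " + hasQuorum(acks, 3));
    }
}
```

With three voting members, one follower ACK plus the leader's own is enough under these assumptions, which is why a three-node ensemble can finish startup while one node is still down.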

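Summary of the synchronization modes:

(1) DIFF: the follower and the leader hold the same data, so nothing needs to be replayed.

(2) TRUNC: the follower's zxid is larger than the leader's, so the follower must roll back.

(3) COMMIT: the leader's zxid is larger than the follower's, so the leader sends the missing Proposals to the follower, which commits and applies them.

(4) SNAP: if the follower has no data at all, the data is synchronized by snapshot (all data is serialized directly to the follower).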
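
The cases above can be sketched as one decision function. This is a simplified illustration under stated assumptions: the leader keeps a window of committed proposals bounded by `minCommittedLog` and `maxCommittedLog` (the names used in `syncFollower`, quoted later in this section), and `SyncModeSketch`, `Mode`, and `choose` are hypothetical names. As far as I know, the real `LearnerHandler.syncFollower` announces the "COMMIT" case with a DIFF packet followed by the missing proposals and their commits, so the sketch folds cases (1) and (3) into `DIFF`; the real method also handles further cases that are left out here.

```java
// Hypothetical, simplified decision table; the real syncFollower handles more cases.
final class SyncModeSketch {
    enum Mode { DIFF, TRUNC, SNAP }

    static Mode choose(long peerLastZxid, long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid > maxCommittedLog) {
            return Mode.TRUNC; // follower is ahead: it must roll back
        }
        if (peerLastZxid >= minCommittedLog) {
            return Mode.DIFF;  // follower is inside the window: send only what is missing (possibly nothing)
        }
        return Mode.SNAP;      // follower is empty or too far behind: send a full snapshot
    }

    public static void main(String[] args) {
        long min = 5, max = 8; // assumed committed-log window of the leader
        System.out.println("peer=10 -> " + choose(10, min, max));
        System.out.println("peer=8  -> " + choose(8, min, max));
        System.out.println("peer=6  -> " + choose(6, min, max));
        System.out.println("peer=0  -> " + choose(0, min, max));
    }
}
```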


2.5.1 Leader.lead(): waiting for followers' state-synchronization requests

1) Find the lead() method in Leader.java

void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
            QuorumPeer.FLE_TIME_UNIT);
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.tick.set(0);
        // Restore the data into memory; it was in fact already loaded at startup
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from
        // new followers.
        // Wait for the other follower nodes to send state-sync requests to the leader
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

        … …
    } finally {
        zk.unregisterJMX(this);
    }
}

class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
    private volatile boolean stop = false;

    public LearnerCnxAcceptor() {
        super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
                .getZooKeeperServerListener());
    }

    @Override
    public void run() {
        try {
            while (!stop) {
                Socket s = null;
                boolean error = false;
                try {
                    // Block until a follower's state-sync request arrives
                    s = ss.accept();

                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                    // As soon as a follower's request arrives, create a LearnerHandler to process it
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    // Start the handler thread
                    fh.start();
                } catch (SocketException e) {
                    ... ...
                } 
				... ...
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e.getMessage());
            handleException(this.getName(), e);
        }
    }

    public void halt() {
        stop = true;
    }
}

Here ss is the server socket that is created when the Leader object is constructed:

private final ServerSocket ss;

Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
    this.self = self;
    this.proposalStats = new BufferStats();
    try {
        if (self.shouldUsePortUnification() || self.isSslQuorum()) {
            boolean allowInsecureConnection = self.shouldUsePortUnification();
            if (self.getQuorumListenOnAllIPs()) {
                ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort());
            } else {
                ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
            }
        } else {
            if (self.getQuorumListenOnAllIPs()) {
                ss = new ServerSocket(self.getQuorumAddress().getPort());
            } else {
                ss = new ServerSocket();
            }
        }
        ss.setReuseAddress(true);
        if (!self.getQuorumListenOnAllIPs()) {
            ss.bind(self.getQuorumAddress());
        }
    } catch (BindException e) {
        ... ...
    }
    this.zk = zk;
    this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
            maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}
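
The non-SSL branch of this constructor that binds to one specific address follows a common pattern: create an unbound `ServerSocket`, set `SO_REUSEADDR`, and only then bind. A minimal sketch of that pattern using only `java.net`; `BindSketch` and `bindTo` are hypothetical names, and the loopback address with port 0 is an assumption standing in for the real quorum address.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical, stripped-down version of the "bind to one address" branch.
final class BindSketch {
    // Creates an unbound socket, enables SO_REUSEADDR, then binds to the given address.
    static ServerSocket bindTo(InetSocketAddress addr) throws IOException {
        ServerSocket ss = new ServerSocket(); // unbound at this point
        ss.setReuseAddress(true);             // set before bind so it applies to this bind
        ss.bind(addr);
        return ss;
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free port; a real quorum address names a fixed port.
        InetSocketAddress loopback =
                new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
        try (ServerSocket ss = bindTo(loopback)) {
            System.out.println("bound=" + ss.isBound() + " port=" + ss.getLocalPort());
        }
    }
}
```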

2.5.2 Follower.followLeader(): finding and connecting to the Leader

1) Find the followLeader() method in Follower.java

void followLeader() throws InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
            QuorumPeer.FLE_TIME_UNIT);
    self.start_fle = 0;
    self.end_fle = 0;
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    try {
        // Find the leader
        QuorumServer leaderServer = findLeader();
        try {
            // Connect to the leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);

            // Register with the leader
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            if (self.isReconfigStateChange())
               throw new Exception("learned about role change");
            //check to see if the leader zxid is lower than ours
            //this should never happen but is just a safety check
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                        + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            syncWithLeader(newEpochZxid);                
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            try {
                sock.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        zk.unregisterJMX((Learner)this);
    }
}

/**
 * Returns the address of the node we think is the leader.
 */
protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
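    // The sid of the leader that was finally recommended, recorded during the election vote
    Vote current = self.getCurrentVote();

    // Check whether that sid belongs to one of the servers in the current view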
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            // Re-resolve the leader's address so that we connect to its correct IP
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = "
                + current.getId());
    }
    return leaderServer;
}
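
The lookup in findLeader() can be sketched in isolation. In this illustration `Server` is a hypothetical stand-in for `QuorumServer`, and returning an `Optional` instead of `null` is a choice of the sketch, not of the original code (the quoted followLeader() dereferences the result directly).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class FindLeaderSketch {
    // Hypothetical stand-in for QuorumServer: only a sid and an address string.
    static final class Server {
        final long id;
        final String addr;
        Server(long id, String addr) { this.id = id; this.addr = addr; }
    }

    // Returns the server whose sid matches the sid recorded in the current vote.
    static Optional<Server> findLeader(Map<Long, Server> view, long votedLeaderSid) {
        for (Server s : view.values()) {
            if (s.id == votedLeaderSid) {
                return Optional.of(s);
            }
        }
        return Optional.empty(); // the voted sid is not part of the view
    }

    public static void main(String[] args) {
        Map<Long, Server> view = new LinkedHashMap<>();
        view.put(1L, new Server(1L, "host1:2888")); // example addresses, assumed
        view.put(2L, new Server(2L, "host2:2888"));
        System.out.println(findLeader(view, 2L).map(s -> s.addr).orElse("not found"));
        System.out.println(findLeader(view, 9L).map(s -> s.addr).orElse("not found"));
    }
}
```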


protected void connectToLeader(InetSocketAddress addr, String hostname)
        throws IOException, InterruptedException, X509Exception {
    this.sock = createSocket();

    int initLimitTime = self.tickTime * self.initLimit;
    int remainingInitLimitTime = initLimitTime;
    long startNanoTime = nanoTime();

    for (int tries = 0; tries < 5; tries++) {
        try {
            // recalculate the init limit time because retries sleep for 1000 milliseconds
            remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
            if (remainingInitLimitTime <= 0) {
                LOG.error("initLimit exceeded on retries.");
                throw new IOException("initLimit exceeded on retries.");
            }

            sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
            if (self.isSslQuorum())  {
                ((SSLSocket) sock).startHandshake();
            }
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            ... ...
        }
        Thread.sleep(1000);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
            sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}
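
The timeout arithmetic in connectToLeader() is easy to miss: every attempt is capped by tickTime * syncLimit, and all attempts together must fit into tickTime * initLimit. A minimal sketch of just that calculation; `ConnectBudgetSketch` and `nextConnectTimeoutMs` are hypothetical names, and returning -1 in place of throwing an IOException is a simplification of this sketch.

```java
// Hypothetical helper that isolates the timeout arithmetic of connectToLeader().
final class ConnectBudgetSketch {
    // Returns the timeout for the next connect attempt in milliseconds,
    // or -1 when the overall initLimit budget has been used up.
    static int nextConnectTimeoutMs(int tickTime, int initLimit, int syncLimit, long elapsedMs) {
        long remaining = (long) tickTime * initLimit - elapsedMs;
        if (remaining <= 0) {
            return -1; // the original throws IOException("initLimit exceeded on retries.")
        }
        return (int) Math.min((long) tickTime * syncLimit, remaining);
    }

    public static void main(String[] args) {
        int tickTime = 2000, initLimit = 10, syncLimit = 5; // example settings, assumed
        System.out.println(nextConnectTimeoutMs(tickTime, initLimit, syncLimit, 0));
        System.out.println(nextConnectTimeoutMs(tickTime, initLimit, syncLimit, 15_000));
        System.out.println(nextConnectTimeoutMs(tickTime, initLimit, syncLimit, 20_000));
    }
}
```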

2.5.3 Leader.lead(): creating the LearnerHandler

void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
            QuorumPeer.FLE_TIME_UNIT);
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.tick.set(0);
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from
        // new followers.
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        ......
    } finally {
        zk.unregisterJMX(this);
    }
}

class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
    private volatile boolean stop = false;
    ... ...

    @Override
    public void run() {
        try {
            while (!stop) {
                Socket s = null;
                boolean error = false;
                try {
                    s = ss.accept();

                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    ... ...
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e.getMessage());
            handleException(this.getName(), e);
        }
    }

    public void halt() {
        stop = true;
    }
}

LearnerHandler is declared as public class LearnerHandler extends ZooKeeperThread{}, so it is a thread, and fh.start() ends up executing the run() method of LearnerHandler:

public void run() {
    try {
        leader.addLearnerHandler(this);
        // Heartbeat handling: the tick by which the next ACK is due
        tickOfNextAckDeadline = leader.self.tick.get()
                + leader.self.initLimit + leader.self.syncLimit;

        ia = BinaryInputArchive.getArchive(bufferedInput);
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);

        // Read a message from the network and deserialize it into a packet
        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        // After the election, observers and followers must each send the leader an identifying message: FOLLOWERINFO or OBSERVERINFO
        if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
            LOG.error("First packet " + qp.toString()
                    + " is not FOLLOWERINFO or OBSERVERINFO!");
            return;
        }

        byte learnerInfoData[] = qp.getData();
        if (learnerInfoData != null) {
            ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
            if (learnerInfoData.length >= 8) {
                this.sid = bbsid.getLong();
            }
            if (learnerInfoData.length >= 12) {
                this.version = bbsid.getInt(); // protocolVersion
            }
            if (learnerInfoData.length >= 20) {
                long configVersion = bbsid.getLong();
                if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
                    throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                }
            }
        } else {
            this.sid = leader.followerCounter.getAndDecrement();
        }

        if (leader.self.getView().containsKey(this.sid)) {
            LOG.info("Follower sid: " + this.sid + " : info : "
                    + leader.self.getView().get(this.sid).toString());
        } else {
            LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
        }
                    
        if (qp.getType() == Leader.OBSERVERINFO) {
              learnerType = LearnerType.OBSERVER;
        }
        // Read the lastAcceptedEpoch sent by the follower
        // The epoch used during the election is still the previous leader's epoch
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

        long peerLastZxid;
        StateSummary ss = null;

        // Read the zxid sent by the follower
        long zxid = qp.getZxid();

        // From the follower's sid and old epoch, the leader builds the new epoch
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
        long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            // The leader sends LEADERINFO to the follower (containing the zxid and newEpoch)
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
            oa.writeRecord(newEpochPacket, "packet");
			
            bufferedOutput.flush();
            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error(ackEpochPacket.toString()
                        + " is not ACKEPOCH");
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            leader.waitForEpochAck(this.getSid(), ss);
        }
        peerLastZxid = ss.getLastZxid();
       
        // Take any necessary action if we need to send TRUNC or DIFF
        // startForwarding() will be called in all cases
        boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
        
        /* if we are not truncating or sending a diff just send a snapshot */
        if (needSnap) {
            boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
            LearnerSnapshot snapshot = 
                    leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
            try {
                long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                bufferedOutput.flush();

                LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                        + "send zxid of db as 0x{}, {} concurrent snapshots, " 
                        + "snapshot was {} from throttle",
                        Long.toHexString(peerLastZxid), 
                        Long.toHexString(leaderLastZxid),
                        Long.toHexString(zxidToSend), 
                        snapshot.getConcurrentSnapshotNumber(),
                        snapshot.isEssential() ? "exempt" : "not exempt");
                // Dump data to peer
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
                bufferedOutput.flush();
            } finally {
                snapshot.close();
            }
        }

        LOG.debug("Sending NEWLEADER message to " + sid);
        // the version of this quorumVerifier will be set by leader.lead() in case
        // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
        // we got here, so the version was set
        if (getVersion() < 0x10000) {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    newLeaderZxid, null, null);
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                            .toString().getBytes(), null);
            queuedPackets.add(newLeaderQP);
        }
        bufferedOutput.flush();

        // Start thread that blast packets in the queue to learner
        startSendingPackets();
        
        /*
         * Have to wait for the first ACK, wait until
         * the leader is ready, and only then we can
         * start processing messages.
         */
        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        if(qp.getType() != Leader.ACK){
            LOG.error("Next packet was supposed to be an ACK,"
                + " but received packet: {}", packetToString(qp));
            return;
        }

        if(LOG.isDebugEnabled()){
        	LOG.debug("Received NEWLEADER-ACK message from " + sid);   
        }
        leader.waitForNewLeaderAck(getSid(), qp.getZxid());

        syncLimitCheck.start();
        
        // now that the ack has been processed expect the syncLimit
        sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

        /*
         * Wait until leader starts up
         */
        synchronized(leader.zk){
            while(!leader.zk.isRunning() && !this.isInterrupted()){
                leader.zk.wait(20);
            }
        }
        // Mutation packets will be queued during the serialize,
        // so we need to mark when the peer can actually start
        // using the data
        //
        LOG.debug("Sending UPTODATE message to " + sid);      
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
            if (qp.getType() == Leader.PING) {
                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
            }
            tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;


            ByteBuffer bb;
            long sessionId;
            int cxid;
            int type;

            switch (qp.getType()) {
            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received ACK from Observer  " + this.sid);
                    }
                }
                syncLimitCheck.updateAck(qp.getZxid());
                leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
            case Leader.PING:
                // Process the touches
                ByteArrayInputStream bis = new ByteArrayInputStream(qp
                        .getData());
                DataInputStream dis = new DataInputStream(bis);
                while (dis.available() > 0) {
                    long sess = dis.readLong();
                    int to = dis.readInt();
                    leader.zk.touch(sess, to);
                }
                break;
            case Leader.REVALIDATE:
                bis = new ByteArrayInputStream(qp.getData());
                dis = new DataInputStream(bis);
                long id = dis.readLong();
                int to = dis.readInt();
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                dos.writeLong(id);
                boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
                if (valid) {
                    try {
                        //set the session owner
                        // as the follower that
                        // owns the session
                        leader.zk.setOwner(id, this);
                    } catch (SessionExpiredException e) {
                        LOG.error("Somehow session " + Long.toHexString(id) +
                                " expired right after being renewed! (impossible)", e);
                    }
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                                             ZooTrace.SESSION_TRACE_MASK,
                                             "Session 0x" + Long.toHexString(id)
                                             + " is valid: "+ valid);
                }
                dos.writeBoolean(valid);
                qp.setData(bos.toByteArray());
                queuedPackets.add(qp);
                break;
            case Leader.REQUEST:
                bb = ByteBuffer.wrap(qp.getData());
                sessionId = bb.getLong();
                cxid = bb.getInt();
                type = bb.getInt();
                bb = bb.slice();
                Request si;
                if(type == OpCode.sync){
                    si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                } else {
                    si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                }
                si.setOwner(this);
                leader.zk.submitLearnerRequest(si);
                break;
            default:
                LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                break;
            }
        }
    } catch (IOException e) {
        ... ...
    } finally {
        ... ...
    }
}
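
The way run() decodes the first packet's payload by length (8 bytes of sid, then 4 bytes of protocol version, then 8 bytes of config version) can be tried out on its own. A minimal sketch using only `java.nio.ByteBuffer`; `LearnerInfoSketch` is a hypothetical holder class, and the default values for absent fields are assumptions of the sketch.

```java
import java.nio.ByteBuffer;

// Hypothetical holder for the three fields read from the FOLLOWERINFO/OBSERVERINFO payload.
final class LearnerInfoSketch {
    long sid = -1;           // -1 means "not present" in this sketch only
    int protocolVersion = 0;
    long configVersion = -1;

    // Reads as many fields as the payload length allows, in the same order as run().
    static LearnerInfoSketch parse(byte[] data) {
        LearnerInfoSketch info = new LearnerInfoSketch();
        ByteBuffer bb = ByteBuffer.wrap(data); // big-endian by default
        if (data.length >= 8) {
            info.sid = bb.getLong();
        }
        if (data.length >= 12) {
            info.protocolVersion = bb.getInt();
        }
        if (data.length >= 20) {
            info.configVersion = bb.getLong();
        }
        return info;
    }

    public static void main(String[] args) {
        // Build a 20-byte payload: sid=3, protocol version 0x10000, config version 7.
        byte[] payload = ByteBuffer.allocate(20).putLong(3L).putInt(0x10000).putLong(7L).array();
        LearnerInfoSketch info = parse(payload);
        System.out.println("sid=" + info.sid
                + " version=0x" + Integer.toHexString(info.protocolVersion)
                + " configVersion=" + info.configVersion);
    }
}
```

Checking the length before each read is what lets one handler accept older learners that send only a sid as well as newer ones that send all three fields.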

2.5.4 Follower.followLeader(): registering via registerWithLeader()

void followLeader() throws InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
            QuorumPeer.FLE_TIME_UNIT);
    self.start_fle = 0;
    self.end_fle = 0;
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    try {
        // Find the leader
        QuorumServer leaderServer = findLeader();

        try {
            // Connect to the leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);

            // Register with the leader
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
			
            if (self.isReconfigStateChange())
               throw new Exception("learned about role change");
            //check to see if the leader zxid is lower than ours
            //this should never happen but is just a safety check
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                        + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            syncWithLeader(newEpochZxid);                
            QuorumPacket qp = new QuorumPacket();
            // Loop, waiting for incoming messages
            while (this.isRunning()) {
                // Read the packet
                readPacket(qp);
                // Process the packet
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            try {
                sock.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        zk.unregisterJMX((Learner)this);
    }
}

protected long registerWithLeader(int pktType) throws IOException{
    /*
     * Send follower info, including last zxid and sid
     */
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();                
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    
    /*
     * Add sid to payload
     */
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    
    // Send FOLLOWERINFO to the leader
    writePacket(qp, true);

    // Read the leader's reply: LEADERINFO
    readPacket(qp);    
    
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    // If LEADERINFO was received
    if (qp.getType() == Leader.LEADERINFO) {
    	// we are connected to a 1.0 server so accept the new epoch and read the next packet
    	leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
    	byte epochBytes[] = new byte[4];
    	final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
		
        // Accept the leader's epoch
        if (newEpoch > self.getAcceptedEpoch()) {
            // Store this node's previous epoch in wrappedEpochBytes
            wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
            // Save the epoch sent by the leader
            self.setAcceptedEpoch(newEpoch);
    	} else if (newEpoch == self.getAcceptedEpoch()) {
    		// since we have already acked an epoch equal to the leaders, we cannot ack
    		// again, but we still need to send our lastZxid to the leader so that we can
    		// sync with it if it does assume leadership of the epoch.
    		// the -1 indicates that this reply should not count as an ack for the new epoch
            wrappedEpochBytes.putInt(-1);
    	} else {
    		throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
    	}
        // Send ACKEPOCH to the leader (containing this node's epoch and zxid)
    	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
    	writePacket(ackNewEpoch, true);
		
        return ZxidUtils.makeZxid(newEpoch, 0);
    } else {
    	if (newEpoch > self.getAcceptedEpoch()) {
    		self.setAcceptedEpoch(newEpoch);
    	}
        if (qp.getType() != Leader.NEWLEADER) {
            LOG.error("First packet should have been NEWLEADER");
            throw new IOException("First packet should have been NEWLEADER");
        }
        return qp.getZxid();
    }
}
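
registerWithLeader() leans on two small ideas: a zxid packs the epoch into its high 32 bits and a counter into its low 32 bits, and the ACKEPOCH payload depends on how the proposed epoch compares with the accepted one. A minimal sketch of both; `ZxidSketch` is a hypothetical class that mirrors what the `ZxidUtils` helpers are understood to do, and it throws `IllegalStateException` where the original throws `IOException`.

```java
// Hypothetical re-implementation for illustration; not ZooKeeper's ZxidUtils.
final class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // The int a follower places in the ACKEPOCH payload:
    // its current epoch when it accepts a newer epoch, -1 when it has already acked this epoch.
    static int ackEpochPayload(long newEpoch, long acceptedEpoch, long currentEpoch) {
        if (newEpoch > acceptedEpoch) {
            return (int) currentEpoch;
        }
        if (newEpoch == acceptedEpoch) {
            return -1; // reply must not count as an ack for the new epoch
        }
        throw new IllegalStateException("leader epoch " + newEpoch
                + " is less than accepted epoch " + acceptedEpoch);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 7);
        System.out.println("zxid=0x" + Long.toHexString(zxid)
                + " epoch=" + epochOf(zxid) + " counter=" + counterOf(zxid));
        System.out.println("payload when newer: " + ackEpochPayload(6, 5, 5));
        System.out.println("payload when equal: " + ackEpochPayload(5, 5, 5));
    }
}
```

Under this layout, a zxid whose low 32 bits are all zero marks the start of a new epoch, which is the condition the `isPeerNewEpochZxid` check in syncFollower tests for.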

2.5.5 Leader.lead(): receiving the Follower's state and sending sync messages according to the sync mode

public void run() {
    try {
        leader.addLearnerHandler(this);
        // Heartbeat handling: the tick by which the next ACK is due
        tickOfNextAckDeadline = leader.self.tick.get()
                + leader.self.initLimit + leader.self.syncLimit;

        ia = BinaryInputArchive.getArchive(bufferedInput);
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);

        // Read a message from the network and deserialize it into a packet
        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        // After the election, observers and followers must each send the leader an identifying message: FOLLOWERINFO or OBSERVERINFO
        if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
            LOG.error("First packet " + qp.toString()
                    + " is not FOLLOWERINFO or OBSERVERINFO!");
            return;
        }

        byte learnerInfoData[] = qp.getData();
        if (learnerInfoData != null) {
            ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
            if (learnerInfoData.length >= 8) {
                this.sid = bbsid.getLong();
            }
            if (learnerInfoData.length >= 12) {
                this.version = bbsid.getInt(); // protocolVersion
            }
            if (learnerInfoData.length >= 20) {
                long configVersion = bbsid.getLong();
                if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
                    throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                }
            }
        } else {
            this.sid = leader.followerCounter.getAndDecrement();
        }

        if (leader.self.getView().containsKey(this.sid)) {
            LOG.info("Follower sid: " + this.sid + " : info : "
                    + leader.self.getView().get(this.sid).toString());
        } else {
            LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
        }
                    
        if (qp.getType() == Leader.OBSERVERINFO) {
              learnerType = LearnerType.OBSERVER;
        }
        // Read the lastAcceptedEpoch sent by the follower
        // The epoch used during the election is still the previous leader's epoch
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

        long peerLastZxid;
        StateSummary ss = null;

        // Read the zxid sent by the follower
        long zxid = qp.getZxid();

        // Get the leader's latest epoch
        // A new leader builds a new epoch
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
        long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
			
            // The leader sends LEADERINFO to the follower (containing the zxid and newEpoch)
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
            oa.writeRecord(newEpochPacket, "packet");

            bufferedOutput.flush();
            // Receive the ACKEPOCH reply from the follower
            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
			
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error(ackEpochPacket.toString()
                        + " is not ACKEPOCH");
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
			
            // Holds the state of the peer (follower or observer): its epoch and zxid
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
			
            leader.waitForEpochAck(this.getSid(), ss);
        }
        peerLastZxid = ss.getLastZxid();
       
        // Take any necessary action if we need to send TRUNC or DIFF
        // startForwarding() will be called in all cases
        // syncFollower() decides whether and how the leader and follower need to synchronize
        boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
        
        /* if we are not truncating or sending a diff just send a snapshot */
        if (needSnap) {
            boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
            LearnerSnapshot snapshot = 
                    leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
            try {
                long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                bufferedOutput.flush();

                … …
                // Dump data to peer
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
                bufferedOutput.flush();
            } finally {
                snapshot.close();
            }
        }

        if (getVersion() < 0x10000) {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, null, null);
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                                .toString().getBytes(), null);
            queuedPackets.add(newLeaderQP);
        }
        … …

        while (true) {
            … …
        }
    } catch (IOException e) {
        ... ...
    } finally {
        ... ...
    }
}
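
The handshake above relies on the layout of a zxid: the high 32 bits hold the epoch and the low 32 bits hold a per-epoch counter. The following is a minimal sketch of that layout, not ZooKeeper code. The class ZxidLayoutDemo and its method names are hypothetical; the bit operations are assumed to mirror what ZxidUtils.getEpochFromZxid() and the isPeerNewEpochZxid check do. It also shows how a 4-byte epoch such as the ACKEPOCH payload can be written and read back with ByteBuffer.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: ZxidLayoutDemo and its methods are hypothetical
// names, not ZooKeeper classes.
final class ZxidLayoutDemo {
    // Epoch: the high 32 bits of the zxid.
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // Counter: the low 32 bits of the zxid.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Combine an epoch and a counter into one 64-bit zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    // Same test as isPeerNewEpochZxid in syncFollower(): the counter is 0,
    // so no transaction is associated with this zxid.
    static boolean isNewEpochZxid(long zxid) {
        return (zxid & 0xffffffffL) == 0;
    }

    // The Leader reads the ACKEPOCH payload with ByteBuffer.getInt(),
    // so the epoch travels as a 4-byte big-endian int.
    static byte[] encodeEpoch(int epoch) {
        byte[] data = new byte[4];
        ByteBuffer.wrap(data).putInt(epoch);
        return data;
    }

    static int decodeEpoch(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 3);
        System.out.println("zxid       = 0x" + Long.toHexString(zxid));
        System.out.println("epoch      = " + epochOf(zxid));
        System.out.println("counter    = " + counterOf(zxid));
        System.out.println("isNewEpoch = " + isNewEpochZxid(makeZxid(6, 0)));
        System.out.println("ackEpoch   = " + decodeEpoch(encodeEpoch(5)));
    }
}
```

A zxid whose counter is 0 marks the start of a new epoch, which is why syncFollower() never sends TRUNC to such a peer.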


public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
    /*
     * When leader election is completed, the leader will set its
     * lastProcessedZxid to be (epoch << 32). There will be no txn associated
     * with this zxid.
     *
     * The learner will set its lastProcessedZxid to the same value if
     * it get DIFF or SNAP from the leader. If the same learner come
     * back to sync with leader using this zxid, we will never find this
     * zxid in our history. In this case, we will ignore TRUNC logic and
     * always send DIFF if we have old enough history
     */
    boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
    // Keep track of the latest zxid which already queued
    long currentZxid = peerLastZxid;
    boolean needSnap = true;
    boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
    ReentrantReadWriteLock lock = db.getLogLock();
    ReadLock rl = lock.readLock();
    try {
        rl.lock();
        long maxCommittedLog = db.getmaxCommittedLog();
        long minCommittedLog = db.getminCommittedLog();
        long lastProcessedZxid = db.getDataTreeLastProcessedZxid();

        LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
                + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
                + " peerLastZxid=0x{}", getSid(),
                Long.toHexString(maxCommittedLog),
                Long.toHexString(minCommittedLog),
                Long.toHexString(lastProcessedZxid),
                Long.toHexString(peerLastZxid));

        if (db.getCommittedLog().isEmpty()) {
            /*
             * It is possible that committedLog is empty. In that case
             * setting these value to the latest txn in leader db
             * will reduce the case that we need to handle
             *
             * Here is how each case handle by the if block below
             * 1. lastProcessZxid == peerZxid -> Handle by (2)
             * 2. lastProcessZxid < peerZxid -> Handle by (3)
             * 3. lastProcessZxid > peerZxid -> Handle by (5)
             */
            minCommittedLog = lastProcessedZxid;
            maxCommittedLog = lastProcessedZxid;
        }

        /*
         * Here are the cases that we want to handle
         *
         * 1. Force sending snapshot (for testing purpose)
         * 2. Peer and leader is already sync, send empty diff
         * 3. Follower has txn that we haven't seen. This may be old leader
         *    so we need to send TRUNC. However, if peer has newEpochZxid,
         *    we cannot send TRUNC since the follower has no txnlog
         * 4. Follower is within committedLog range or already in-sync.
         *    We may need to send DIFF or TRUNC depending on follower's zxid
         *    We always send empty DIFF if follower is already in-sync
         * 5. Follower missed the committedLog. We will try to use on-disk
         *    txnlog + committedLog to sync with follower. If that fail,
         *    we will send snapshot
         */

        if (forceSnapSync) {
            // Force leader to use snapshot to sync with follower
            LOG.warn("Forcing snapshot sync - should not see this in production");
        } else if (lastProcessedZxid == peerLastZxid) {
            // Follower is already sync with us, send empty diff
            LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
                     " for peer sid: " +  getSid());
            queueOpPacket(Leader.DIFF, peerLastZxid);
            needOpPacket = false;
            needSnap = false;
        } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            // Newer than committedLog, send trunc and done
            LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
                      Long.toHexString(maxCommittedLog) +
                      " for peer sid:" +  getSid());
            queueOpPacket(Leader.TRUNC, maxCommittedLog);
            currentZxid = maxCommittedLog;
            needOpPacket = false;
            needSnap = false;
        } else if ((maxCommittedLog >= peerLastZxid)
                && (minCommittedLog <= peerLastZxid)) {
            // Follower is within commitLog range
            LOG.info("Using committedLog for peer sid: " +  getSid());
            Iterator<Proposal> itr = db.getCommittedLog().iterator();
            currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                                 null, maxCommittedLog);
            needSnap = false;
        } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
            // Use txnlog and committedLog to sync

            // Calculate sizeLimit that we allow to retrieve txnlog from disk
            long sizeLimit = db.calculateTxnLogSizeLimit();
            // This method can return empty iterator if the requested zxid
            // is older than on-disk txnlog
            Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
                    peerLastZxid, sizeLimit);
            if (txnLogItr.hasNext()) {
                LOG.info("Use txnlog and committedLog for peer sid: " +  getSid());
                currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                                     minCommittedLog, maxCommittedLog);

                LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
                Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                                     null, maxCommittedLog);
                needSnap = false;
            }
            // closing the resources
            if (txnLogItr instanceof TxnLogProposalIterator) {
                TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
                txnProposalItr.close();
            }
        } else {
            LOG.warn("Unhandled scenario for peer sid: " +  getSid());
        }
        LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
                  " for peer sid: " +  getSid());
        leaderLastZxid = leader.startForwarding(this, currentZxid);
    } finally {
        rl.unlock();
    }

    if (needOpPacket && !needSnap) {
        // This should never happen, but we should fall back to sending
        // snapshot just in case.
        LOG.error("Unhandled scenario for peer sid: " +  getSid() +
                 " fall back to use snapshot");
        needSnap = true;
    }

    return needSnap;
}
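
The branch order in syncFollower() can be condensed into a small decision function. This is a simplified sketch under stated assumptions, not the ZooKeeper implementation: SyncModeDemo, SyncMode and choose() are hypothetical names, the forceSnapSync flag and the on-disk txnlog path (case 5) are left out so that case falls straight through to SNAP, and the in-range case is reported as DIFF even though the real queueCommittedProposals() may decide otherwise for a zxid that is missing from the log.

```java
// Illustrative sketch only: a condensed version of the branch order in
// syncFollower(). All names here are hypothetical.
final class SyncModeDemo {
    enum SyncMode { DIFF, TRUNC, SNAP }

    static SyncMode choose(long peerLastZxid, long minCommittedLog,
                           long maxCommittedLog, long lastProcessedZxid,
                           boolean committedLogEmpty) {
        // A zxid with counter 0 belongs to a fresh epoch and has no txn.
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;

        // An empty committedLog collapses the range to the last processed zxid.
        if (committedLogEmpty) {
            minCommittedLog = lastProcessedZxid;
            maxCommittedLog = lastProcessedZxid;
        }

        if (lastProcessedZxid == peerLastZxid) {
            return SyncMode.DIFF;   // already in sync: empty DIFF
        }
        if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            return SyncMode.TRUNC;  // peer is ahead: roll it back
        }
        if (minCommittedLog <= peerLastZxid && peerLastZxid <= maxCommittedLog) {
            return SyncMode.DIFF;   // peer is within the committedLog range
        }
        return SyncMode.SNAP;       // assumption: txnlog sync path omitted
    }

    public static void main(String[] args) {
        long min = 0x100000001L;
        long max = 0x100000005L;
        System.out.println(choose(0x100000005L, min, max, max, false));
        System.out.println(choose(0x100000008L, min, max, max, false));
        System.out.println(choose(0x100000003L, min, max, max, false));
        System.out.println(choose(0x0L, min, max, max, false));
    }
}
```

The order of the checks matters: the equality test comes first so that an in-sync peer always gets an empty DIFF, and the TRUNC test excludes new-epoch zxids because such a peer has no txnlog to truncate.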

2.5.6 Follower.followLeader() responds to the Leader's sync result

protected void processPacket(QuorumPacket qp) throws Exception{
    switch (qp.getType()) {
    case Leader.PING:            
        ping(qp);            
        break;
    case Leader.PROPOSAL:           
        TxnHeader hdr = new TxnHeader();
        Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x"
                    + Long.toHexString(hdr.getZxid())
                    + " expected 0x"
                    + Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();
        
        if (hdr.getType() == OpCode.reconfig){
           SetDataTxn setDataTxn = (SetDataTxn) txn;       
           QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
           self.setLastSeenQuorumVerifier(qv, true);                               
        }
        
        fzk.logRequest(hdr, txn);
        break;
    case Leader.COMMIT:
        fzk.commit(qp.getZxid());
        break;
        
    … …
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Follower started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}

public void commit(long zxid) {
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                + " without seeing txn");
        return;
    }
    long firstElementZxid = pendingTxns.element().zxid;
    if (firstElementZxid != zxid) {
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                + " but next pending txn 0x"
                + Long.toHexString(firstElementZxid));
        System.exit(12);
    }
    Request request = pendingTxns.remove();
    commitProcessor.commit(request);
}
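
The PROPOSAL and COMMIT branches cooperate through a FIFO queue: a proposal is logged and queued, and a later commit must match the head of that queue. The following is a minimal sketch of this ordering rule, not the ZooKeeper implementation. PendingTxnDemo is a hypothetical class, zxids stand in for Request objects, and a mismatch throws IllegalStateException where the real commit() calls System.exit(12).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative sketch only: PendingTxnDemo is a hypothetical stand-in for the
// follower-side bookkeeping around pendingTxns.
final class PendingTxnDemo {
    private final Queue<Long> pendingTxns = new ArrayDeque<>();
    private final List<Long> committed = new ArrayList<>();

    // PROPOSAL: remember the transaction until its COMMIT arrives.
    void logRequest(long zxid) {
        pendingTxns.add(zxid);
    }

    // COMMIT: only the oldest pending transaction may be committed.
    void commit(long zxid) {
        if (pendingTxns.isEmpty()) {
            System.out.println("Committing 0x" + Long.toHexString(zxid)
                    + " without seeing txn");
            return;
        }
        long firstElementZxid = pendingTxns.element();
        if (firstElementZxid != zxid) {
            // Assumption: throw instead of exiting the JVM as the real code does.
            throw new IllegalStateException("Committing zxid 0x"
                    + Long.toHexString(zxid) + " but next pending txn 0x"
                    + Long.toHexString(firstElementZxid));
        }
        committed.add(pendingTxns.remove());
    }

    List<Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        PendingTxnDemo follower = new PendingTxnDemo();
        follower.logRequest(0x100000001L);
        follower.logRequest(0x100000002L);
        follower.commit(0x100000001L);
        follower.commit(0x100000002L);
        System.out.println("committed " + follower.committed().size() + " txns");
    }
}
```

Committing out of order is treated as fatal because it would mean the Follower's history has diverged from the Leader's.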

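2.5.7 Leader.lead() responds to the Follower

LearnerHandler is declared as public class LearnerHandler extends ZooKeeperThread{}, so it is a thread, and fh.start() therefore executes the run() method of LearnerHandler. At the end of synchronization, run() queues an UPTODATE packet, which tells the Follower that it may start serving requests.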

public void run() {
    … …
        //
        LOG.debug("Sending UPTODATE message to " + sid);      
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

        while (true) {
            … …
        }
    } catch (IOException e) {
        ... ...
    } finally {
        ... ...
    }
}
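
Note that run() does not write UPTODATE to the socket directly; it adds the packet to queuedPackets, and a separate sender drains that queue in order. The following is a minimal sketch of that producer/consumer pattern, not the ZooKeeper implementation. PacketQueueDemo, drain() and the STOP marker are hypothetical, and strings stand in for QuorumPacket objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only: PacketQueueDemo is a hypothetical stand-in for the
// queue between LearnerHandler.run() and its packet sender.
final class PacketQueueDemo {
    // Hypothetical end-of-stream marker that stops the sender loop.
    static final String STOP = "STOP";

    // Take packets in FIFO order until the marker is seen.
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> sent = new ArrayList<>();
        while (true) {
            String packet;
            try {
                packet = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (STOP.equals(packet)) {
                break;
            }
            sent.add(packet); // the real sender would write to the socket here
        }
        return sent;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queuedPackets = new LinkedBlockingQueue<>();
        queuedPackets.add("NEWLEADER");
        queuedPackets.add("UPTODATE");
        queuedPackets.add(STOP);
        System.out.println(drain(queuedPackets));
    }
}
```

Queueing keeps the packet order intact: everything queued during synchronization reaches the Follower before UPTODATE does.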

Source: https://www.cnblogs.com/niuniu2022/p/16343115.html