ZooKeeper Source Code, Chapter 2: Source Analysis of ZK Server Data Loading

2022-06-05 00:02:20

Tags: ZK, Zookeeper, header, source code, quorumPeer, rc, new, path, config


2.3 Source Analysis: How the ZK Server Loads Data

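(1) ZooKeeper's data model is a tree, the DataTree; each node in it is a DataNode.

(2) The DataTree replicas held by the servers of a ZK ensemble are kept in sync with one another.

(3) Every ZK server in the ensemble holds a complete copy of the data both in memory and on disk:

  • In memory: the DataTree

  • On disk: snapshot files + the transaction log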
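
To make the in-memory model concrete, here is a minimal sketch of a path-indexed tree. The names `TreeSketch` and `Node` are hypothetical and are not ZooKeeper classes; the real DataTree also tracks ACLs, stats, watches, and ephemeral owners, none of which is modelled here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for DataTree/DataNode (not ZooKeeper code).
final class TreeSketch {
    static final class Node {
        final byte[] data;
        final Set<String> children = new TreeSet<>();

        Node(byte[] data) {
            this.data = data;
        }
    }

    // Full path -> node, so a lookup does not have to walk the tree.
    private final Map<String, Node> nodes = new HashMap<>();

    TreeSketch() {
        nodes.put("/", new Node(new byte[0]));
    }

    // Creates a node under an existing parent; paths are assumed to start with '/'.
    void create(String path, byte[] data) {
        int lastSlash = path.lastIndexOf('/');
        String parentPath = lastSlash == 0 ? "/" : path.substring(0, lastSlash);
        Node parent = nodes.get(parentPath);
        if (parent == null) {
            throw new IllegalArgumentException("no parent for " + path);
        }
        parent.children.add(path.substring(lastSlash + 1));
        nodes.put(path, new Node(data));
    }

    // Returns the child names of an existing node.
    Set<String> children(String path) {
        return nodes.get(path).children;
    }

    public static void main(String[] args) {
        TreeSketch tree = new TreeSketch();
        tree.create("/app", "cfg".getBytes());
        tree.create("/app/worker", new byte[0]);
        System.out.println(tree.children("/app")); // [worker]
    }
}
```

Keeping a flat path-to-node map next to the child sets is the same trade-off the restore code later in this section relies on: a parent can be found by path with a single lookup.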


2.3.1 Cold-Start Recovery: Loading the Snapshot

1) Start the ensemble

QuorumPeerMain.java

public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException
{
  … …

  LOG.info("Starting quorum peer");
  try {
      ServerCnxnFactory cnxnFactory = null;
      ServerCnxnFactory secureCnxnFactory = null;

      // Initialize the connection factory; NIO is the default transport
      if (config.getClientPortAddress() != null) {
          cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                  config.getMaxClientCnxns(), false);
      }

      if (config.getSecureClientPortAddress() != null) {
          secureCnxnFactory = ServerCnxnFactory.createFactory();
          secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                  config.getMaxClientCnxns(), true);
      }

      // Copy the parsed configuration onto this ZooKeeper peer
      quorumPeer = getQuorumPeer();
      quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  config.getDataLogDir(),
                  config.getDataDir()));
      quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
      quorumPeer.enableLocalSessionsUpgrading(
          config.isLocalSessionsUpgradingEnabled());
      //quorumPeer.setQuorumPeers(config.getAllMembers());
      quorumPeer.setElectionType(config.getElectionAlg());
      quorumPeer.setMyid(config.getServerId());
      quorumPeer.setTickTime(config.getTickTime());
      quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
      quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
      quorumPeer.setInitLimit(config.getInitLimit());
      quorumPeer.setSyncLimit(config.getSyncLimit());
      quorumPeer.setConfigFileName(config.getConfigFilename());
      // Manages storage of the ZK data
      quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
      quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
      if (config.getLastSeenQuorumVerifier()!=null) {
          quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
    }
      quorumPeer.initConfigInZKDatabase();
      // Manages ZK network communication
      quorumPeer.setCnxnFactory(cnxnFactory);
      quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
      quorumPeer.setSslQuorum(config.isSslQuorum());
      quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
      quorumPeer.setLearnerType(config.getPeerType());
      quorumPeer.setSyncEnabled(config.getSyncEnabled());
      quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
      if (config.sslQuorumReloadCertFiles) {
          quorumPeer.getX509Util().enableCertFileReloading();
      }

      quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
      quorumPeer.initialize();
      
      // Start the ZK peer
      quorumPeer.start();
      quorumPeer.join();
  } catch (InterruptedException e) {
      // warn, but generally this is ok
      LOG.warn("Quorum Peer interrupted", e);
  }
}

2) Recover data on cold start

QuorumPeer.java

public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Recover data on cold start
    loadDataBase();
    // Start the connection factory that serves client connections
    startServerCnxnFactory();
    try {
        // Start the admin server
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Prepare the leader election environment
    startLeaderElection();
    // Start the peer thread; its run() loop carries out the election
    super.start();
}

private void loadDataBase() {
    try {
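        // Load the on-disk data into memory and rebuild the DataTree.
        // ZK operations fall into two kinds: transactional and non-transactional.
        // Transactional, e.g. zk.create(): each one is assigned a globally unique 64-bit zxid
        //   (high 32 bits: the epoch, i.e. the term number of a leader; low 32 bits: the transaction counter).
        // Non-transactional, e.g. zk.getData().

        // Recovery steps:
        // (1) Restore most of the data from the snapshot file, which yields a lastProcessedZxid.
        // (2) Replay the transaction log up to its last entry, updating lastProcessedZxid along the way.
        // (3) The resulting DataTree and lastProcessedZxid mark the end of recovery.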
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch(FileNotFoundException e) {
        	// pick a reasonable epoch number
        	// this should only happen once when moving to a
        	// new code version
        	currentEpoch = epochOfZxid;
        	LOG.info(CURRENT_EPOCH_FILENAME
        	        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
        	        currentEpoch);
        	writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch(FileNotFoundException e) {
        	// pick a reasonable epoch number
        	// this should only happen once when moving to a
        	// new code version
        	acceptedEpoch = epochOfZxid;
        	LOG.info(ACCEPTED_EPOCH_FILENAME
        	        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
        	        acceptedEpoch);
        	writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch(IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}
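
The zxid layout described in the comments above (epoch in the high 32 bits, counter in the low 32 bits) can be illustrated with plain bit arithmetic. This is a sketch only: `ZxidSketch` and its method names are hypothetical and merely mirror the kind of helper that `ZxidUtils.getEpochFromZxid` provides in the listing.

```java
// Sketch of the zxid layout: high 32 bits = epoch, low 32 bits = counter.
final class ZxidSketch {
    // Packs an epoch and a per-epoch counter into one 64-bit zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    // Extracts the epoch (leader term) from a zxid.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    // Extracts the per-epoch transaction counter from a zxid.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 7);
        System.out.println(Long.toHexString(zxid)); // 300000007
        System.out.println(epochOf(zxid));          // 3
        System.out.println(counterOf(zxid));        // 7
    }
}
```

Read this way, the check `epochOfZxid > currentEpoch` in loadDataBase() guards against a data directory whose last applied transaction comes from a later leader term than the epoch file records.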

public long loadDataBase() throws IOException {
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return zxid;
}

public long restore(DataTree dt, Map<Long, Integer> sessions,
                    PlayBackListener listener) throws IOException {
    // Restore the snapshot file's data into the DataTree
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);

    RestoreFinalizer finalizer = () -> {
        // Replay the transaction log into the DataTree
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        return highestZxid;
    };

    if (-1L == deserializeResult) {
        /* this means that we couldn't find any snapshot, so we need to
         * initialize an empty database (reported in ZOOKEEPER-2325) */
        if (txnLog.getLastLoggedZxid() != -1) {
            // ZOOKEEPER-3056: provides an escape hatch for users upgrading
            // from old versions of zookeeper (3.4.x, pre 3.5.3).
            if (!trustEmptySnapshot) {
                throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
            } else {
                LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
                return finalizer.run();
            }
        }
        /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
         *       or use Map on save() */
        save(dt, (ConcurrentHashMap<Long, Integer>)sessions);
        /* return a zxid of zero, since we the database is empty */
        return 0;
    }

    return finalizer.run();
}

FileSnap.java (in IntelliJ IDEA, Ctrl + Alt + B jumps from the deserialize call to its implementation)

public long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException {
    // we run through 100 snapshots (not all of them)
    // if we cannot get it running within 100 snapshots
    // we should  give up
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    boolean foundValid = false;

    // Try the snapshots in order until one deserializes and passes its checksum
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        LOG.info("Reading snapshot " + snap);
        // Set up the streams for deserialization
        try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
             CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
			
            // Deserialize, restoring the data into the DataTree
            deserialize(dt, sessions, ia);
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot :  " + snap);
            }
            foundValid = true;
            break;
        } catch (IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
    return dt.lastProcessedZxid;
}
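
The checksum step above follows a common pattern: the writer runs the payload through a checked stream and appends the checksum, and the reader recomputes it while reading and compares it with the stored value. Below is a minimal sketch of that pattern using only the JDK. `ChecksumSketch` and its file layout (one UTF string followed by a long) are assumptions made for illustration and are not the ZooKeeper snapshot format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

// Hypothetical payload-plus-trailing-checksum format, verified with Adler32.
final class ChecksumSketch {
    // Writes the payload, then appends the Adler32 value of the payload bytes.
    static byte[] write(String payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        CheckedOutputStream checked = new CheckedOutputStream(bytes, new Adler32());
        DataOutputStream out = new DataOutputStream(checked);
        out.writeUTF(payload);
        out.flush();
        long checksum = checked.getChecksum().getValue(); // covers the payload only
        out.writeLong(checksum);
        out.flush();
        return bytes.toByteArray();
    }

    // Reads the payload, recomputes the checksum, and compares it with the trailer.
    static String read(byte[] file) throws IOException {
        CheckedInputStream checked =
                new CheckedInputStream(new ByteArrayInputStream(file), new Adler32());
        DataInputStream in = new DataInputStream(checked);
        String payload = in.readUTF();
        long actual = checked.getChecksum().getValue(); // taken before the trailer is read
        long expected = in.readLong();
        if (actual != expected) {
            throw new IOException("checksum mismatch");
        }
        return payload;
    }

    public static void main(String[] args) throws IOException {
        byte[] file = write("hello");
        System.out.println(read(file)); // hello
    }
}
```

As in the listing, the computed checksum is captured before the stored value is read, because reading the trailer through the checked stream would otherwise change the running checksum.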


public void deserialize(DataTree dt, Map<Long, Integer> sessions,
        InputArchive ia) throws IOException {
    FileHeader header = new FileHeader();
    header.deserialize(ia, "fileheader");
    if (header.getMagic() != SNAP_MAGIC) {
        throw new IOException("mismatching magic headers "
                + header.getMagic() +
                " !=  " + FileSnap.SNAP_MAGIC);
    }
    // Restore the snapshot data into the DataTree
    SerializeUtils.deserializeSnapshot(dt,ia,sessions);
}


public static void deserializeSnapshot(DataTree dt,InputArchive ia,
        Map<Long, Integer> sessions) throws IOException {
    int count = ia.readInt("count");
    while (count > 0) {
        long id = ia.readLong("id");
        int to = ia.readInt("timeout");
        sessions.put(id, to);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "loadData --- session in archive: " + id
                    + " with timeout: " + to);
        }
        count--;
    }
    // With the sessions restored, restore the tree itself into the DataTree
    dt.deserialize(ia, "tree");
}


public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    String path = ia.readString("path");

    // Restore every DataNode from the snapshot into the DataTree
    while (!"/".equals(path)) {

        // Create a fresh node object on each iteration
        DataNode node = new DataNode();
        ia.readRecord(node, "node");

        // Register the DataNode in the DataTree
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            root = node;
        } else {
            // Look up the parent node
            String parentPath = path.substring(0, lastSlash);
            DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException("Invalid Datatree, unable to find " +
                        "parent " + parentPath + " of path " + path);
            }

            // Register this node as a child of its parent
            parent.addChild(path.substring(lastSlash + 1));

            // Track container, TTL, and ephemeral nodes
            long eowner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(eowner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (eowner != 0) {
                HashSet<String> list = ephemerals.get(eowner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(eowner, list);
                }
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    nodes.put("/", root);
    // we are done with deserializing the
    // the datatree
    // update the quotas - create path trie
    // and also update the stat nodes
    setupQuota();

    aclCache.purgeUnused();
}
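
The loop above depends on the records arriving parent-first: every node's parent must already be in the map when the node is read, and the path "/" acts as the end marker. The following sketch isolates that idea. `RebuildSketch` is hypothetical, and the input convention (root stored under the empty path, as the `lastSlash == -1` branch suggests) is an assumption drawn from the listing.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: rebuild parent/child links from a flat, parent-first path list.
final class RebuildSketch {
    static Map<String, Set<String>> rebuild(List<String> paths) {
        Map<String, Set<String>> children = new LinkedHashMap<>();
        for (String path : paths) {
            if ("/".equals(path)) {
                break; // end-of-tree marker
            }
            children.put(path, new TreeSet<>());
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                continue; // the root is stored under the empty path
            }
            String parentPath = path.substring(0, lastSlash);
            Set<String> siblings = children.get(parentPath);
            if (siblings == null) {
                throw new IllegalStateException(
                        "unable to find parent " + parentPath + " of path " + path);
            }
            siblings.add(path.substring(lastSlash + 1));
        }
        return children;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> tree =
                rebuild(Arrays.asList("", "/a", "/a/b", "/c", "/"));
        System.out.println(tree.get(""));   // [a, c]
        System.out.println(tree.get("/a")); // [b]
    }
}
```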

2.3.2 Cold-Start Recovery: Replaying the Transaction Log

Back in the restore method of FileTxnSnapLog.java:

public long restore(DataTree dt, Map<Long, Integer> sessions,
                    PlayBackListener listener) throws IOException {
    // Restore the snapshot file's data into the DataTree
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);

    RestoreFinalizer finalizer = () -> {
        // Replay the transaction log into the DataTree
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        return highestZxid;
    };

    … …

    return finalizer.run();
}

public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                 PlayBackListener listener) throws IOException {
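    // Most of the data has already been restored from the snapshot, so replay starts at the snapshot's zxid + 1
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    // Starts as the highest zxid in the snapshot and is advanced as log entries are replayed
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        // Starting just past lastProcessedZxid, keep replaying the log entries that have not been applied yet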
        while (true) {
            // iterator points to
            // the first valid txn when initialized
            // Fetch the transaction header (it carries the zxid)
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(highestZxid) > {}(next log) for type {}",
                        highestZxid, hdr.getZxid(), hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // Apply the logged transaction to the DataTree; highestZxid has just been advanced to this transaction's zxid
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next())
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
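
Stripped of iterators and error handling, the method above is "apply every logged transaction whose zxid is greater than the snapshot's, and remember the highest zxid applied". The sketch below models just that. `ReplaySketch`, `Txn`, and the map-based tree are hypothetical simplifications; the real code positions the iterator with `txnLog.read(...)` instead of filtering entries one by one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of "snapshot first, then replay the log".
final class ReplaySketch {
    static final class Txn {
        final long zxid;
        final String path;
        final String data;

        Txn(long zxid, String path, String data) {
            this.zxid = zxid;
            this.path = path;
            this.data = data;
        }
    }

    // Applies entries newer than the snapshot and returns the highest zxid applied.
    // The log is assumed to be in ascending zxid order.
    static long replay(Map<String, String> tree, long snapshotZxid, List<Txn> log) {
        long highestZxid = snapshotZxid;
        for (Txn txn : log) {
            if (txn.zxid <= snapshotZxid) {
                continue; // already reflected in the snapshot
            }
            tree.put(txn.path, txn.data);
            highestZxid = txn.zxid;
        }
        return highestZxid;
    }

    public static void main(String[] args) {
        Map<String, String> tree = new HashMap<>();
        tree.put("/a", "from-snapshot"); // state restored from a snapshot at zxid 2
        List<Txn> log = new ArrayList<>();
        log.add(new Txn(2, "/a", "stale"));
        log.add(new Txn(3, "/a", "from-log"));
        log.add(new Txn(4, "/b", "new"));
        long last = replay(tree, 2, log);
        System.out.println(last + " " + tree.get("/a") + " " + tree.get("/b")); // 4 from-log new
    }
}
```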

public void processTransaction(TxnHeader hdr,DataTree dt,
        Map<Long, Integer> sessions, Record txn)
    throws KeeperException.NoNodeException {
    ProcessTxnResult rc;
    switch (hdr.getType()) {
    case OpCode.createSession:
        sessions.put(hdr.getClientId(),
                ((CreateSessionTxn) txn).getTimeOut());
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                    "playLog --- create session in log: 0x"
                            + Long.toHexString(hdr.getClientId())
                            + " with timeout: "
                            + ((CreateSessionTxn) txn).getTimeOut());
        }
        // give dataTree a chance to sync its lastProcessedZxid
        rc = dt.processTxn(hdr, txn);
        break;
    case OpCode.closeSession:
        sessions.remove(hdr.getClientId());
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                    "playLog --- close session in log: 0x"
                            + Long.toHexString(hdr.getClientId()));
        }
        rc = dt.processTxn(hdr, txn);
        break;
    default:
        // Node creation, node deletion, and all other transaction types
        rc = dt.processTxn(hdr, txn);
    }

    /**
     * Snapshots are lazily created. So when a snapshot is in progress,
     * there is a chance for later transactions to make into the
     * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS
     * errors could occur. It should be safe to ignore these.
     */
    if (rc.err != Code.OK.intValue()) {
        LOG.debug(
                "Ignoring processTxn failure hdr: {}, error: {}, path: {}",
                hdr.getType(), rc.err, rc.path);
    }
}
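
The comment in processTransaction explains why a non-OK result is only logged: a snapshot is written while transactions keep arriving, so it may already contain the effect of a transaction that the log replays afterwards. A toy model makes the two harmless outcomes visible. `FuzzyReplaySketch` and its `Result` enum are hypothetical and stand in for the `rc.err` codes of the real code.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model: replaying a transaction the snapshot already reflects.
final class FuzzyReplaySketch {
    enum Result { OK, NODE_EXISTS, NO_NODE }

    // Creating a path that is already present reports NODE_EXISTS and changes nothing.
    static Result create(Set<String> tree, String path) {
        return tree.add(path) ? Result.OK : Result.NODE_EXISTS;
    }

    // Deleting a path that is already gone reports NO_NODE and changes nothing.
    static Result delete(Set<String> tree, String path) {
        return tree.remove(path) ? Result.OK : Result.NO_NODE;
    }

    public static void main(String[] args) {
        // The snapshot already holds /a because the create landed while it was being written.
        Set<String> tree = new HashSet<>();
        tree.add("/a");
        System.out.println(create(tree, "/a")); // NODE_EXISTS
        System.out.println(delete(tree, "/a")); // OK
        System.out.println(delete(tree, "/a")); // NO_NODE
    }
}
```

In both non-OK cases the tree ends up in the state the transaction intended, which is why treating them as ignorable during replay is reasonable.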

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn)
{
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
            case OpCode.create:
                CreateTxn createTxn = (CreateTxn) txn;
                rc.path = createTxn.getPath();
                createNode(
                        createTxn.getPath(),
                        createTxn.getData(),
                        createTxn.getAcl(),
                        createTxn.getEphemeral() ? header.getClientId() : 0,
                        createTxn.getParentCVersion(),
                        header.getZxid(), header.getTime(), null);
                break;
            case OpCode.create2:
                CreateTxn create2Txn = (CreateTxn) txn;
                rc.path = create2Txn.getPath();
                Stat stat = new Stat();
                createNode(
                        create2Txn.getPath(),
                        create2Txn.getData(),
                        create2Txn.getAcl(),
                        create2Txn.getEphemeral() ? header.getClientId() : 0,
                        create2Txn.getParentCVersion(),
                        header.getZxid(), header.getTime(), stat);
                rc.stat = stat;
                break;
            case OpCode.createTTL:
                CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
                rc.path = createTtlTxn.getPath();
                stat = new Stat();
                createNode(
                        createTtlTxn.getPath(),
                        createTtlTxn.getData(),
                        createTtlTxn.getAcl(),
                        EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                        createTtlTxn.getParentCVersion(),
                        header.getZxid(), header.getTime(), stat);
                rc.stat = stat;
                break;
            case OpCode.createContainer:
                CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
                rc.path = createContainerTxn.getPath();
                stat = new Stat();
                createNode(
                        createContainerTxn.getPath(),
                        createContainerTxn.getData(),
                        createContainerTxn.getAcl(),
                        EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                        createContainerTxn.getParentCVersion(),
                        header.getZxid(), header.getTime(), stat);
                rc.stat = stat;
                break;
            case OpCode.delete:
            case OpCode.deleteContainer:
                DeleteTxn deleteTxn = (DeleteTxn) txn;
                rc.path = deleteTxn.getPath();
                deleteNode(deleteTxn.getPath(), header.getZxid());
                break;
            case OpCode.reconfig:
            case OpCode.setData:
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                rc.path = setDataTxn.getPath();
                rc.stat = setData(setDataTxn.getPath(), setDataTxn
                        .getData(), setDataTxn.getVersion(), header
                        .getZxid(), header.getTime());
                break;
            case OpCode.setACL:
                SetACLTxn setACLTxn = (SetACLTxn) txn;
                rc.path = setACLTxn.getPath();
                rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
                        setACLTxn.getVersion());
                break;
            case OpCode.closeSession:
                killSession(header.getClientId(), header.getZxid());
                break;
            case OpCode.error:
                ErrorTxn errTxn = (ErrorTxn) txn;
                rc.err = errTxn.getErr();
                break;
            case OpCode.check:
                CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
                rc.path = checkTxn.getPath();
                break;
            case OpCode.multi:
                MultiTxn multiTxn = (MultiTxn) txn ;
                List<Txn> txns = multiTxn.getTxns();
                rc.multiResult = new ArrayList<ProcessTxnResult>();
                boolean failed = false;
                for (Txn subtxn : txns) {
                    if (subtxn.getType() == OpCode.error) {
                        failed = true;
                        break;
                    }
                }

                .. …
        }
    } catch (KeeperException e) {
        ... ...
    } catch (IOException e) {
        ... ...
    }

    ... ...
	
    return rc;
}

2.4 ZK选

Source: https://www.cnblogs.com/niuniu2022/p/16343108.html
