ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RocketMQ存储之MappedFileQueue

2021-06-07 21:33:23  阅读:197  来源: 互联网

标签:存储 null return mappedFile req offset MappedFileQueue RocketMQ MappedFile


文章目录

一、概述

上一篇我们分析了MappedFile的实现细节,MappedFile实现了文件内存映射的功能。本篇我们分析MappedFileQueue的实现。

MappedFileQueue的作用是:将多个MappedFile按顺序组织起来,并且提供MappedFile的“增删查”操作等作用。由于MappedFileQueue的实现逻辑并不复杂,本篇只分析一部分源码实现。

二、实现细节

首先来看一下MappedFileQueue的创建和加载:

    public MappedFileQueue(final String storePath, int mappedFileSize,
        AllocateMappedFileService allocateMappedFileService) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.allocateMappedFileService = allocateMappedFileService;
    }

    public boolean load() {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            // 按顺序加载到队列中
            Arrays.sort(files);
            for (File file : files) {

                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }

实现过程很简单:将指定文件夹下的所有文件都加载映射为MappedFile,然后添加到队列中。

根据时间获取MappedFile:

    /**
     * 获取第一个更新时间大于timestamp的MappedFile,没有的话就返回最新的MappedFile
     * @param timestamp
     * @return
     */
    public MappedFile getMappedFileByTime(final long timestamp) {
    	// mappedFiles队列的副本
        Object[] mfs = this.copyMappedFiles(0);

        if (null == mfs)
            return null;

        for (int i = 0; i < mfs.length; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
                return mappedFile;
            }
        }

        return (MappedFile) mfs[mfs.length - 1];
    }

根据更新时间获取MappedFile的实现很简单,通过遍历mappedFiles队列查找第一个修改时间大于timestamp的MappedFile对象并返回,如果没有,就返回队列中的最后一个对象。

删除大于offset的数据:

    /**
     *
     * 删除大于offset的记录
     * @param offset
     */
    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        for (MappedFile file : this.mappedFiles) {
            // 当前file的最大offset
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            if (fileTailOffset > offset) {
                // 当前文件最大的offset大于offset,说明有需要删除的记录
                if (offset >= file.getFileFromOffset()) {
                    // 当前文件的最小offset小于等于offset,说明当前文件有部分记录需要删除

                    // 将MappedFile的三个指针都设置到offset对应的位置,使大于offset的数据可以被覆盖
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    // 当前文件的最小offset大于offset,说明当前文件所有记录都需要删除
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

删除大于offset的记录的处理过程也很简单:遍历所有的mappedFiles,检查每一个mappedFile的最大指针是否大于offset,如果大于说明该文件中包含需要删除的数据。然后判断mappedFile的最小指针是否大于offset,如果是,则说明该文件中的所有数据都是待删除数据。则整个文件执行销毁逻辑。如果不是,则说明部分数据需要删除,则把当前mappedFile的三个指针(上篇MappedFile分析过)设置为offset,使大于offset的空间可以被覆盖。

获取队列中最新的MappedFile:

    /**
     * 获取最后一个MappedFile,如果needCreate是true,则保证最后一个MappedFile不是空,且没有被写满
     * @param startOffset 整个MappedFileQueue是空时,如果需要创建第一个MappedFile,这个MappedFile的起始offset
     * @param needCreate 是否需要创建新的MappedFile
     * @return
     */
     public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {
            // 如果当前MappedFileQueue是空的,则要创建的文件的起始offset为不大于startOffset的最大能被mappedFileSize整除的数
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) {

            // 如果当前MappedFileQueue不是空的,且最新的MappedFile被写满了
            // 则下一个要创建的文件的起始offset为当前最后一个MappedFile的起始offset+mappedFileSize
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }

        if (createOffset != -1 && needCreate) {
            // 待创建的文件路径
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            // 下一个待创建的文件路径(有可能会提前预创建一个文件)
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

            // 创建文件
            if (this.allocateMappedFileService != null) {
               // 通过allocateMappedFileService创建
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    // 直接new创建
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

这个方法的主要作用就是获取队列中最新的MappedFile,并且参数列表中提供了一个needCreate的选项,控制当队列中没有MappedFile或者最新的MappedFile已经被写满时,是否需要创建新的MappedFile
这个过程中有一个有意思的逻辑,就是预创建MappedFile。可以看到,当allocateMappedFileService不是空的时候,就会使用allocateMappedFileService来提交创建MappedFile的任务,具体过程如下:

    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        int canSubmitRequests = 2;
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                // 内存暂存池中可用的buffer个数
                canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
            }
        }

        // 将nextFilePath需要创建的文件封装成AllocateRequest创建请求,添加到请求列表
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

        if (nextPutOK) {
            if (canSubmitRequests <= 0) {
                // 内存暂存池中没有可用buffer对象,分配失败
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextFilePath);
                return null;
            }
            // 内存暂存池中没有可用buffer对象,将需要创建的文件请求,添加到请求队列
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }

            // 内存暂存池中可用buffer对象减去1
            canSubmitRequests--;
        }

        //预分配nextNextFilePath
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        // 将创建预分配的任务并添加到任务队列
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }

        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }

        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                // 等待文件创建线程执行完nextFilePath文件创建
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                    this.requestTable.remove(nextFilePath);
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }

        return null;
    }

我们可以看到,MappedFile的暂存缓冲来自于messageStore提供的transientStorePool。当transientStorePool中没有可用的缓冲buffer,则无法提交创建MappedFile的任务。
在提交创建请求前,需要先校验是否已经提交了该路径的MappedFile创建任务,如果已经有,则不重复添加。随后还会添加一个预分配的任务,将一下一个要创建的MappedFile也作为任务提交。然后等待nextFilePath的MappedFile创建完成。

MappedFile创建任务的执行也很简单:

    private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            req = this.requestQueue.take();
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            if (null == expectedRequest) {
                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize());
                return true;
            }
            if (expectedRequest != req) {
                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                return true;
            }

            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();

                MappedFile mappedFile;
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    try {
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }

                long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
                if (elapsedTime > 10) {
                    int queueSize = this.requestQueue.size();
                    log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
                        + " " + req.getFilePath() + " " + req.getFileSize());
                }

                // pre write mappedFile
                // 文件预热
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMappedFileSizeCommitLog()
                    &&
                    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                }

                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
            this.hasException = true;
            return false;
        } catch (IOException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.hasException = true;
            if (null != req) {
                requestQueue.offer(req);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        } finally {
            if (req != null && isSuccess)
                req.getCountDownLatch().countDown();
        }
        return true;
    }

从任务线程首先任务队列中取出任务,然后执行MappedFile的创建过程:如果开启了暂存缓冲池的功能,就通过:ServiceLoader.load(MappedFile.class)方式创建MappedFile对象,否则就使用构造方法创建MappedFile,不管使用哪种方法创建,都会调用init方法来初始化MappedFile。如果开启了文件预热功能的话,还会调用MappedFile的warmMappedFile方法进行文件预热(文件预热的功能在上篇已经分析)。

根据数据位置重置数据:

    public boolean resetOffset(long offset) {
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast != null) {
            long lastOffset = mappedFileLast.getFileFromOffset() +
                mappedFileLast.getWrotePosition();
            long diff = lastOffset - offset;
            // 可以重置的前提是,当前写入指针提前与想要重置到的指针offset的偏移量不超过两个文件的额大小
            final int maxDiff = this.mappedFileSize * 2;
            if (diff > maxDiff)
                return false;
        }

        ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();

        while (iterator.hasPrevious()) {
            mappedFileLast = iterator.previous();

            // 从队列后往前遍历,如果当前mappedFile的起始指针大于offset,则需要丢弃该文件。
            // 否则第一次遇到mappedFile的起始指针不大于offset,就将当前文件的三个指针设置为offset,然后跳出循环
            // 即丢弃offset以后的所有数据
            if (offset >= mappedFileLast.getFileFromOffset()) {
                int where = (int) (offset % mappedFileLast.getFileSize());
                mappedFileLast.setFlushedPosition(where);
                mappedFileLast.setWrotePosition(where);
                mappedFileLast.setCommittedPosition(where);
                break;
            } else {
                iterator.remove();
            }
        }
        return true;
    }

重置数据的过程也很简单,但是重置数据有一个限制,是只能重置当前写指针之前2*mappedFileSize以内的数据。

数据从暂存缓冲中提交到文件(commit):

    /**
     * 提交数据
     * @param commitLeastPages 最少提交的页数
     * @return 是否数据全部提交完成
     */
    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            // offset是在mappedFile里的位置
            int offset = mappedFile.commit(commitLeastPages);
            // where是在mappedFileQueue中的绝对位置
            long where = mappedFile.getFileFromOffset() + offset;
            // 本次提交之后的位置等于提交前的位置,说明全部数据都提交了,没有待提交的数据
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
    }

数据提交的逻辑也很简单:根据已提交位置找到第一个还未提交的MappedFile,然后提交该MappedFile,最后更新MappedFileQueue的committedWhere指针。
数据的flush和commit逻辑类似,这里不再分析。

总结

经过分析可知,MappedFileQueue的作用是:将一个文件目录下的MappedFile组织成一个根据文件名排序的文件队列。并且可以控制队列中的文件的创建、文件内容的提交、刷盘、文件的过期剔除。

标签:存储,null,return,mappedFile,req,offset,MappedFileQueue,RocketMQ,MappedFile
来源: https://blog.csdn.net/feijianke666/article/details/117233329

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有