当前位置 博文首页 > 干货满满张哈希:请问你知道分布式系统设计模式的最低水位线思想

    干货满满张哈希:请问你知道分布式系统设计模式的最低水位线思想

    作者:干货满满张哈希 时间:2021-02-14 10:29

    最低水位线(Low-Water Mark)

    最低水位线是指在 WAL(Write Ahead Log)预写日志这种设计模式中,标记在这个位置之前的日志可以被丢弃。

    问题背景

    WAL(Write Ahead Log)预写日志维护了对于存储的每次更新,随着时间不断增长,这个日志文件会变得无限大。Segmented Log 分割日志这种设计模式可以让我们每次只处理一个更小的文件,但是日志如果不清理,会无休止增长以至于硬盘被占满。

    解决方案

    最低水位线这种设计模式会告诉系统哪一部分的日志可以被删除了,即在最低水位线之前的所有日志可以被清理掉。一般的方式是,程序内有一个线程运行一个定时任务,不断地检查哪一部分的日志可以被清理并且删除这些日志文件。

    this.logCleaner = newLogCleaner(config);
    this.logCleaner.startup();
    

    这里的 LogCleaner 可以用定时任务实现:

    public void startup() {
        scheduleLogCleaning();
    }
    
    private void scheduleLogCleaning() {
        singleThreadedExecutor.schedule(() -> {
            cleanLogs();
        }, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS);
    }
    

    基于快照的最低水位线实现以及示例

    大部分的分布式一致性系统(例如 Zookeeper(ZAB 简化 paxos协议),etcd(raft协议)),都实现了快照机制。在这种机制下,他们的存储引擎会定时的进行全量快照,并且记录下快照对应的日志位置,将这个位置作为最低水位线。

    //进行快照
    public SnapShot takeSnapshot() {
        //获取最近的日志id
        Long snapShotTakenAtLogIndex = wal.getLastLogEntryId();
        //利用这个日志 id 作为标识,生成快照
        return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex);
    }
    

    当生成了快照并成功存储到了磁盘上,对应的最低水位线将用来清理老的日志:

    //根据位置获取这个位置之前的所有日志文件
    List<WALSegment> getSegmentsBefore(Long snapshotIndex) {
        List<WALSegment> markedForDeletion = new ArrayList<>();
        List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments;
        for (WALSegment sortedSavedSegment : sortedSavedSegments) {
            //如果这个日志文件的最新log id 小于快照位置,证明可以被清理掉
            if (sortedSavedSegment.getLastLogEntryId() < snapshotIndex) {
                markedForDeletion.add(sortedSavedSegment);
            }
        }
        return markedForDeletion;
    }
    

    zookeeper 中的最低水位线实现

    定时任务位于DatadirCleanupManagerstart方法:

    public void start() {
        //只启动一次
        if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
            LOG.warn("Purge task is already running.");
            return;
        }
        //检查定时间隔有效性
        if (purgeInterval <= 0) {
            LOG.info("Purge task is not scheduled.");
            return;
        }
        //启动定时任务
        timer = new Timer("PurgeTask", true);
        TimerTask task = new PurgeTask(dataLogDir, snapDir,snapRetainCount);
        timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
        purgeTaskStatus = PurgeTaskStatus.STARTED;
    }
    

    核心方法为PurgeTxnLogpurge方法:

    public static void purge(File dataDir, File snapDir, int num) throws IOException {
        //保留的snapshot数量不能超过3
        if (num < 3) {
            throw new IllegalArgumentException(COUNT_ERR_MSG);
        }
    
        FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
        //统计文件数量
        List<File> snaps = txnLog.findNValidSnapshots(num);
        int numSnaps = snaps.size();
        if (numSnaps > 0) {
            //利用上一个文件的日志偏移,清理log文件和snapshot文件
            purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
        }
    }
    
    static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {
        //名字包括开头的zxid,就是代表了日志位置
        final long leastZxidToBeRetain = Util.getZxidFromName(snapShot.getName(), PREFIX_SNAPSHOT);
        final Set<File> retainedTxnLogs = new HashSet<File>();
        retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));
        class MyFileFilter implements FileFilter {
    
            private final String prefix;
            MyFileFilter(String prefix) {
                this.prefix = prefix;
            }
            public boolean accept(File f) {
                if (!f.getName().startsWith(prefix + ".")) {
                    return false;
                }
                if (retainedTxnLogs.contains(f)) {
                    return false;
                }
                long fZxid = Util.getZxidFromName(f.getName(), prefix);
                //根据文件名称代表的zxid,过滤出要删除的文件
                return fZxid < leastZxidToBeRetain;
            }
    
        }
        //筛选出符合条件的 log 文件和 snapshot 文件
        File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));
        List<File> files = new ArrayList<>();
        if (logs != null) {
            files.addAll(Arrays.asList(logs));
        }
        File[] snapshots = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT));
        if (snapshots != null) {
            files.addAll(Arrays.asList(snapshots));
        }
        //进行删除
        for (File f : files) {
            final String msg = String.format(
                "Removing file: %s\t%s",
                DateFormat.getDateTimeInstance().format(f.lastModified()),
                f.getPath());
    
            LOG.info(msg);
            System.out.println(msg);
    
            if (!f.delete()) {
                System.err.println("Failed to remove " + f.getPath());
            }
        }
    
    }
    

    那么是什么时候 snapshot 呢?查看SyncRequestProcessorrun方法,这个方法时处理请求,处理请求的时候记录操作日志到 log 文件,同时在有需要进行 snapshot 的时候进行 snapshot:

    public void run() {
        try {
            //避免所有的server都同时进行snapshot
            resetSnapshotStats();
            lastFlushTime = Time.currentElapsedTime();
            while (true) {
                //获取请求代码省略
                // 请求操作纪录成功
                if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                    //是否需要snapshot
                    if (shouldSnapshot()) {
                        //重置是否需要snapshot判断相关的统计
                        resetSnapshotStats();
                        //另起新文件
                        zks.getZKDatabase().rollLog();
                        //进行snapshot,先获取锁,保证只有一个进行中的snapshot
                        if (!snapThreadMutex.tryAcquire()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            //异步snapshot
                            new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    } finally {
                                        //释放锁
                                        snapThreadMutex.release();
                                    }
                                }
                            }.start();
                        }
                    }
                } 
                //省略其他
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        }
    }
    

    resetSnapshotStats()设置随机起始位,避免集群内所有实例同时进行 snapshot:

    private void resetSnapshotStats() {
        //生成随机roll,snapCount(默认100000)
        randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
        //生成随机size,snapSizeInBytes(默认4GB)
        randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
    }
    

    shouldSnapshot()根据启动时设置的随机起始位以及配置,判断是否需要 snapshot

    private boolean shouldSnapshot() {
        //获取日志计数
        int logCount = zks.getZKDatabase().getTxnCount();
        //获取大小
        long logSize = zks.getZKDatabase().getTxnSize();
        //当日志个数大于snapCount(默认100000)/2 + 随机roll,或者日志大小大于snapSizeInBytes(默认4GB)/2+随机size
        return (logCount > (snapCount / 2 + randRoll))
               || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
    }
    

    ``

    基于时间的最低水位线实现与示例

    在某些系统中,日志不是用来更新系统的状态,可以在一段时间之后删除,并且不用考虑任何子系统这个最低水位线之前的是否可以删除。例如,kafka 默认保留 7 天的 log,RocketMQ 默认保留 3 天的 commit log。

    RocketMQ中最低水位线实现

    DefaultMeesageStoreaddScheduleTask()方法中,定义了清理的定时任务:

    private void addScheduleTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
        //忽略其他定时任务
    }
    
    private void cleanFilesPeriodically() {
        //清理消息存储文件
        this.cleanCommitLogService.run();
        //清理消费队列文件
        this.cleanConsumeQueueService.run();
    }
    

    我们这里只关心清理消息存储文件,即DefaultMessageStoredeleteExpiredFiles方法:

    private void deleteExpiredFiles() {
        int deleteCount = 0;
        //文件保留时间,就是文件最后一次更新时间到现在的时间间隔,如果超过了这个时间间隔,就认为可以被清理掉了
        long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
        //删除文件的间隔,每次清理可能不止删除一个文件,这个配置指定两个文件删除之间的最小间隔
        int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
        //清理文件时,可能文件被其他线程占用,例如读取消息,这时不能轻易删除
        //在第一次触发时,记录一个当前时间戳,当与当前时间间隔超过这个配置之后,强制删除
        int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    
        //判断是否要删除的时间到了
        boolean timeup = this.isTimeToDelete();
        //判断磁盘空间是否还充足
        boolean spacefull = this.isSpaceToDelete();
        //是否是手工触发
        boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
    
        //满足其一,就执行清理
        if (timeup || spacefull || manualDelete) {
    
            if (manualDelete)
                this.manualDeleteFileSeveralTimes--;
            boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
            fileReservedTime *= 60 * 60 * 1000;
            
            //清理文件
            deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                destroyMapedFileIntervalForcibly, cleanAtOnce);
            if (deleteCount > 0) {
            } else if (spacefull) {
                log.warn("disk space will be full soon, but delete file failed.");
            }
        }
    }
    

    清理文件的代码MappedFiledeleteExpiredFileByTime方法:

    public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
        Object[] mfs = this.copyMappedFiles(0);
    
        if (null == mfs)
            return 0;
    
        //刨除最新的那个文件
        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        List<MappedFile> files = new ArrayList<MappedFile>();
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                //如果超过了过期时间,或者需要立即清理
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                    //关闭,清理并删除文件
                    if (mappedFile.destroy(intervalForcibly)) {
                        files.add(mappedFile);
                        deleteCount++;
    
                        if (files.size() >= DELETE_FILES_BATCH_MAX) {
                            break;
                        }
    
                        //如果配置了删除文件时间间隔,则需要等待
                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //avoid deleting files in the middle
                    break;
                }
            }
        }
    
        //从文件列表里面里将本次删除的文件剔除
        deleteExpiredFile(files);
    
        return deleteCount;
    }
    

    每日一刷,轻松提升技术,斩获各种offer:

    image

    bk
    下一篇:没有了