ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ZooKeeper : Curator框架之分布式屏障DistributedBarrier

2022-01-17 20:34:36  阅读:159  来源: 互联网

标签:DistributedBarrier thread ZooKeeper Curator curator 屏障 等待 pool


DistributedBarrier

DistributedBarrier类的源码注释:

Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed.

分布式系统使用屏障来阻止一组节点的处理,直到满足允许所有节点继续的条件为止。

类比单体应用的屏障CyclicBarrier

DistributedBarrier源码:

public class DistributedBarrier
{
    // CuratorFramework实例,用于与Zookeeper进行交互
    private final CuratorFramework client;
    // 分布式屏障的路径
    private final String barrierPath;
    // 监听器
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            client.postSafeNotify(DistributedBarrier.this);
        }
    };

    /**
     * 构造方法
     */
    public DistributedBarrier(CuratorFramework client, String barrierPath)
    {
        this.client = client;
        this.barrierPath = PathUtils.validatePath(barrierPath);
    }

    /**
     * 设置屏障(创建屏障节点)
     */
    public synchronized void         setBarrier() throws Exception
    {
        try
        {
            client.create().creatingParentContainersIfNeeded().forPath(barrierPath);
        }
        catch ( KeeperException.NodeExistsException ignore )
        {
            // ignore
        }
    }

    /**
     * 移除屏障(删除屏障节点)
     */
    public synchronized void      removeBarrier() throws Exception
    {
        try
        {
            client.delete().forPath(barrierPath);
        }
        catch ( KeeperException.NoNodeException ignore )
        {
            // ignore
        }
    }

    /**
     * 阻塞直到屏障不再存在
     */
    public synchronized void      waitOnBarrier() throws Exception
    {
        waitOnBarrier(-1, null);
    }

    /**
     * 阻塞直到屏障不再存在或超时
     */
    public synchronized boolean      waitOnBarrier(long maxWait, TimeUnit unit) throws Exception
    {
        long            startMs = System.currentTimeMillis();
        boolean         hasMaxWait = (unit != null);
        long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;

        boolean         result;
        for(;;)
        {
            // 屏障节点是否不存在,true为不存在,false为存在
            result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);
            // 屏障节点不存在,直接退出
            if ( result )
            {
                break;
            }
            // 屏障节点存在,进行等待
            if ( hasMaxWait )
            {
                long        elapsed = System.currentTimeMillis() - startMs;
                long        thisWaitMs = maxWaitMs - elapsed;
                // 等待超时,直接退出
                if ( thisWaitMs <= 0 )
                {
                    break;
                }
                // 继续等待
                wait(thisWaitMs);
            }
            else
            {
                wait();
            }
        }
        return result;
    }
}

DistributedBarrier的源码还是比较简单的,就是通过一个Zookeeper节点的创建与删除来实现分布式屏障。

测试

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>

CuratorFrameworkProperties类(提供CuratorFramework需要的一些配置信息,以及创建CuratorFramework实例的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorFrameworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static CuratorFramework getCuratorFramework() {
        // 创建CuratorFramework实例
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorFrameworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);
        return curator;
    }
}

DistributedBarrierRunnable类(实现了Runnable接口,模拟分布式节点等待分布式屏障):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;

import java.util.Random;

public class DistributedBarrierRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);
        
        // 分布式屏障的路径
        String barrierPath = "/kaven";

        // 创建DistributedBarrier实例,用于提供分布式屏障功能
        DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);

        System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
        long start = System.currentTimeMillis();
        // 等待屏障被移除
        barrier.waitOnBarrier();
        System.out.println(Thread.currentThread().getName() + " 等待了 "
                + (System.currentTimeMillis() - start) / 1000 + " s");
        System.out.println(Thread.currentThread().getName() + " 继续执行");
    }
}

启动类:

package com.kaven.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 创建CuratorFramework实例
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 分布式屏障的路径
        String barrierPath = "/kaven";

        // 创建DistributedBarrier实例,用于设置和在适当时机删除屏障
        DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);

        // 设置屏障
        barrier.setBarrier();

        // 分布式节点处理业务
        for (int i = 0; i < 5; i++) {
            EXECUTOR_SERVICE.execute(new DistributedBarrierRunnable());
        }

        // 模拟移除屏障需要处理的业务
        Thread.sleep(20000);

        // 移除屏障
        barrier.removeBarrier();
    }
}

模拟5个分布式节点等待分布式屏障,输出如下所示:

pool-1-thread-5 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-1 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-2 等待了 19 s
pool-1-thread-2 继续执行
pool-1-thread-5 等待了 19 s
pool-1-thread-5 继续执行
pool-1-thread-3 等待了 19 s
pool-1-thread-3 继续执行
pool-1-thread-4 等待了 19 s
pool-1-thread-4 继续执行
pool-1-thread-1 等待了 19 s
pool-1-thread-1 继续执行

使用提供超时机制的等待屏障方法:

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DistributedBarrierRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);

        // 分布式屏障的路径
        String barrierPath = "/kaven";

        // 创建DistributedBarrier实例,用于提供分布式屏障功能
        DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);

        System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
        long start = System.currentTimeMillis();
        
        // 等待屏障被移除
        boolean result = barrier.waitOnBarrier(10, TimeUnit.SECONDS);
        
        System.out.println(Thread.currentThread().getName() + " 等待了 "
                + (System.currentTimeMillis() - start) / 1000 + " s");
        if(result) {
            System.out.println(Thread.currentThread().getName() + " 继续执行");
        }
        else {
            // 等待屏障超时
            System.out.println(Thread.currentThread().getName() + " 等待屏障超时");
        }
    }
}

输出如下所示:

pool-1-thread-1 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-5 等待屏障被移除
pool-1-thread-1 等待了 10 s
pool-1-thread-1 等待屏障超时
pool-1-thread-2 等待了 10 s
pool-1-thread-2 等待屏障超时
pool-1-thread-3 等待了 10 s
pool-1-thread-3 等待屏障超时
pool-1-thread-4 等待了 10 s
pool-1-thread-4 等待屏障超时
pool-1-thread-5 等待了 10 s
pool-1-thread-5 等待屏障超时

符合预期。

Curator框架的分布式屏障DistributedBarrier就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

标签:DistributedBarrier,thread,ZooKeeper,Curator,curator,屏障,等待,pool
来源: https://blog.csdn.net/qq_37960603/article/details/122547060

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有