ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Zookeeper curator框架

2021-05-03 21:57:52  阅读:222  来源: 互联网

标签:info throws log 框架 void Zookeeper curator public 节点


1. 简介

  • curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。
  • 提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、 共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。

1.1 原生zookeeperAPI的不足

  • 连接对象异步创建,需要开发人员自行编码等待
  • 连接没有自动重连超时机制
  • watcher一次注册生效一次
  • 不支持递归创建树形节点

1.2 curator特点

  • 解决session会话超时重连
  • watcher反复注册
  • 简化开发api
  • 遵循Fluent风格的API
  • 提供了分布式锁服务、共享计数器、缓存机制等机制

1.3 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <type>jar</type>
    </dependency>
</dependencies>

2. 连接与关闭

  • 采用了工厂设计模式和建造者设计模式。通过输入一些连接信息,可以获取到一个连接Zookeeper服务器的客户端。
      public static void main(String[] args) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);  // 表示间隔1s,最多尝试重连3次
        CuratorFramework client = CuratorFrameworkFactory
                                              .builder()
                                              .connectString("192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181")
                                              .sessionTimeoutMs(5000)
                                              .retryPolicy(retryPolicy)
                                              .namespace("create")
                                              .build();
        client.start();  // 开启客户端
        log.info(client.isStarted());
        client.close();  // 关闭客户端
      }
    

    connectString:用于设置地址及端口号;
    sessionTimeoutMs:用于设置超时时间;
    retryPolicy:用于设置重连策略
    namespace:表示根节点路径,可以没有

2.1 测试模版

  • 因此,可以写一个测试模板,在开始之前打开客户端,在结束之后关闭客户端。
    public class CreateTest {
      private final static Logger log = Logger.getLogger(ConnectTest.class);
      private String connectString = "192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181";
      CuratorFramework client;
      Integer sessionTimeoutMs = 5000;
      Integer baseSleepTimeMs = 1000;
      Integer maxRetries = 3;
      String namespace = "create";
    
    
      @Before
      public void before() {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory
                .builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(retryPolicy)
                .namespace(namespace)
                .build();
        client.start();
        log.info("客户端已开启");
      }
    	
      @After
      public void after() {
        client.close();
        log.info("客户端已关闭");
      }
    }
    

3. 新增节点

3.1 案例一:简单创建

  @Test
  public void testCreate() throws Exception {
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath("/node", "data".getBytes());
    log.info("create结束");
  }

3.2 案例二:自定义权限创建

  @Test
  public void testCreate2() throws Exception {
    Id ip = new Id("ip", "192.168.233.133");
    List<ACL> acl = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip));
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acl)
            .forPath("/node1", "data".getBytes());
    log.info("create结束");
  }

3.3 案例三:递归创建节点

  • .creatingParentsIfNeeded()实现,可以递归创建节点
      @Test
      public void testCreate3() throws Exception {
        //  递归创建节点
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/node2/node33", "data".getBytes());
        log.info("create结束");
      }
    

3.4 案例四:异步方法创建节点

  • 在此说明一下,方法接收到的第一个参数curatorFramework实际上就是客户端;curatorFramework保存了一些查询的结果。
      @Test
      public void testCreate4() throws Exception {
        //  异步方式创建节点
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .inBackground(new BackgroundCallback() {
                  @Override
                  public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    log.info(curatorFramework == client);  // true
                    log.info("getResultCode(): " + curatorEvent.getResultCode());  // 0表示创建成功
                    log.info("getType(): " + curatorEvent.getType().toString());  // 获取操作类型 CREATE
                    log.info("getPath(): " + curatorEvent.getPath());   // 获取节点路径
                  }
                })
                .forPath("/node2/node38", "data".getBytes());
        log.info("create结束");
      }
    

4. 更新节点

4.1 案例一:更新一个节点

  @Test
  public void testSet() throws Exception {
    client.setData()
            .forPath("/node", "set".getBytes());
    log.info("设置完成");
  }

4.2 案例二:带版本更新一个节点

  @Test
  public void testSet2() throws Exception {
    client.setData()
            .withVersion(1)  // 带有版本号
            .forPath("/node", "12".getBytes());
    log.info("设置完成");
  }

4.3 案例三:带回调方法更新一个节点

  @Test
  public void testSet3() throws Exception {
    client.setData()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getResultCode());  // 0
                log.info(curatorEvent.getType());  // SET_DATA
                log.info(curatorEvent.getPath());  // /node
                log.info(curatorEvent.getStat().toString());  // 21474836489,21474836542,1620040487612,1620042328488,4,0,0,0,3,0,21474836489
              }
            })
            .forPath("/node", "432".getBytes());
    log.info("设置完成");
  }

5. 删除节点

5.1 案例一:删除一个节点

  @Test
  public void testDelete() throws Exception {
    client.delete()
            .forPath("/node");
    log.info("删除结束");
  }

5.2 案例二:递归删除节点

  @Test
  public void testDelete1() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded()
            .forPath("/node2");
    log.info("删除结束");
  }

5.3 案例三:带回调方法删除一个节点

  @Test
  public void testDelete3() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getType());  // DELETE
                log.info(curatorEvent.getPath());  // /node1
              }
            })
            .forPath("/node1");
    log.info("删除结束");
  }

6. 查看节点

6.1 案例一:查看一个节点

  @Test
  public void testGet() throws Exception {
    byte[] data = client.getData()
            .forPath("/node2");
    log.info(new String(data));
  }

6.2 案例二:查看节点的值和状态

  @Test
  public void testGet2() throws Exception {
    Stat stat = new Stat();
    byte[] data = client.getData()
            .storingStatIn(stat)
            .forPath("/node2");
    log.info(new String(data));
    log.info(stat.getVersion());
  }

6.3 案例三:带回调方法查看一个节点

  @Test
  public void testGet3() throws Exception {
    client.getData()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(new String(curatorEvent.getData()));  // 4134134
                log.info(curatorEvent.getStat().toString());  // 21474836566,21474836566,1620042863998,1620042863998,0,0,0,0,7,0,21474836566
                log.info(curatorEvent.getType().toString());  // GET_DATA
              }
            })
            .forPath("/node2");
  }

7. 查看子节点

7.1 案例一:查看一个节点的所有子节点

  @Test
  public void testChildren() throws Exception {
    List<String> children = client.getChildren()
            .forPath("/");
    log.info(children.toString());
  }

7.2 案例二:带回调方法查看一个节点的所有子节点

  @Test
  public void testChildren2() throws Exception {
    client.getChildren()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getPath()); // /
                log.info(curatorEvent.getType().toString());  // CHILDREN
                log.info(curatorEvent.getChildren().toString());  // [node, node2, node3]
              }
            })
            .forPath("/");
  }

8. 检查节点是否存在

8.1 案例一:检查一个节点是否存在

  @Test
  public void testExists() throws Exception {
    Stat stat = client.checkExists()
            .forPath("/node");
    if (stat != null)
      log.info(stat.toString());
    else
      log.info("节点不存在");
  }

8.2 案例二:带回调方法检查一个节点是否存在

  @Test
  public void testExists1() throws Exception {
    client.checkExists()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getType().toString());  // EXISTS
                Stat stat = curatorEvent.getStat();
                if (stat != null)
                  log.info(stat.toString());  // 21474836548,21474836548,1620042534164,1620042534164,0,0,0,0,0,0,21474836548
                else
                  log.info("节点不存在");
              }
            })
            .forPath("/node");
  }

9. Watcher

  • curator提供了两种Watcher(Cache)来监听结点的变化
  • NodeCache : 只是监听某一个特定的节点,监听节点的新增、修改数据、删除。(子节点的新增、删除、修改均不会管)
  • PathChildrenCache : 监控一个ZNode的子节点. 当一个子节点增加、修改数据、删除时, PathCache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态
  • 这个监视器可以多次使用

9.1 案例一:NodeCache

  @Test
  public void testWatch() throws Exception {
    //  观察节点的变化
    NodeCache nodeCache = new NodeCache(client, "/node22");
    nodeCache.start();
    nodeCache.getListenable()
            .addListener(new NodeCacheListener() {
              @Override
              public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null) {
                  log.info(currentData.getPath());
                  log.info(new String(currentData.getData()));
                } else {
                  log.info("删除了某个节点");
                }
              }
            });
    Thread.sleep(60000); //睡30s
    nodeCache.close();
  }

9.2 案例二:PathChildrenCache

  @Test
  public void testWatch2() throws Exception {
    //  观察节点的变化
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node22", true);
    pathChildrenCache.start();
    pathChildrenCache.getListenable()
            .addListener(new PathChildrenCacheListener() {
              @Override
              public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                log.info(pathChildrenCacheEvent.getType());  // CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED
                log.info(pathChildrenCacheEvent.getData().toString());  // ChildData{path='/node22/child', stat=21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630, data=[50, 50]}
                log.info(new String(pathChildrenCacheEvent.getData().getData()));
                log.info(pathChildrenCacheEvent.getData().getPath());  // ChildData{path='/node22/child'
                log.info(pathChildrenCacheEvent.getData().getStat().toString());  // 21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630
              }
            });
    Thread.sleep(60000); //睡30s
    pathChildrenCache.close();
  }

10. 事务

10.1 案例一:使用事务创建两个节点

  @Test
  public void testTransaction() throws Exception {
    client.inTransaction()
            .create().forPath("/node100", "100".getBytes())
            .and()  // 桥
            .create().forPath("/node101", "101".getBytes())
            .and()  // 桥
            .commit();  // 提交
    log.info("提交成功");
  }

11. 分布式锁

11.1 使用分布式可重入排它锁

  • 排它锁,就是所有人都争抢同一个锁节点/lock,请求的时候,会在/lock内部添加一个顺序节点,当轮到自己的时候,就可以继续执行;否则就阻塞。释放锁的时候,会删除自己增加的顺序节点。(基本实现原理与分布式锁基本一致)
      @Test
      public void testMutex() throws Exception {
        //  排他锁
        InterProcessLock lock = new InterProcessMutex(client, "/lock");
        log.info("等待获取锁对象");
        lock.acquire();
        for (int i = 0; i < 3; i++) {
          Thread.sleep(3000);
          System.out.println(i);
        }
        lock.release();
        log.info("释放锁");
      }
    

11.2 使用读写锁

  • 读锁和写锁是两种类型的锁,但是如果两者争抢同一个锁节点的时候,也会发生一些有趣的事情。
  • 当读锁进入之后,其他的读锁也可以进入;但是写锁只能在外面等;
  • 当写锁进入之后,读写锁都不能进入。
      /**
       * 读锁在运行的时候,写锁不允许工作,在阻塞。
       * 读锁运行的时候,允许另一个读锁也进入读数据
       * 写锁运行时,其他读写锁都不能进入
       * @throws Exception
       */
      @Test
      public void testReadLock() throws Exception {
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
        InterProcessLock readLock = interProcessReadWriteLock.readLock();
        log.info("等待获取读锁对象");
        readLock.acquire();
        for (int i = 0; i < 10; i++) {
          Thread.sleep(3000);
          System.out.println(i);
        }
        readLock.release();
        log.info("释放锁");
      }
    
      @Test
      public void testWriteLock() throws Exception {
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
        InterProcessLock writeLock = interProcessReadWriteLock.writeLock();
        log.info("等待获取写锁对象");
        writeLock.acquire();
        for (int i = 0; i < 10; i++) {
          Thread.sleep(3000);
          System.out.println(i);
        }
        writeLock.release();
        log.info("释放锁");
      }
    

标签:info,throws,log,框架,void,Zookeeper,curator,public,节点
来源: https://blog.csdn.net/weixin_42524843/article/details/116380642

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有