ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Zookeepr + Curator

2022-05-07 16:01:54  阅读:145  来源: 互联网

标签:node log curator Curator org import Zookeepr public


------------恢复内容开始------------

一、 引入maven 依赖包, zookeeper 单独引入

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>    

二、创建 Curator客户端 Bean, 有两种方式, 一种  工厂方式, 一种 build 建造者方式

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CuratorCli {
    @Value("${zk.hostAddr}")
    private String zkHostAddr;

    @Value("${zk.session.timeout:5000}")
    private Integer sessionTimeOut;
    @Bean
    public CuratorFramework factoryCli() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkHostAddr, sessionTimeOut, sessionTimeOut, retryPolicy);
        client.start();
        return client;
    }


    @Bean
    public CuratorFramework fluentCli() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(zkHostAddr)
                .sessionTimeoutMs(sessionTimeOut)
                .connectionTimeoutMs(sessionTimeOut)
                .retryPolicy(retryPolicy)
//                .namespace("base") //包含隔离名称
                .build();
        client.start();
        return client;
    }
}

三、 进行创建、修改、删除等操作

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
@EnableAsync
public class CuratorDemo {

    @Autowired
    private CuratorFramework factoryCli;

    @Autowired
    private CuratorFramework fluentCli;

    public void create() throws Exception {
        String path = factoryCli.create().forPath("/curator-node", "hi girl".getBytes());
//        factoryCli.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node", "hello god".getBytes());
        log.info("curator create node:{} successfully", path);
    }

    public void createWithParent() throws Exception {
        String path = factoryCli.create().creatingParentsIfNeeded().forPath("/curator-parent/sub-node-1");
//        factoryCli.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node", "hello god".getBytes());
        log.info("curator create node:{} successfully", path);
    }

    public void getData() throws Exception {
        byte [] bytes = fluentCli.getData().forPath("/curator-node");
        log.info("get data from node : {} successfully", new String(bytes));

    }

    public void setData() throws Exception {
        fluentCli.setData().forPath("/curator-node", "hi mylove".getBytes());
        byte[] bytes = fluentCli.getData().forPath("/curator-node");
        log.info("get data from node /curator-node:{} successfully", new String(bytes));
    }

    public void deleteData() throws Exception {
        factoryCli.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator-parent");
    }

    /**
     * 异步处理
     * @throws Exception
     */
    public void asyncHandle() throws Exception {
        factoryCli.getData().inBackground((item1, item2) -> {
            log.info("background:{}", new String(item2.getData()));
        }).forPath("/curator-node");

        TimeUnit.SECONDS.sleep(10);
    }

    @Autowired
    private Executor asyncExecutor;    //配置链接池

    public void executor() throws Exception {
        factoryCli.getData().inBackground((item1, item2) -> {  //回调
            log.info("background: {}", new String(item2.getData()));
        },asyncExecutor).forPath("/curator-node");

        TimeUnit.SECONDS.sleep(10);
    }

}

四、 Curator 缓存&监听, 第一遍运行添加到缓存时即会触发监听事件。测试时主线程sleep住,然后可以在客户端操作 zookeeper, 即可观察到监听信息

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class WatchDemo {

    @Autowired
    CuratorFramework factoryCli;

    public void watchForeverByNodeCache() {
        // 创建缓存实例, 基于 /watcher-node 路径, 默认是 树结构都缓存, 可以加参数 CuratorCache.Options.SINGLE_NODE_CACHE 只为当前节点加缓存
        CuratorCache curatorCache = CuratorCache.build(factoryCli, "/watcher-node");

        curatorCache.listenable().addListener(new CuratorCacheListener() {
            @Override
            public void event(Type type, ChildData childData, ChildData childData1) {
                switch (type){
                    case NODE_CREATED:
                        log.info("The event {} is  NODE_CREATED");
                        break;
                    case NODE_CHANGED:
                        log.info("The event {} is  NODE_CHANGED");
                        break;
                    case NODE_DELETED:
                        log.info("The event {} is  NODE_DELETED");
                        break;
                    default:
                        break;
                }
            }
        });
        curatorCache.start();
    }

    public void watcher2() {
        CuratorCache curatorCache = CuratorCache.build(factoryCli, "/watcher-node");

        curatorCache.listenable().addListener(CuratorCacheListener.builder()
                .forInitialized( ()-> log.info("[forInitialized]: cache initialized"))  //初始化完成调用
                .forCreatesAndChanges(  //添加或修改缓存数据时调用
                        (oldNode, node) -> log.info("[forCreatesAndChanges]: NodeChanged: old:[{}]\n, new:[{}]", oldNode, node)
                )
                .forCreates(childData -> log.info("[forCreates] : Node created: {}", childData))   //添加缓存数据时调用
                .forChanges((oldNode, node) -> log.info("[forChanges] : Node changed: Old:[{}]\n, new:[{}]", oldNode, node))  //更改缓存数据时调用
                .forDeletes(childData -> log.info("[forDelete]: Node delted:data : []", childData)) //删除缓存数据时调用
                .forAll((type, oldData, data) -> log.info("[forAll]: type: [{}], [{}], [{}] \n", type, oldData, data)) //添加、更改、删除缓存数据时调用
                .build());

        curatorCache.start();
    }

}

 

标签:node,log,curator,Curator,org,import,Zookeepr,public
来源: https://www.cnblogs.com/cgsdg/p/16242848.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有