ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

zk客户端的三种操作(akAPI,ZkClient,Curator)

2021-05-29 07:57:07  阅读:221  来源: 互联网

标签:String zk zookeeper Curator 节点 ZkClient new public watchedEvent


1. zk原生态API

新建Maven项目导入依赖

        <!-- zk 原生api 操作 -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>

zk客户端是异步的哦!需要引入CountDownLatch 来确保连接好了再做下面操作。zk 原生api是不支持迭代式的创建跟删除路径的。

public class CreateSessionDemo
{
	private final static String CONNECTSTRING = "localhost:2181";
	private static CountDownLatch countDownLatch = new CountDownLatch(1);
	// CountDownLatch 的引入是因为 zk 连接是异步的
	public static void main(String[] args) throws IOException, InterruptedException
	{
		ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new Watcher()
		{
			public void process(WatchedEvent watchedEvent)
			{
				//如果当前的连接状态是连接成功的,那么通过计数器去控制
				if (watchedEvent.getState() == Event.KeeperState.SyncConnected)
				{
					countDownLatch.countDown();
					System.out.println(watchedEvent.getState());
				}
			}
		});
		countDownLatch.await();
		System.out.println(zooKeeper.getState());
	}
}

节点的增删改查 操作:


public class ApiOperatorDemo implements Watcher
{
	private final static String CONNECTSTRING = "localhost:2181";
	private static CountDownLatch countDownLatch = new CountDownLatch(1);
	private static ZooKeeper zookeeper;
	private static Stat stat = new Stat();

	public static void main(String[] args) throws Exception
	{
		zookeeper = new ZooKeeper(CONNECTSTRING, 5000, new ApiOperatorDemo());
		countDownLatch.await();
		// 确保连接ok

		//创建节点 设置为IDs权限为anyone 还是一个 永久节点
		String result = zookeeper.create("/node1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		zookeeper.getData("/node1", new ApiOperatorDemo(), stat);
		//增加一个
		System.out.println("创建成功:" + result);

		//修改数据
		zookeeper.getData("/node1", new ApiOperatorDemo(), stat);
		zookeeper.setData("/node1", "deer2".getBytes(), -1);
		Thread.sleep(2000);


		//删除节点
		zookeeper.getData("/node1", new ApiOperatorDemo(), stat);
		zookeeper.delete("/node1", -1);
		Thread.sleep(2000);

		// 创建节点和子节点
		String path = "/node11";
		zookeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		TimeUnit.SECONDS.sleep(1);

		Stat stat = zookeeper.exists(path + "/node1", true);
		if (stat == null)
		{
			//表示节点不存在
			zookeeper.create(path + "/node1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			TimeUnit.SECONDS.sleep(1);
		}
		//修改子路径
		zookeeper.setData(path + "/node1", "deer".getBytes(), -1);
		TimeUnit.SECONDS.sleep(1);


		//获取指定节点下的子节点
		List<String> childrens = zookeeper.getChildren("/node11", true);
		System.out.println(childrens);

	}

	public void process(WatchedEvent watchedEvent)
	{
		//如果当前的连接状态是连接成功的,那么通过计数器去控制
		if (watchedEvent.getState() == Event.KeeperState.SyncConnected)
		{
			if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath())
			{
				countDownLatch.countDown();
				System.out.println(watchedEvent.getState() + "-->" + watchedEvent.getType());
			} else if (watchedEvent.getType() == Event.EventType.NodeDataChanged)
			{
				try
				{
					System.out.println("数据变更触发路径:" + watchedEvent.getPath() + "->改变后的值:" +
							new String(zookeeper.getData(watchedEvent.getPath(), true, stat)));
				} catch (KeeperException e)
				{
					e.printStackTrace();
				} catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			} else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged)
			{//子节点的数据变化会触发
				try
				{
					System.out.println("子节点数据变更路径:" + watchedEvent.getPath() + "->节点的值:" +
							zookeeper.getData(watchedEvent.getPath(), true, stat));
				} catch (KeeperException e)
				{
					e.printStackTrace();
				} catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			} else if (watchedEvent.getType() == Event.EventType.NodeCreated)
			{//创建子节点的时候会触发
				try
				{
					System.out.println("节点创建路径:" + watchedEvent.getPath() + "->节点的值:" +
							zookeeper.getData(watchedEvent.getPath(), true, stat));
				} catch (KeeperException e)
				{
					e.printStackTrace();
				} catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			} else if (watchedEvent.getType() == Event.EventType.NodeDeleted)
			{//子节点删除会触发
				System.out.println("节点删除路径:" + watchedEvent.getPath());
			}
		}

	}
}

zk权限操作demo 

public class AuthControlDemo implements Watcher
{
	private final static String CONNECTSTRING = "localhost:2181";
	private static CountDownLatch countDownLatch = new CountDownLatch(1);
	private static CountDownLatch countDownLatch2 = new CountDownLatch(1);

	private static ZooKeeper zookeeper;
	private static Stat stat = new Stat();

	public static void main(String[] args) throws Exception
	{
		zookeeper = new ZooKeeper(CONNECTSTRING, 5000, new AuthControlDemo());
		countDownLatch.await();

		ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("root:root")));
		ACL acl2 = new ACL(ZooDefs.Perms.CREATE, new Id("ip", "192.168.1.1"));

		List<ACL> acls = new ArrayList<>();
		acls.add(acl);
		acls.add(acl2);
		zookeeper.create("/auth1", "123".getBytes(), acls, CreateMode.PERSISTENT);
		zookeeper.addAuthInfo("digest", "root:root".getBytes());

		zookeeper.create("/auth1/auth1-1", "123".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);


		ZooKeeper zooKeeper1 = new ZooKeeper(CONNECTSTRING, 5000, new AuthControlDemo());
		countDownLatch.await();
		zooKeeper1.addAuthInfo("digest", "root:root".getBytes());
		zooKeeper1.delete("/auth1/auth1-1", -1);


		// acl (create /delete /admin /read/write)
		//权限模式: ip/Digest(username:password)/world/super

	}

	public void process(WatchedEvent watchedEvent)
	{
		//如果当前的连接状态是连接成功的,那么通过计数器去控制
		if (watchedEvent.getState() == Event.KeeperState.SyncConnected)
		{
			if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath())
			{
				countDownLatch.countDown();
				System.out.println(watchedEvent.getState() + "-->" + watchedEvent.getType());
			}
		}

	}
}

 

zk原生态APi缺点


1. 会话的连接是异步的;必须用到回调函数 
2. Watch需要重复注册: 看一次watch注册一次 
3. Session重连机制:有时session断开还需要重连接。
4. 开发复杂性较高:开发相对来说比较琐碎
 

2. zkClient


        <!-- zkclient 操作 -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
 创建会话(同步,重试)
public ZkClient(final String zkServers, final int sessionTimeout, 
                final int connectionTimeout, final ZkSerializer zkSerializer, 
                final long operationRetryTimeout)

 创建节点(同步,递归创建)
public String create(String path,Object data,final List<ACL> acl,CreateMode mode)
public void createPersistent(String path,boolean createParents,List<ACL> acl)
public void createPersistent(String path, Object data, List<ACL> acl)
public String createPersistentSequential(String path,Object data,List<ACL> acl)
public void createEphemeral(String path, Object data, List<ACL> acl)
public String createEphemeralSequential(String path,Object data,List<ACL> acl)

 删除节点(同步,递归删除)
public boolean delete(String path,int version)
public boolean deleteRecursive(String path)

获取节点(同步,避免不存在异常)
public List<String> getChildren(String path)
public <T> T readData(String path, boolean returnNullIfPathNotExists)
public <T> T readData(String path, Stat stat) 

 更新节点(同步,实现CAS,状态返回)
public void writeData(String path, Object datat, int expectedVersion)
public Stat writeDataReturnStat(String path,Object datat,int expectedVersion)

 检测节点存在(同步)
public boolean exists(String path) 

权限控制(同步)
public void addAuthInfo(String scheme, final byte[] auth);
public void setAcl(final String path, final List<ACL> acl);
 监听器

序号

监听器

注册API

1

IZkStateListener

unsubscribeStateChanges(IZkStateListener listener)

2

IZkDataListener

unsubscribeDataChangesIZkStateListener listener

3

IZkChildListener

unsubscribeChildChangesIZkStateListener listener

public class SessionDemo {

    private final static String CONNECTSTRING="localhost:2181";

    public static void main(String[] args) {
        ZkClient zkClient=new ZkClient(CONNECTSTRING,4000);

        System.out.println(zkClient+" - > success");
    }
}



public class ZkClientApiOperatorDemo
{

	private final static String CONNECTSTRING = "localhost:2181";

	private static ZkClient getInstance()
	{
		return new ZkClient(CONNECTSTRING, 10000);
	}

	public static void main(String[] args) throws InterruptedException
	{
		ZkClient zkClient = getInstance();
		//zkclient 提供递归创建父节点的功能
		zkClient.createPersistent("/zkclient/zkclient1/zkclient1-1/zkclient1-1-1", true);
		System.out.println("success");

		//删除节点
		zkClient.deleteRecursive("/zkclient");


		//获取子节点
		List<String> list = zkClient.getChildren("/node11");
		System.out.println(list);

		//watcher

		zkClient.subscribeDataChanges("/node11", new IZkDataListener()
		{
			@Override
			public void handleDataChange(String s, Object o) throws Exception
			{
				System.out.println("节点名称:" + s + "->节点修改后的值" + o);
			}

			@Override
			public void handleDataDeleted(String s) throws Exception
			{

			}
		});

		zkClient.writeData("/node11", "node");
		TimeUnit.SECONDS.sleep(2);

		zkClient.subscribeChildChanges("/node11", new IZkChildListener()
		{
			@Override
			public void handleChildChange(String s, List<String> list) throws Exception
			{
				System.out.println("节点名称:" + s + "->" + "当前的节点列表:" + list);
			}
		});

		zkClient.delete("/node11/node1");
		;
		TimeUnit.SECONDS.sleep(2);

	}
}

3. curator


        <!-- curator 操作 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.11.0</version>
        </dependency>

 

Java客户端连接集群

ZkClient client = new ZkClient("host1,host2,host3,host4,host5"); 

Zk客户端处理过程:解析→打散→形成环形地址列表队列

参考

原生API 

zkClient

Curator

标签:String,zk,zookeeper,Curator,节点,ZkClient,new,public,watchedEvent
来源: https://blog.51cto.com/u_14582976/2829346

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有