ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Zookeeper实战(开发重点)

2022-01-15 09:34:18  阅读:163  来源: 互联网

标签:实战 zookeeper zk Zookeeper 2181 开发 CONNECTED 节点 localhost


1、分布式安装部署

1.集群规划

在hadoop101、hadoop102和hadoop103三个节点上部署Zookeeper。

2.配置服务器编号

(1)在/export/software/zookeeper/这个目录下创建zkData

[root@hadoop101 zookeeper]# mkdir -p zkData

(2)在/export/software/zookeeper/zkData目录下创建一个myid的文件

[root@hadoop101 zookeeper]# cd zkData

[root@hadoop101 zkData]# touch myid

添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码

(3)编辑myid文件

[root@hadoop101 zkData]# vi myid

在文件中添加与server对应的编号:

1

注意要和zoo.cfg中对应的编码配对起来。

(4)编辑日志存放的目录

[root@hadoop101 zookeeper]# cd bin

[root@hadoop101 zookeeper]# vim zkEnv.sh

注意里面的ZOO_LOG_DIR="."表示在哪里启动zookeeper,就把日志写在哪里,可以找个地方存放日志,修改为:

if [ "x${ZOO_LOG_DIR}" = "x" ]

then

    ZOO_LOG_DIR="/export/servers/zookeeper/logs"

fi

(5)拷贝配置好的zookeeper到其他机器上

[root@hadoop101 software]# scp -r zookeeper/ hadoop102:$PWD

[root@hadoop101 software]# scp -r zookeeper/ hadoop103:$PWD

并分别在hadoop102、hadoop103上修改myid文件中内容为2、3

[root@hadoop102 zkData]# echo 2 > myid

[root@hadoop103 zkData]# echo 3 > myid

3.集群操作

分别启动Zookeeper

[root@hadoop101 zookeeper]# bin/zkServer.sh start

注意:此时查看状态时有可能会出现Error

此时jps是可以看到QuorumPeerMain的。

注意前面提到zookeeper要超过一半以上节点存活时才能正常启动。只有一台不行,至少2台。因此启动第二台服务器。

[root@hadoop102 zookeeper]# bin/zkServer.sh start

此时,在第一台上看status,就正常了。

启动第三台服务器。

[root@hadoop103 zookeeper]# bin/zkServer.sh start

在三台机器上都查看状态

[root@hadoop102 zookeeper]# bin/zkServer.sh status

[root@hadoop103 zookeeper]# bin/zkServer.sh status

注意这里选举了第2台服务器作为leader。

​​​​​​​2、客户端命令行操作

命令基本语法

功能描述

help

显示所有操作命令

ls path [watch]

使用 ls 命令来查看当前znode中所包含的内容

ls2 path [watch]

查看当前节点数据并能看到更新次数等数据

create

普通创建

-s  含有序列

-e  临时(重启或者超时消失)

get path [watch]

获得节点的值

set

设置节点的具体值

stat

查看节点状态

delete

删除节点

rmr

递归删除节点

总结的就是4个思想:增删改查。

1.启动客户端

[root@hadoop101 zookeeper]# bin/zkCli.sh

如果出现如下信息:

......,closing socket connection and attempting recinnect

则要注意需要超过一半以上节点存活时才能正常启动。

2.显示所有操作命令

[zk: localhost:2181(CONNECTED) 1] help

3.查看当前znode中所包含的内容

[zk: localhost:2181(CONNECTED) 0] ls /

如果能看到这个效果,就表示正常了。

目前根目录只有一个子节点zookeeper。

也可以查看该子节点下面的内容。

另外后面可以加上watch。

        Zookeeper有两部分。一部分是文件系统,一部分是通知机制。通知的前提是先注册,一旦数据发生变化,只通知注册的人。

[zk: localhost:2181(CONNECTED) 2] ls / watch

首先展示根目录的子节点,然后看着根目录的子节点。与之前不一样的是,此时已经注册进根目录的子节点的观察者列表。

现在,假设根目录发生了变化。Zookeeper就会通知我。

启动另一个终端,也启动Client。

该终端在根目录创建子节点ttt。

[zk: localhost:2181(CONNECTED) 0] create /ttt 123

留意第一个终端出现了WatchedEvent,观察到了事件。

4.查看当前节点详细数据

[zk: localhost:2181(CONNECTED) 1] ls2 /

[zookeeper]

cZxid = 0x0

ctime = Thu Jan 01 08:00:00 CST 1970

mZxid = 0x0

mtime = Thu Jan 01 08:00:00 CST 1970

pZxid = 0x0

cversion = -1

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 0

numChildren = 1

5.分别创建2个普通节点

[zk: localhost:2181(CONNECTED) 1] create /tt1234 abcdef

        注意create后面需要跟两个参数。第一个是节点的名字,第二个是节点存储的内容。

另外可以跟-s和-e的参数。

-s表示带序号、序列的节点。

如果把上面那句话再写一次。

[zk: localhost:2181(CONNECTED) 2] create /tt1234 abcdef

会提示Node already exists。

此时,加上-s。

[zk: localhost:2181(CONNECTED) 2] create -s /tt1234 abcdef

        此时创建成功了,而且后面加上了一个序号。这就是带序号的节点。这个序号是全局递增的。比如多写几个create -s。

         注意如果改变了路径

[zk: localhost:2181(CONNECTED) 8] create -s /pppp abcdef

        说明这个序号是全局递增的。

        系统会把节点名字和内部序号做了拼接。

-e表示临时节点。

-e是临时的,不带-e则是永久的。

[zk: localhost:2181(CONNECTED) 10] create -e /xxx abcdef

        在第二个终端会话中创建了临时节点,在第一个终端会话中进行查询。

ls /

        目前是能看到该节点的。

        此时,在第二个终端中退出

[zk: localhost:2181(CONNECTED) 11] quit

        在第一个终端中再次查询。

ls /

可以看到,只有临时节点消失了。

所以,临时节点就是:创建临时节点的会话中断后,该节点就消失了。

[zk: localhost:2181(CONNECTED) 0] create -s -e /pppp asdf

这个就表示创建一个有序的临时节点。

6.获得节点的值

[zk: localhost:2181(CONNECTED) 5] get /ttt

其中第一行就是刚才设置的数据。

get watch可以看节点的变化。

[zk: localhost:2181(CONNECTED) 5] get /ttt watch

此时,在第二个终端中设置节点的数值

[zk: localhost:2181(CONNECTED) 0] set /ttt 2345

注意此时第一个终端中出现了WatchedEvent。

所以,ls watch和get watch这两个看到的事件类型是不一样的。

ls watch看到的类型是NodeChildrenChanged。

get watch看到的类型是NodeDataChanged。

另外,在第二个终端中改一下数值

[zk: localhost:2181(CONNECTED) 1] set /ttt 23456

        但留意第一个终端中是没有反应的。注意这个注册列表只会监控一次。

每个节点都会有一个监听者的列表。 通知一个,就划去一个,通知完了列表就空了。下次再想看,只能再注册。

7.修改节点数据值

[zk: localhost:2181(CONNECTED) 2] set /ttt 23456

如上。

8.查看节点状态

[zk: localhost:2181(CONNECTED) 3] stat /ttt

这些就是节点的状态。见后面关于stat结构体的介绍。

9.删除节点

[zk: localhost:2181(CONNECTED) 8] delete /ttt

注意delete删除的节点不能有子节点,如果有子节点,只能使用rmr。

10.递归删除节点

[zk: localhost:2181(CONNECTED) 9] rmr /ttt

11.节点的值变化监听

(1)在hadoop103主机上注册监听/sanguo节点数据变化

[zk: localhost:2181(CONNECTED) 26] [zk: localhost:2181(CONNECTED) 8] get /sanguo watch

(2)在hadoop102主机上修改/sanguo节点的数据

[zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"

(3)观察hadoop103主机收到数据变化的监听

WATCHER::

WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo

12.节点的子节点变化监听(路径变化)

(1)在hadoop103主机上注册监听/sanguo节点的子节点变化

[zk: localhost:2181(CONNECTED) 1] ls /sanguo watch

[aa0000000001, server101]

(2)在hadoop102主机/sanguo节点上创建子节点

[zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi"

Created /sanguo/jin

(3)观察hadoop103主机收到子节点变化的监听

WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo

​​​​​​​3、API应用

①环境搭建

(1).创建一个Maven工程zookeeper1

(2).添加pom文件

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhang</groupId>

    <artifactId>zookeeper1</artifactId>

    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>

            <groupId>junit</groupId>

            <artifactId>junit</artifactId>

            <version>RELEASE</version>

        </dependency>

        <dependency>

            <groupId>org.apache.logging.log4j</groupId>

            <artifactId>log4j-core</artifactId>

            <version>2.8.2</version>

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->

        <dependency>

            <groupId>org.apache.zookeeper</groupId>

            <artifactId>zookeeper</artifactId>

            <version>3.4.10</version>

        </dependency>

    </dependencies>

</project>

(3).拷贝log4j.properties文件到项目根目录

需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender

log4j.appender.logfile.File=target/spring.log

log4j.appender.logfile.layout=org.apache.log4j.PatternLayout

log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

②创建ZooKeeper客户端

在main下创建包com.zhang.zookeeper,创建类ZkClient。

public class ZkClient {

}

创建zookeeper对象,并注册回调函数。

private static final String CONNECT_STRING =

 "hadoop101:2181,hadoop102:2181,hadoop103:2181";

private static final int SESSION_TIMEOUT = 2000;

private ZooKeeper zkClient = null;

@Before

public void init() throws Exception {

zkClient = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {

@Override

public void process(WatchedEvent event) {

// 收到事件通知后的回调函数(用户的业务逻辑)

System.out.println(event.getType() + "--" + event.getPath());

// 再次启动监听

try {

zkClient.getChildren("/", true);

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

注意:

ZooKeeper构造方法的三个参数:

参数1:connectString。连接字符串。这里是hadoop101:2181,hadoop102:2181,hadoop103:2181。如果Windows中没有写hosts,则该字符串中需要改为IP地址。

参数2:sessionTimeout。SESSION_TIMEOUT表示主机客户端短了以后多久,主机认为客户端已经失联了。一旦失联了,名下所有临时节点都会被主机删除。

参数3:watcher。Zookeeper是两部分,文件系统加通知机制。通知机制生效的前提是需要在它的观察者名单上。如果这个客户端注册以后,默认的通知的回调函数就是这个watcher中的process()。

当后面的任何方法执行时,都会先执行@Before修饰的方法init()。创建ZooKeeper对象。然后执行zkClient相关方法时(比如zkClient.create()、zkClient.getChildren()等),都会触发该回调函数process()的调用。

③ 获取子节点并监听节点变化

// 获取子节点

@Test

public void ls() throws Exception {

List<String> children = zkClient.getChildren("/", true);

System.out.println("======================");

for (String child : children) {

System.out.println(child);

}

System.out.println("======================");

// 延时阻塞

Thread.sleep(Long.MAX_VALUE);

}

运行效果大致如下  

        注意客户端和服务端的通信属于异步通信,肯定涉及到多线程。单线程肯定不能实现异步通信。回调函数是运行在子线程的,一旦主线程结束了,子线程就死亡了(守护线程?)。

运行该程序,发现主线程阻塞了。此时在zookeeper中执行

[root@hadoop101 zookeeper]# bin/zkCli.sh

[zk: localhost:2181(CONNECTED) 0] rmr /xxx  

可以留意回调函数被调用了。

④ 创建子节点

// 创建子节点

@Test

public void create() throws Exception {

// 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型

String nodeCreated = zkClient.create("/zhang", "yiheng".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

System.out.println(nodeCreated);

}

参数3:定义哪些主机、哪些IP可以访问现在创建的这个节点。OPEN_ACL_UNSAFE特点是谁都可以访问,没有权限限制。

参数4:节点类型。实际上就是create命令中的-s和-e的参数。

  1. PERSISTENT。不带-s和-e参数。永久节点。
  2. EPHEMERAL_SEQUENTIAL。带-s和-e参数。带序号序列的临时节点。
  3. EPHEMERAL。带-e参数。临时节点。
  4. PERSISTENT_SEQUENTIAL。带-s参数。带序号序列的永久节点。

该函数返回创建的路径。

运行程序,结果如下:

现在在Linux中查看。

注意,如果是创建EPHEMERAL相关的节点,则需要加上延时阻塞的功能,不能在Linux中是看不到该节点的。

⑤判断Znode是否存在

// 判断znode是否存在

@Test

public void exist() throws Exception {

Stat stat = zkClient.exists("/eclipse", false);

System.out.println(stat == null ? "not exist" : "exist");

}

假如存在,则返回stat类型。

PS. 程序中需要导的包如下:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

注意:运行上面的程序,需要确保在虚拟机上开启zookeeper

[root@hadoop101 zookeeper]# bin/zkServer.sh start

[root@hadoop102 zookeeper]# bin/zkServer.sh start

[root@hadoop103 zookeeper]# bin/zkServer.sh start

可尝试自己实现其他方法:

get()

    @Test
    public void get() throws Exception {
        byte[] data = zkClient.getData("/tt1234", true, new Stat());
        String str = new String(data);
        System.out.println(str);
    }

注意:Stat是节点的状态信息。

set()

    @Test
    public void set() throws Exception {
        Stat stat = zkClient.setData("/tt1234", "defabc".getBytes(), 0);
        System.out.println(stat.getDataLength());
    }

运行结果如下:

注意version参数不能乱设,否则会出现BadVersion的错误。可以先通过

stat /tt1234

        命令查看版本号。假如要改的数据跟这个版本不一样,修改失败。这是因为这个程序是跟别的程序并发写的,为了产生误删的杯具,需要确定删的跟看到的是同一个东西。实际中一般可以考虑先获取stat对象,再getVersion()得到具体的版本,而不是写一个固定值。

delete()

    @Test
    public void delete() throws Exception {
        Stat stat = zkClient.exists("/tt1234", false);
        if (stat != null) {
            zkClient.delete("/tt1234", stat.getVersion());
        }
    }

以上zookeeper的API基本上就够了。

        观察一个节点,只能观察一次。以后有变化了,也看不到。现在希望循环注册节点,一旦有变化了就通知(而不是只通知一次)。

先注册一个节点

[zk: localhost:2181(CONNECTED) 0] create /a 123

编写代码

    public void register() throws Exception {
        byte[] data = zkClient.getData("/a", new Watcher(){

            public void process(WatchedEvent event) {
                try {
                    register();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, null);

        System.out.println(new String(data));
    }

    @Test
    public void testRegister() throws Exception {
        try {
            register();
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }  
    }

在通知的时候,再注册一遍。以此来实现循环调用。

运行程序,获取该节点123

现在,修改该节点的值

[zk: localhost:2181(CONNECTED) 1] set /a 1234

留意Linux终端运行命令的通知,Java终端也打印出更新的值。

可以自行运行下面的案例。

​​​​​​​4、箭头服务器节点动态上下线案例(扩展)

1.需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

2.需求分析,如图所示

3.具体实现

(1)先在集群上创建/servers节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers"

Created /servers

(2)服务器端向Zookeeper注册代码

package com.zhang.zkcase;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.ZooDefs.Ids;

public class DistributeServer {

private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";

private static int sessionTimeout = 2000;

private ZooKeeper zk = null;

private String parentNode = "/servers";

// 创建到zk的客户端连接

public void getConnect() throws IOException{

zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

@Override

public void process(WatchedEvent event) {

}

});

}

// 注册服务器

public void registServer(String hostname) throws Exception{

String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println(hostname +" is online "+ create);

}

// 业务功能

public void business(String hostname) throws Exception{

System.out.println(hostname+" is working ...");

Thread.sleep(Long.MAX_VALUE);

}

public static void main(String[] args) throws Exception {

// 1获取zk连接

DistributeServer server = new DistributeServer();

server.getConnect();

// 2 利用zk连接注册服务器信息

server.registServer(args[0]);

// 3 启动业务功能

server.business(args[0]);

}

}

(3)客户端代码

package com.zhang.zkcase;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

public class DistributeClient {

private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";

private static int sessionTimeout = 2000;

private ZooKeeper zk = null;

private String parentNode = "/servers";

// 创建到zk的客户端连接

public void getConnect() throws IOException {

zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

@Override

public void process(WatchedEvent event) {

// 再次启动监听

try {

getServerList();

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

// 获取服务器列表信息

public void getServerList() throws Exception {

// 1获取服务器子节点信息,并且对父节点进行监听

List<String> children = zk.getChildren(parentNode, true);

        // 2存储服务器信息列表

ArrayList<String> servers = new ArrayList<String>();

        // 3遍历所有节点,获取节点中的主机名称信息

for (String child : children) {

byte[] data = zk.getData(parentNode + "/" + child, false, null);

servers.add(new String(data));

}

        // 4打印服务器列表信息

System.out.println(servers);

}

// 业务功能

public void business() throws Exception{

System.out.println("client is working ...");

Thread.sleep(Long.MAX_VALUE);

}

public static void main(String[] args) throws Exception {

// 1获取zk连接

DistributeClient client = new DistributeClient();

client.getConnect();

// 2获取servers的子节点信息,从中获取服务器信息列表

client.getServerList();

// 3业务进程启动

client.business();

}

}

标签:实战,zookeeper,zk,Zookeeper,2181,开发,CONNECTED,节点,localhost
来源: https://blog.csdn.net/qq_61645895/article/details/122478024

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有