深圳幻海软件技术有限公司 欢迎您!

Zookeeper系列—Zookeeper应用及常用命令

2023-02-28

第1章常用命令zk的应用主要是针对三类:java原生zk客户端的API操作(不用去学这部分内容,会增加太多的学习成本,了解一下就好了)。zkClient的使用,它是对Zookeeper原生API的封装。ApacheCurator,也是对ZookeeperAPI的封装(本文讲的应用针对这部分内容)。在

第1章 常用命令

zk的应用主要是针对三类:

  • java原生zk客户端的API操作(不用去学这部分内容,会增加太多的学习成本,了解一下就好了)。
  • zkClient的使用,它是对Zookeeper原生API的封装。
  • Apache Curator,也是对Zookeeper API 的封装(本文讲的应用针对这部分内容)。

在学Java API之前,我们先来了解一下zookeeper的常用命令。

连接zookeeper server。

[root@jt2 bin]# sh zkCli.sh -server 127.0.0.1:2181
  • 1.

获取帮助help。

连接远程节点。

connect 192.168.8.75:2181
  • 1.

关闭连接。

close
  • 1.

显示集群。

[zk: localhost:2181(CONNECTED) 0] config
server.0=jt2:2888:3888:participant
server.1=jt3:2888:3888:participant
server.2=jt4:2888:3888:participant
version=0
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

创建一个znode。

命令语法:create [-s] [-e] [-c] [-t ttl] path [data] [acl]
 
-s:创建的是带序列号的节点,序列号用0填充节点路径。
-e:创建的是临时节点。
-c:创建的是容器节点
path:znode的路径,ZooKeeper中没有相对路径,所有路径都必须以’/'开头。
data:znode携带的数据。
acl:这个节点的ACL。
#创建一个永久节点
[zk: localhost:2181(CONNECTED) 2] create /zkBase
Created /zkBase
#创建一个临时节点
[zk: localhost:2181(CONNECTED) 3] create -e /ephemeral_node
Created /ephemeral_node
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

删除znode节点。

#删除节点前要求节点目录为空,不存在子节点
[zk: localhost:2181(CONNECTED) 34] delete /config
Node not empty: /config
[zk: localhost:2181(CONNECTED) 35] delete /config/topics/test
[zk: localhost:2181(CONNECTED) 27] delete /ephemeral_node
#如果要删除整个节点及子节点可以使用deleteall
[zk: 192.168.0.143:2181(CONNECTED) 36] deleteall /config
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

显示一个节点的状态。

[zk: localhost:2181(CONNECTED) 11] stat /test
cZxid = 0x180000000e
ctime = Thu Jul 28 03:25:08 CST 2022
mZxid = 0x180000000e
mtime = Thu Jul 28 03:25:08 CST 2022
pZxid = 0x180000000e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

查看路径子节点。

命令语法:ls [-s] [-w] [-R] path。

  • -s 同时显示stat信息。
  • -w 只显示子节点信息,默认选项。
  • -R 递归显示。

获取指定路径下的数据。

[zk: localhost:2181(CONNECTED) 16] get /zookeeper/config
server.0=jt2:2888:3888:participant
server.1=jt3:2888:3888:participant
server.2=jt4:2888:3888:participant
version=0
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

设置或者更新路径数据。

[zk: localhost:2181(CONNECTED) 19] set /test/hehe "haha"
[zk: localhost:2181(CONNECTED) 20] get /test/hehe
haha
  • 1.
  • 2.
  • 3.

设置ACL。

ACL权限

ACL 简写

允许的操作

CREATE

c

创建子节点

READ

r

获取节点的数据和它的子节点

WRITE

w

设置节点的数据

DELETE

d

删除子节点 (仅下一级节点)

ADMIN

a

设置 ACL 权限

ZooKeeper内置了一些权限控制方案,可以用以下方案为每个节点设置权限:

方案

描述

world

只有一个用户:anyone,代表所有人(默认)

ip

使用IP地址认证

auth

使用已添加认证的用户认证

digest

使用“用户名:密码”方式认证

[zk: localhost:2181(CONNECTED) 21] getAcl /test
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 22] create /mynode1 hello
Created /mynode1
[zk: localhost:2181(CONNECTED) 23] addauth digest admin:admin
[zk: localhost:2181(CONNECTED) 24] setAcl /mynode1 auth:admin:cdrwa
[zk: localhost:2181(CONNECTED) 25] getAcl /mynode1
'digest,'admin:x1nq8J5GOJVPY6zgzhtTtA9izLc=
: cdrwa
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.

同步数据集群间数据。

[zk: localhost:2181(CONNECTED) 26] sync /
Sync is OK
  • 1.
  • 2.

查看命令执行历史。

[zk: localhost:2181(CONNECTED) 27] history
17 - help
18 - getAllChildrenNumber /zookeeper
19 - set /test/hehe "haha"
20 - get /test/hehe
21 - getAcl /test
22 - create /mynode1 hello
23 - addauth digest admin:admin
24 - setAcl /mynode1 auth:admin:cdrwa
25 - getAcl /mynode1
26 - sync /
27 - history
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

退出客户端。

[zk: localhost:2181(CONNECTED) 28] quit
WATCHER:: 
WatchedEvent state:Closed type:None path:null
2022-07-28 03:33:49,307 [myid:] - INFO  [main:ZooKeeper@1422] - Session: 0xcebb0001 closed
2022-07-28 03:33:49,308 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0xcebb0001
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

第2章 Java API使用

zookeeper客户端和服务器会话的建立是一个异步的过程,也就是说在程序中,程序方法在处理完客户端初始化后立即返回(即程序继续往下执行代码,这样,在大多数情况下并没有真正的构建好一个可用会话,在会话的生命周期处于“CONNECTED”时才算真正的建立完毕,所以需要使用到多线程中的一个工具类CountDownLatch)。

1、创建会话

(一共有4个构造方法,根据参数不同)。

Zookeeper(String connectString,int sessionTimeout,Watcher watcher)
Zookeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly)
Zookeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd)
Zookeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd,boolean canBeReadOnly)
  • 1.
  • 2.
  • 3.
  • 4.

参数说明:

  • connectString :host:port指定的服务器列表,多个host:port之间用英文逗号分隔。还可以可选择地指定一个基路径,如果指定了一个基路径,则所有后续操作基于这个及路径进行。
  • sessionTimeOut:会话超时时间。以毫秒为单位。客户端和服务器端之间的连接通过心跳包进行维系,如果心跳包超过这个指定时间则认为会话超时失效。
  • watcher:指定默认观察者。如果为null表示不需要观察者。
  • canBeReadOnly :是否支持只读服务。只当一个服务器失去过半连接后不能再进行写入操作时,是否继续支持读取操作。
  • sessionId、SessionPassword:会话编号 会话密码(通过两个确定唯一一台客户端),用来实现会话恢复(重复回话)。

注意,整个创建会话的过程是异步的,构造方法会在初始化连接后即返回,并不代表真正建立好了一个会话,此时会话处于"CONNECTING"状态。当会话真正创建起来后,服务器会发送事件通知给客户端,只有客户端获取到这个通知后,会话才真正建立。

代码演示:

public class ZkConnect implements Watcher {
    private static final Logger log = LoggerFactory.getLogger(ZkConnect.class);
    //public static final String zkServerPath = "192.168.8.74:2181,192.168.8.75:2181,192.168.8.76:2181";
    public static final String zkServerPath = "127.0.0.1:2181";
    public static final Integer timeout = 5000;
    public static CountDownLatch countDownLatch = new CountDownLatch(1); 
    /**
     * 客户端与zkServer连接是一个异步的过程,当连接成功后,客户端会收到一个watch通知
     * 参数:
     *     connectString: 连接服务器的ip字符串
     *     sessionTimeout: 超时时间,心跳收不到了,就超时
     *     watcher: 通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,就设置为null
     *     canBeReadOnly: 可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写;此时数据被读取到的可能
     *     是旧数据,此处建议设置为false
     *     sessionId: 会话id
     *     sessionPasswd: 会话密码,当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话
     */
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkConnect());
        log.warn("客户端开始连接zookeeper服务器。。。连接状态: {}", zk.getState());
        countDownLatch.await(); // 如果不停顿一段时间, 会收不到watch通知
        log.warn("连接状态: {}", zk.getState());
    } 
    @Override
    public void process(WatchedEvent event) {
        log.warn("接收到watch通知: {}", event);
        countDownLatch.countDown();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
public class ZkReconnect implements Watcher {
    private static final Logger log = LogManager.getLogger(ZkReconnect.class);
    public static final String zkServerPath = "127.0.0.1:2181";
    public static final Integer timeout = 5000;
    public static CountDownLatch countDownLatch1 = new CountDownLatch(1);
    public static CountDownLatch countDownLatch2 = new CountDownLatch(2);
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkReconnect());
        long sessionId = zk.getSessionId();
        byte[] sessionPasswd = zk.getSessionPasswd();
        log.warn("客户端开始连接zookeeper服务器。。。连接状态: {}", zk.getState());
        countDownLatch1.await(); // 如果不停顿一段时间, 会收不到watch通知
        log.warn("连接状态: {}", zk.getState());
        Thread.sleep(1000);
        log.warn("开始会话重连...");
        ZooKeeper zkSession = new ZooKeeper(zkServerPath, timeout, new ZkReconnect(), sessionId, sessionPasswd);
        log.warn("重新连接, 状态: {}", zk.getState());
        countDownLatch2.await();
        log.warn("重新连接, 状态: {}", zk.getState());
    }
    @Override
    public void process(WatchedEvent event) {
        log.warn("接收到watch通知: {}", event);
        countDownLatch1.countDown();
        countDownLatch2.countDown();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.

2、创建节点

提供了两套创建节点的方法,同步和异步创建节点方式。

String create(final String path,byte data[],List<ACL> acl,CreateMode createMode);//同步方式创建
void create(final String path,byte data[],List<ACL> acl,CreateMode createMode,StringCallback cb,Object ctx);//异步方式创建
  • 1.
  • 2.

同步方式:

path:节点路径(名称):/nodeName。不允许递归创建节点,在父节点不存在的情况下,不允许创建子节点。

data[]:节点内容:要求类型是字节数组,也就是说不支持序列话方式,如果需要实现序列化,可使用java相关序列化框架,如Hessian,Kryo。

acl:节点权限:使用Ids.OPEN_ACL_UNSAFE开放权限即可。

createMode:节点类型:创建节点的类型,CreateMode.*,提供了如下所示的四种节点类型:

  • PERSISTENT(持久节点)。
  • PERSISTENT_SEQUENTIAL(持久顺序节点)。
  • EPHEMERAL(临时节点,本次会话有效)。
  • EPHEMERAL_SEQUENTIAL(临时顺序节点,本次会话有效)。

异步方式(在同步方法参数的基础上增加两个参数):

cb:回调方法:注册一个异步回调方法,要实现。
AsynCallBack.StringCallBack接口,重写processResult(int rc, String path, Object ctx, String name)方法,当节点创建完成后执行此方法。

  • rc:服务端响应码,0表示调用成功、-4表示端口连接、-110表示指定节点存在、-112表示会话已过期。
  • path:接口调用时传入的数据节点的路径参数。
  • ctx:调用接口传入的ctx值。
  • name:实际在服务端创建的节点的名称。

ctx:传递给回调方法的参数,一般为上下文(Context)信息。

代码演示

public class ZkNodeCreate implements Watcher {
    private ZooKeeper zooKeeper = null;
    private static final Logger log = LoggerFactory.getLogger(ZkNodeCreate.class);
    private static final String zkServerPath = "127.0.0.1:2181";
    private static final Integer timeout = 5000;
    public ZkNodeCreate() {}
    public ZkNodeCreate(String connectString) {
        try {
            zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeCreate());
        } catch (Exception e) {
            e.printStackTrace();
            if (zooKeeper != null) {
                try {
                    zooKeeper.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ZkNodeCreate zkNodeOperator = new ZkNodeCreate(zkServerPath);
        zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        new CountDownLatch(1).await();
    }
    /**
     * 同步或异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
     * 参数:
     *     path: 创建的路径
     *     data: 存储的数据
     *     acl: 控制权限策略. Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
     *                       Ids.CREATOR_ALL_ACL --> auth:user:password:cdrwa
     *     createMode: 节点类型,是一个枚举
     *                  PERSISTENT  持久节点
     *                  PERSISTENT_SEQUENTIAL  持久顺序节点
     *                  EPHEMERAL  临时节点
     *                  EPHEMERAL_SEQUENTIAL  临时顺序节点
     *
     * @param path
     * @param data
     * @param acls
     */
    private void createZKNode(String path, byte[] data, ArrayList<ACL> acls) {
        String result = "";
        try {
            // 同步创建
            //result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL);
            //log.warn("同步创建临时节点: {} 成功。。。", result);

            // 异步创建
            String ctx = "{'create':'success'}";
            zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL, new CreateNodeCallBack(), ctx);
            Thread.sleep(5000);
            log.warn("异步创建临时节点: {} 成功。。。", result);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void process(WatchedEvent event) {
        log.warn("客户端连接接收到watch通知: {}", event);
    }
    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }
    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }
    private static class CreateNodeCallBack implements AsyncCallback.StringCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            log.warn("异步创建节点:{}, ctx: {}", path, (String)ctx);
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.

3、节点操作

修改节点数据。

public class ZkNodeUpdate implements Watcher {
    private ZooKeeper zooKeeper = null;
    private static final Logger log = LoggerFactory.getLogger(ZkNodeUpdate.class);
    private static final String zkServerPath = "127.0.0.1:2181";
    private static final Integer timeout = 5000;
 
    public ZkNodeUpdate() {}
    public ZkNodeUpdate(String connectString) {
        try {
            zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeUpdate());
        } catch (Exception e) {
            e.printStackTrace();
            if (zooKeeper != null) {
                try {
                    zooKeeper.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZkNodeUpdate zkNodeOperator = new ZkNodeUpdate(zkServerPath);
        // 创建节点
        zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        // 修改节点数据  第三个参数是版本号dataVersion,用于乐观锁控制
        Stat stat = zkNodeOperator.getZooKeeper().setData("/testnode", "修改后的数据".getBytes(), 0);
        //zk.setData(path, data, version,new UpdateCallBack(),ctx);//异步修改
        Thread.sleep(5000);
        log.warn("修改后, dataVersion版本: {}", stat.getVersion());
        new CountDownLatch(1).await();
    }
    private void createZKNode(String path, byte[] data, ArrayList<ACL> acls) {
        String result = "";
        try {
            // 异步创建
            String ctx = "{'create':'success'}";
            zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL, new CreateNodeCallBack(), ctx);
            Thread.sleep(5000);
            log.warn("异步创建临时节点: {} 成功。。。", result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void process(WatchedEvent event) {
        log.warn("客户端连接接收到watch通知: {}", event);
    }
    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }
    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }
    private static class CreateNodeCallBack implements AsyncCallback.StringCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            log.warn("异步创建节点:{}, ctx: {}", path, (String)ctx);
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.

同步或异步删除节点数据。

public static void main(String[] args) throws KeeperException, InterruptedException {
    ZkNodeDelete zkNodeOperator = new ZkNodeDelete(zkServerPath);
    // 创建节点
    zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
    // 同步删除节点
    //zkNodeOperator.getZooKeeper().delete("/testnode", 1); // 第二个参数 dataVersion
    Thread.sleep(5000);
    // 异步删除节点
    String ctx = "{'delete':'success'}";
    zkNodeOperator.getZooKeeper().delete("/testnode", 0, new AsyncCallback.VoidCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx) {
            log.warn("异步删除节点:{}, ctx: {}", path, (String)ctx);
        }
    }, ctx);
    new CountDownLatch(1).await();
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

节点查询。

  • 获取节点数据。
public class ZKGetNodeData implements Watcher {
    private ZooKeeper zooKeeper = null;
    private static final Logger log = LoggerFactory.getLogger(ZKGetNodeData.class);
    private static final String zkServerPath = "127.0.0.1:2181";
    private static final Integer timeout = 5000;
    public ZKGetNodeData() {}
    public ZKGetNodeData(String connectString) {
        try {
            zooKeeper = new ZooKeeper(connectString, timeout, new ZKGetNodeData());
        } catch (Exception e) {
            e.printStackTrace();
            if (zooKeeper != null) {
                try {
                    zooKeeper.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static Stat stat = new Stat();
    public static void main(String[] args) throws Exception {
        ZKGetNodeData zkGetNodeData = new ZKGetNodeData(zkServerPath);
        zkGetNodeData.getZooKeeper().create("/testnode","testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Thread.sleep(5000);
        // 第一个参数: 节点path; 第二个参数: true注册一个监听事件; 第三个参数: 获取的结果会保存在stat
        byte[] result = zkGetNodeData.getZooKeeper().getData("/testnode", true, stat);
        log.warn("当前值: {}", new String(result));
        countDownLatch.await();
    }
    @Override
    public void process(WatchedEvent event) {
        try {
            if (event.getType() == Event.EventType.NodeDataChanged) {
                ZKGetNodeData zkGetNodeData = new ZKGetNodeData(zkServerPath);
                byte[] result = zkGetNodeData.getZooKeeper().getData("/testnode", false, stat);
                log.warn("监听到值已经更改, 更改后的值为: {}, 版本号: {}", new String(result), stat.getVersion());
                countDownLatch.countDown(); // 计数器减1
            } else if (event.getType() == Event.EventType.NodeCreated) {
 
            } else if (event.getType() == Event.EventType.NodeDeleted) {
 
            } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
 
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }
    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 获取子节点列表。
public class ZKGetChildrenList implements Watcher {
    private ZooKeeper zooKeeper = null;
    private static final Logger log = LoggerFactory.getLogger(ZKGetChildrenList.class);
    private static final String zkServerPath = "127.0.0.1:2181";
    private static final Integer timeout = 5000;
    public ZKGetChildrenList() {}
    public ZKGetChildrenList(String connectString) {
        try {
            zooKeeper = new ZooKeeper(connectString, timeout, new ZKGetChildrenList());
        } catch (Exception e) {
            e.printStackTrace();
            if (zooKeeper != null) {
                try {
                    zooKeeper.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    } 
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static Stat stat = new Stat();
    public static void main(String[] args) throws Exception {
        ZKGetChildrenList zkGetChildrenList = new ZKGetChildrenList(zkServerPath);
        zkGetChildrenList.getZooKeeper().create("/zookeeper/bbb","bbb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Thread.sleep(5000);
        // 同步调用: 参数1 节点路径, 参数2 true或false, 注册一个watch事件
        List<String> children = zkGetChildrenList.getZooKeeper().getChildren("/zookeeper", true);
        for (String child : children) {
            log.warn(child);
        }
        // 异步调用
//        String ctx = "{'callback':'ChildrenCallback'}";
//        zkGetChildrenList.getZooKeeper().getChildren("/testnode", true, new AsyncCallback.ChildrenCallback() {
//            @Override
//            public void processResult(int rc, String path, Object ctx, List<String> children) {
//                log.warn("callback, path: {}, children: {}", path, children.toString());
//            }
//        }, ctx);
        countDownLatch.await();
    }
    @Override
    public void process(WatchedEvent event) {
        try {
            if (event.getType() == Event.EventType.NodeDataChanged) {
            } else if (event.getType() == Event.EventType.NodeCreated) {
            } else if (event.getType() == Event.EventType.NodeDeleted) {
            } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
                ZKGetChildrenList zkGetChildrenList = new ZKGetChildrenList(zkServerPath);
                List<String> children = zkGetChildrenList.getZooKeeper().getChildren("/zookeeper", false);
                log.warn("监听到子节点改变, 改变后子节点数组为:");
                for (String child : children) {
                    log.warn(child);
                }
                countDownLatch.countDown(); // 计数器减1
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }
    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.

判断节点是否存在。

public class ZKNodeExist implements Watcher {
    private ZooKeeper zooKeeper = null;
    private static final Logger log = LoggerFactory.getLogger(ZKNodeExist.class);
    private static final String zkServerPath = "127.0.0.1:2181";
    private static final Integer timeout = 5000;
    public ZKNodeExist() {}
    public ZKNodeExist(String connectString) {
        try {
            zooKeeper = new ZooKeeper(connectString, timeout, new ZKNodeExist());
        } catch (Exception e) {
            e.printStackTrace();
            if (zooKeeper != null) {
                try {
                    zooKeeper.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws Exception {
        ZKNodeExist zkNodeExist = new ZKNodeExist(zkServerPath);
        Stat stat = zkNodeExist.getZooKeeper().exists("/testnode", true);
        if (stat == null) {
            log.warn("节点/testnode不存在");
        } else {
            log.warn("节点/testnode存在. stat: {}", stat);
        } 
        countDownLatch.await();
    }
    @Override
    public void process(WatchedEvent event) {
        try {
            if (event.getType() == Event.EventType.NodeDataChanged) {
            } else if (event.getType() == Event.EventType.NodeCreated) {
            } else if (event.getType() == Event.EventType.NodeDeleted) {
            } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
            }
            countDownLatch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    } 
    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.

4、Watcher机制及ACL

zookeeper有watch事件,是一次性触发的。当watch监视的数据发生变化时,通知在创建zookeeper是设置了Watcher的客户端。Watcher类监视的事件类型和状态类型如下所示:

事件类型(znode节点相关):

  • EventType.NodeCreated:节点创建。
  • EventType.NodeDataChanged:节点数据变更。
  • EventType.NodeChildrenChanged:子节点变更。
  • EventType.NodeDeleted:节点删除。

状态类型(客户端实例相关):

  • KeeperState.Disconnected:未连接。
  • KeeperState.SyncConnected:已连接。
  • KeeperState.AuthFailed:认证失败。
  • KeeperState.Expired:会话失效。

Watcher的特性:一次性、客户端串行执行、轻量。

  • 一次性:对于ZK的Watcher,只需要记住一点:Zookeeper的watch事件是一次性触发的。当watch监视的数据发生变化时,通知设置了该watch的客户端,即watcher。由于zookeeper的监视都是一次性的,所以每次必须设置监控。
  • 客户端串行执行:客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序,同时需要注意一点,千万不要因为一个Watcher的处理逻辑影响了这个客户端的Watcher回调。
  • 轻量:WatchedEvent是Zookeeper整个Wacher通知机制的最小通知单元,整个数据结构只包含三部分:通知状态、事件类型和节点路径。也就是说Watcher通知非常的简单,只会告诉客户端发生了事件而不会告知其具体内容,需要客户端自己去获取,比如NodeDataChanged事件,Zookeeper只会通知客户端指定节点的数据发生了变更,而不会直接提供具体的数据内容。

ACL(Access Control List),Zookeeper作为一个分布式协调框架,其内部存储的都是一些关乎分布式系统运行时状态的元数据,尤其是涉及到一些分布式锁、Master选举和协调等应用场景。我们需要有效的保障Zookeeper中的数据安全,Zookeeper提供了一套完善的ACL权限控制机制来保障数据的安全。

Zookeeper提供了三种模式,权限模式、授权对象、权限:

权限模式:Scheme,开发人员经常使用如下四种权限模式:

  • IP:ip模式通过ip地址粒度来进行权限控制,例如配置了:ip:192.168.1.107,即表示权限控制都是针对这个ip地址的,同时也支持按网段分配,比如:192.168.1.*。
  • Digest:digest是最常用的权限控制模式,也更符合对权限的认知。其类似于“username:password”形式的权限控制标识进行权限配置。Zookeeper会对形成的权限标识先后进行两次编码处理,分别是SHA-1加密算法和BASE64编码。
  • World:World是一种最开放的权限控制模式。这种模式可以看做为特殊的digest,它仅仅是一个标识而已。
  • Super:超级用户模式。在超级用户模式下可以对Zookeeper进行任意操作。

权限对象:指的是权限赋予给用户或者一个指定的实体,例如IP地址或机器等。在不同的模式下,授权对象是不同的。这种模式和授权对象一一对应。

权限:权限就是指那些通过权限检测后可以被允许执行的操作,在Zookeeper中,对数据的操作权限分为以下五大类:

CREATE、DELETE、READ、WRITE、ADMIN

自定义用户权限。

public class ZkNodeAcl implements Watcher {
    private ZooKeeper zooKeeper = null;
    private static final Logger log = LoggerFactory.getLogger(ZkNodeAcl.class);
    private static final String zkServerPath = "127.0.0.1:2181";
    private static final Integer timeout = 5000; 
    public ZkNodeAcl() {}
    public ZkNodeAcl(String connectString) {
        try {
            zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeAcl());
        } catch (Exception e) {
            e.printStackTrace();
            if (zooKeeper != null) {
                try {
                    zooKeeper.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException, NoSuchAlgorithmException, KeeperException {
        ZkNodeAcl zkNodeOperator = new ZkNodeAcl(zkServerPath);
        ArrayList<ACL> acls = new ArrayList<ACL>();
        Id test1 = new Id("digest", DigestAuthenticationProvider.generateDigest("test1:123456"));
        Id test2 = new Id("digest", DigestAuthenticationProvider.generateDigest("test2:123456"));
        acls.add(new ACL(ZooDefs.Perms.ALL,test1));
        acls.add(new ACL(ZooDefs.Perms.READ,test2));
        acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE,test2));
        zkNodeOperator.createZKNode("/testacl", "heihei".getBytes(), acls);
        zkNodeOperator.getZooKeeper().addAuthInfo("digest", "test2:123456".getBytes());
        Thread.sleep(10000);
        Stat stat = new Stat();
        byte[] result = zkNodeOperator.getZooKeeper().getData("/testacl", false, stat);
        log.warn("当前值: {}, 版本: {}", new String(result), stat.getVersion());
        new CountDownLatch(1).await();
    }
    private void createZKNode(String path, byte[] data, ArrayList<ACL> acls) {
        String result = "";
        try {
            String ctx = "{'create':'success'}";
            zooKeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);
            Thread.sleep(5000);
            log.warn("异步创建节点: {} 成功。。。", result);
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void process(WatchedEvent event) {
        log.warn("客户端连接接收到watch通知: {}", event);
    }
    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }
    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }
 
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.

acl之ip权限。

public class ZkNodeAclIp implements Watcher {
    private ZooKeeper zooKeeper = null;
    private static final Logger log = LoggerFactory.getLogger(ZkNodeAclIp.class);
    private static final String zkServerPath = "127.0.0.1:2181";
    private static final Integer timeout = 5000;
    public ZkNodeAclIp() {}
    public ZkNodeAclIp(String connectString) {
        try {
            zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeAclIp());
        } catch (Exception e) {
            e.printStackTrace();
            if (zooKeeper != null) {
                try {
                    zooKeeper.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    } 
    public static void main(String[] args) throws Exception {
        ZkNodeAclIp zkNodeAcl = new ZkNodeAclIp(zkServerPath); 
        // ip 方式的 acl
        ArrayList<ACL> aclsIP = new ArrayList<>();
        Id ipId1 = new Id("ip", "127.0.0.1");
        aclsIP.add(new ACL(ZooDefs.Perms.ALL, ipId1));
        // 创建节点
        zkNodeAcl.createZKNode("/testaclip", "testaclip".getBytes(), aclsIP);
        // 验证ip是否有权限
        Stat stat = new Stat();
        byte[] result = zkNodeAcl.getZooKeeper().getData("/testaclip", false, stat);
        log.warn("当前值: {}, 版本: {}", new String(result), stat.getVersion());
    }
    /**
     * 创建节点
     * @param path
     * @param data
     * @param acls
     */
    private void createZKNode(String path, byte[] data, ArrayList<ACL> acls) {
        String result = "";
        try {
            // 同步创建
            result = zooKeeper.create(path, data, acls, CreateMode.PERSISTENT);
            log.warn("同步创建临时节点: {} 成功。。。", result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void process(WatchedEvent event) {
        log.warn("接收到watch通知: {}", event);
    }
    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }
 
    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.

第3章 Curator应用

Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。

引包:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.13</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
</dependencies>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.

1、基础API

public class CuratorBase { 
    private static final Logger log = LoggerFactory.getLogger(CuratorBase.class); 
    //zk服务地址
    static final String zk_path = "127.0.0.1:2181";
    //会话超时,默认60秒
    static final int session_timeout=60000;
    //连接超时时间
    static final int connect_timeout=15000;
    /**
     * 创建客户端
     * @return
     */
    private static CuratorFramework createClient(){
        //重连策略:1秒3次
        RetryPolicy retryPolicy = new RetryNTimes(1000,3);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectionTimeoutMs(connect_timeout)
                .sessionTimeoutMs(session_timeout)
                .connectString(zk_path)
                .retryPolicy(retryPolicy)
                .build();
        //开启链接
        zkClient.start();
        return zkClient;
    }
 
    public static void baseAPI() throws Exception {
        CuratorFramework zkCli = createClient();
        CuratorFrameworkState state = zkCli.getState();
        if(state.equals(CuratorFrameworkState.STARTED)){
            /**
             * 创建节点
             *
             * zk节点类型:
             * PERSISTENT : 持久化节点
             * PERSISTENT_SEQUENTIAL : 持久化有序节点
             * EPHEMERAL : 会话节点(伴随会话结束消失)
             * EPHEMERAL_SEQUENTIAL : 会话有序节点
             */
            String path = zkCli.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath("/curator/base/1", "curator测试".getBytes()); 
            log.warn("path : {}",path);
            /**
             * 获取节点数据
             */
            byte[] bytes = zkCli.getData().forPath(path);
            log.warn("节点数据 : {} ",new String(bytes));
 
            /**
             * 更新节点数据
             */
            zkCli.setData().forPath(path,"修改后的数据".getBytes());
            byte[] bytes1 = zkCli.getData().forPath(path);
            log.warn("更新节点数据 : {}",new String(bytes1));
            /**
             * 获取子节点
             */
            List<String> children_paths = zkCli.getChildren().forPath(path);
            children_paths.forEach(x->{
                log.warn(path+" 子节点:"+x);
            });
            /**
             * 检查节点状态
             */
            Stat stat = zkCli.checkExists().forPath(path);
            log.warn(path+" 节点状态:"+stat.toString());
            /**
             * 删除节点
             */
            zkCli.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ExecutorService executorService = Executors.newCachedThreadPool();
            String path2 = zkCli.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            log.warn("code:" + curatorEvent.getResultCode());
                            log.warn("type:" + curatorEvent.getType());
                            log.warn("线程为:" + Thread.currentThread().getName());
                            countDownLatch.countDown();
                        }
                    }, executorService)
                    .forPath("/curator/base/2","curator测试2".getBytes());
            countDownLatch.await();
            if(path2!=null){
                byte[] bytes2 = zkCli.getData().forPath(path2);
                log.warn("/curator/base/2  :  "+ new String(bytes));
            }
        }
    } 
    public static void main(String[] args) throws Exception {
        baseAPI();
    }
 
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
public class BaseOperator {
    public static CuratorFramework getClient() {
        return CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒
                .sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒
                .namespace("arch") //设置命名空间
                .build();
    }
    public static void create(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
        client.create().creatingParentsIfNeeded().forPath(path, payload);
    }
    public static void createEphemeral(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
    }
    public static String createEphemeralSequential(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
        return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
    }
 
    public static void setData(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
        client.setData().forPath(path, payload);
    }
    public static void delete(final CuratorFramework client, final String path) throws Exception {
        client.delete().deletingChildrenIfNeeded().forPath(path);
    }
    public static void guaranteedDelete(final CuratorFramework client, final String path) throws Exception {
        client.delete().guaranteed().forPath(path);
    } 
    public static String getData(final CuratorFramework client, final String path) throws Exception {
        return new String(client.getData().forPath(path));
    }
    public static List<String> getChildren(final CuratorFramework client, final String path) throws Exception {
        return client.getChildren().forPath(path);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.

2、事件监听

zookeeper原生支持通过注册watcher来进行事件监听,但是其使用不是特别方便,需要开发人员自己反复注册watcher,比较繁琐。

Curator引入Cache来实现对zookeeper服务端事务的监听。Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程Zookeeper视图的对比过程。同时,Curator能够自动为开发人员处理反复注册监听,从而大大简化原生api开发的繁琐过程。

NodeCache:

public class ZkCuratorNodeCache {
    public static void main(String[] args) throws Exception {
        nodeCache();
    }
    public static void nodeCache() throws Exception {
        final String path = "/nodeCache";
        final CuratorFramework client = BaseOperator.getClient();
        client.start();
//        BaseOperator.delete(client, path);
        BaseOperator.create(client, path, "cache".getBytes());
        final NodeCache nodeCache = new NodeCache(client, path);
        nodeCache.start(true);
        nodeCache.getListenable()
                .addListener(() -> System.out.println("节点数据发生变化,新数据为:" + new String(nodeCache.getCurrentData().getData())));
        BaseOperator.setData(client, path, "cache1".getBytes());
        BaseOperator.setData(client, path, "cache2".getBytes()); 
        Thread.sleep(1000);
        client.close();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.

NodeCache可以监听指定的节点,注册监听器后,节点的变化会通知相应的监听器。

Path Cache:

Path Cache 用来监听ZNode的子节点事件,包括added、updateed、removed,Path Cache会同步子节点的状态,产生的事件会传递给注册的PathChildrenCacheListener。

public class ZkCuratorPathCache {
    public static void main(String[] args) throws Exception {
        pathChildrenCache();
    }
    public static void pathChildrenCache() throws Exception {
        final String path = "/pathChildrenCache";
        final CuratorFramework client = BaseOperator.getClient();
        client.start(); 
        final PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener((client1, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED:" + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED:" + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED:" + event.getData().getPath());
                    break;
                case CONNECTION_LOST:
                    System.out.println("CONNECTION_LOST:" + event.getData().getPath());
                    break;
                case CONNECTION_RECONNECTED:
                    System.out.println("CONNECTION_RECONNECTED:" + event.getData().getPath());
                    break;
                case CONNECTION_SUSPENDED:
                    System.out.println("CONNECTION_SUSPENDED:" + event.getData().getPath());
                    break;
                case INITIALIZED:
                    System.out.println("INITIALIZED:" + event.getData().getPath());
                    break;
                default:
                    break;
            }
        });
//        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000);
        client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
        Thread.sleep(1000);
        client.delete().forPath(path + "/c1");
        Thread.sleep(1000);
        client.delete().forPath(path); //监听节点本身的变化不会通知
        Thread.sleep(1000);
        client.close();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.

TreeCache:

Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

public class ZkCuratorTreeCache {
    public static void main(String[] args) throws Exception {
        treeCache();
    }
    public static void treeCache() throws Exception {
        final String path = "/treeChildrenCache";
        final CuratorFramework client = BaseOperator.getClient();
        client.start();
        final TreeCache cache = new TreeCache(client, path);
        cache.start(); 
        cache.getListenable().addListener((client1, event) -> {
            switch (event.getType()){
                case NODE_ADDED:
                    System.out.println("NODE_ADDED:" + event.getData().getPath());
                    break;
                case NODE_REMOVED:
                    System.out.println("NODE_REMOVED:" + event.getData().getPath());
                    break;
                case NODE_UPDATED:
                    System.out.println("NODE_UPDATED:" + event.getData().getPath());
                    break;
                case CONNECTION_LOST:
                    System.out.println("CONNECTION_LOST:" + event.getData().getPath());
                    break;
                case CONNECTION_RECONNECTED:
                    System.out.println("CONNECTION_RECONNECTED:" + event.getData().getPath());
                    break;
                case CONNECTION_SUSPENDED:
                    System.out.println("CONNECTION_SUSPENDED:" + event.getData().getPath());
                    break;
                case INITIALIZED:
                    System.out.println("INITIALIZED:" + event.getData().getPath());
                    break;
                default:
                    break;
            }
        });
 
        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000);
 
        client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
        Thread.sleep(1000);
 
        BaseOperator.setData(client, path, "test".getBytes());
        Thread.sleep(1000);
 
        client.delete().forPath(path + "/c1");
        Thread.sleep(1000);
 
        client.delete().forPath(path);
        Thread.sleep(1000);
 
        client.close();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.

3、分布式锁应用

可重入锁Shared Reentrant Lock。

Shared意味着锁是全局可见的, 客户端都可以请求锁。Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。它是由类InterProcessMutex来实现。它的构造函数为:

public InterProcessMutex(CuratorFramework client, String path)
  • 1.

不可重入锁Shared Lock。

使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入。

可重入读写锁Shared Reentrant Read Write Lock 类似JDK的ReentrantReadWriteLock. 一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。从读锁升级成写锁是不成的。主要由两个类实现:

InterProcessReadWriteLock
InterProcessLock
  • 1.
  • 2.

信号量Shared Semaphore 一个计数的信号量类似JDK的Semaphore。JDK中Semaphore维护的一组许可(permits),而Cubator中称之为租约(Lease)。注意,所有的实例必须使用相同的numberOfLeases值。调用acquire会返回一个租约对象。客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。租约还可以通过下面的方式返还:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)
  • 1.
  • 2.

多锁对象Multi Shared Lock Multi Shared Lock是一个锁的容器。当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。主要涉及两个类:

InterProcessMultiLock
InterProcessLock
  • 1.
  • 2.

它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
  • 1.
  • 2.

代码演示:

public class ZkCuratorLock {
    private static final String zk_server = "127.0.0.1:2181";
    private static final String zk_path = "/curator/zklock"; 
    public static void doWithLock(CuratorFramework curatorFramework){
        List<String> zkPaths = new ArrayList<String>();
        zkPaths.add(zk_path);
        InterProcessMultiLock lock = new InterProcessMultiLock(curatorFramework,zkPaths);
//        InterProcessMutex lock2 = new InterProcessMutex(curatorFramework,zk_path);
        try {
            if(lock.acquire(30, TimeUnit.SECONDS)){
                long threadId = Thread.currentThread().getId();
                System.out.println("线程-"+threadId+",acquire lock");
                Thread.sleep(1000);
                System.out.println("线程-"+threadId+",replease lock");
            }
        }catch (Exception e){
            e.fillInStackTrace();
        }finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.fillInStackTrace();
            }
        }
    } 
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for(int i=10;i>0;i--){
            es.execute(new Runnable() {
                @Override
                public void run() {
                    RetryNTimes retryNTimes = new RetryNTimes(1000, 3);
                    CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zk_server, retryNTimes);
                    curatorFramework.start();
                    ZkCuratorLock.doWithLock(curatorFramework);
                }
            });
        }
        es.shutdown();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.

4、栅栏barrier

DistributedBarrier构造函数中barrierPath参数用来确定一个栅栏,只要barrierPath参数相同(路径相同)就是同一个栅栏。通常情况下栅栏的使用如下:

  • 主导client设置一个栅栏。
  • 其他客户端就会调用waitOnBarrier()等待栅栏移除,程序处理线程阻塞。
  • 主导client移除栅栏,其他客户端的处理程序就会同时继续运行。 DistributedBarrier类的主要方法如下: setBarrier() - 设置栅栏 waitOnBarrier() - 等待栅栏移除 removeBarrier() - 移除栅栏。

双栅栏Double Barrier 双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算,当计算完成时,离开栅栏。双栅栏类是DistributedDoubleBarrier DistributedDoubleBarrier类实现了双栅栏的功能。它的构造函数如下:

// client - the client
// barrierPath - path to use
// memberQty - the number of members in the barrier
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)
  • 1.
  • 2.
  • 3.
  • 4.

memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。当leave方法被调用时,它也阻塞调用线程,直到所有的成员都调用了leave。 注意:参数memberQty的值只是一个阈值,而不是一个限制值。当等待栅栏的数量大于或等于这个值栅栏就会打开! 与栅栏(DistributedBarrier)一样,双栅栏的barrierPath参数也是用来确定是否是同一个栅栏的,双栅栏的使用情况如下:

  • 从多个客户端在同一个路径上创建双栅栏(DistributedDoubleBarrier),然后调用enter()方法,等待栅栏数量达到memberQty时就可以进入栅栏。
  • 栅栏数量达到memberQty,多个客户端同时停止阻塞继续运行,直到执行leave()方法,等待memberQty个数量的栅栏同时阻塞到leave()方法中。
  • memberQty个数量的栅栏同时阻塞到leave()方法中,多个客户端的leave()方法停止阻塞,继续运行。 DistributedDoubleBarrier类的主要方法如下:enter()、enter(long maxWait, TimeUnit unit) - 等待同时进入栅栏 leave()、leave(long maxWait, TimeUnit unit) - 等待同时离开栅栏 异常处理:DistributedDoubleBarrier会监控连接状态,当连接断掉时enter()和leave方法会抛出异常。

5、计数器Counters

利用ZooKeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数,一个用long来计数。

1)SharedCount 这个类使用int类型来计数。主要涉及三个类。

  • SharedCount。
  • SharedCountReader。
  • SharedCountListener SharedCount代表计数器, 可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。

2)DistributedAtomicLong 除了计数的范围比SharedCount大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。此计数器有一系列的操作:

  • get(): 获取当前值。
  • increment():加一。
  • decrement(): 减一。
  • add():增加特定的值。
  • subtract(): 减去特定的值。
  • trySet(): 尝试设置计数值。
  • forceSet(): 强制设置计数值。

你必须检查返回结果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。

6、 选举

curator提供了两种方式,分别是Leader Latch和Leader Election。

  • Leader Latch。

随机从候选者中选出一台作为leader,选中之后除非调用close()释放leadship,否则其他的后选择无法成为leader。

public class LeaderLatchTest {
    private static final String PATH = "/demo/leader";
    public static void main(String[] args) {
        List<LeaderLatch> latchList = new ArrayList<>();
        List<CuratorFramework> clients = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient();
                client.start();
                clients.add(client);

                final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i);
                leaderLatch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        System.out.println(leaderLatch.getId() + ":I am leader. I am doing jobs!");
                    }

                    @Override
                    public void notLeader() {
                        System.out.println(leaderLatch.getId() + ":I am not leader. I will do nothing!");
                    }
                });
                latchList.add(leaderLatch);
                leaderLatch.start();
            }
            Thread.sleep(1000 * 60);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }

            for (LeaderLatch leaderLatch : latchList) {
                CloseableUtils.closeQuietly(leaderLatch);
            }
        }
    }
    public static CuratorFramework getClient() {
        return CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒
                .sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒
                .namespace("arch") //设置命名空间
                .build();
    }

}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • Leader Election。

通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。

public class LeaderSelectorTest {
    private static final String PATH = "/demo/leader";
    public static void main(String[] args) {
        List<LeaderSelector> selectors = new ArrayList<>();
        List<CuratorFramework> clients = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient();
                client.start();
                clients.add(client);

                final String name = "client#" + i;
                LeaderSelector leaderSelector = new LeaderSelector(client, PATH, new LeaderSelectorListenerAdapter() {
                    @Override
                    public void takeLeadership(CuratorFramework client) throws Exception {
                        System.out.println(name + ":I am leader.");
                        Thread.sleep(2000);
                    }
                });
                leaderSelector.autoRequeue();
                leaderSelector.start();
                selectors.add(leaderSelector);
            }
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            for (LeaderSelector selector : selectors) {
                CloseableUtils.closeQuietly(selector);
            }

        }
    }
    public static CuratorFramework getClient() {
        return CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒
                .sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒
                .namespace("arch") //设置命名空间
                .build();
    }

}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.

至此Zookeeper的应用大体讲完了,在这里多说一句,技术的API不用去背,背也是背不住的,多使用就好了。