xmtrock
发布于 2021-06-25 / 219 阅读
0

Zookeeper的操作(java)

<!-- curator -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>

基础建立连接、创建

private CuratorFramework client;

/**
 * 建立连接
 */
@Before
public void testConnect() {
    /**
     * connectString – 连接字符串,ip1:端口,ip1:端口。
     * sessionTimeoutMs – 会话超时时间(有默认)
     * connectionTimeoutMs – 连接超时时间(有默认)
     * retryPolicy – 重试策略(模式、方式)
     */

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);

    //方式1
    this.client = CuratorFrameworkFactory.newClient(
            "47.115.203.188:2181",
            60 * 1000,
            15 * 1000,
            retryPolicy);
    this.client.start();

    //集成ACL,以后认证登录与添加认证都不需另外设置,因为这里已经设置了
    //创建权限管理器
    ACLProvider aclProvider = new ACLProvider() {
        private List<ACL> acl;

        @Override
        public List<ACL> getDefaultAcl() {
            if (acl == null) {
                ArrayList<ACL> acl = ZooDefs.Ids.CREATOR_ALL_ACL; //初始化
                acl.clear();
                acl.add(new ACL(ZooDefs.Perms.ALL, new Id("auth", "happylayga:123456")));
                this.acl = acl;
            }
            return acl;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            return acl;
        }
    };


    //方式2
    this.client = CuratorFrameworkFactory
            .builder().connectString("47.115.203.188:2181")
            .aclProvider(aclProvider)//用权限管理器登录
            .authorization("digest", ("happylayga:123456").getBytes()) //使用用户名/密码进行连接
            .sessionTimeoutMs(60 * 1000)
            .connectionTimeoutMs(15 * 1000)
            .retryPolicy(retryPolicy)
            .build();
    this.client.start();
}

@After
public void close() {
    if (this.client != null) {
        this.client.close();
    }
}

/**
 * 创建结点:持久、临时、顺序、数据
 */
@Test
public void testCreate() throws Exception {
    String path = this.client.create().forPath("/app1");//不指定则默认数据是ip地址
    System.out.println(path);

    String path2 = this.client.create().forPath("/apptestAuth", "appdata".getBytes(StandardCharsets.UTF_8));
    System.out.println(path2);

    /**
     * PERSISTENT 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在
     * PERSISTENT_SEQUENTIAL 同上,只是Zookeeper给该节点名称进行顺序编号
     * EPHEMERAL 临时
     * EPHEMERAL_SEQUENTIAL 同上,只是Zookeeper给该节点名称进行顺序编号
     * CONTAINER 容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点。
     * PERSISTENT_WITH_TTL 带TTL(time-to-live,存活时间)的永久节点,节点在TTL时间之内没有得到更新并且没有孩子节点,就会被自动删除。
     * PERSISTENT_SEQUENTIAL_WITH_TTL 带TTL(time-to-live,存活时间)和单调递增序号的永久节点,节点在TTL时间之内没有得到更新并且没有孩子节点,就会被自动删除。
     */
    String path3 = this.client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "调整结点类型".getBytes(StandardCharsets.UTF_8));
    System.out.println(path3);
    //while(true){}//这个状态下,EPHEMERAL将会持续不关闭会话,可以看到对应/app3的数据

    String path4 = this.client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/app4/node", "多级结点".getBytes(StandardCharsets.UTF_8));
    System.out.println(path4);
}

/**
 * 查询结点
 */
@Test
public void testGet() throws Exception {
    byte[] bytes = this.client.getData().forPath("/app1");
    System.out.println(new String(bytes));
}
@Test
public void testLs() throws Exception {
    List<String> ls = this.client.getChildren().forPath("/");
    System.out.println(ls);
}
@Test
public void testState() throws Exception {
    Stat stat = new Stat();
    this.client.getData().storingStatIn(stat).forPath("/app4/node");
    System.out.println(stat);
    System.out.println(stat.getCversion());
}

/**
 * 改
 */
@Test
public void testSet() throws Exception {
    Stat stat = this.client.setData().forPath("/app1", "app1Data修改".getBytes(StandardCharsets.UTF_8));
    byte[] bytes = this.client.getData().forPath("/app1");
    System.out.println(new String(bytes));
}
@Test
public void testSetVersion() throws Exception {
    Stat stat = new Stat();
    this.client.getData().storingStatIn(stat).forPath("/app1");
    System.out.println(stat.getVersion());//查出版本显示是2
    Stat statNew = this.client.setData().withVersion(2).forPath("/app1", "app1修改了版本2".getBytes(StandardCharsets.UTF_8));
    System.out.println(statNew.getMtime());
}

/**
 * 删。单删、删子、必须成功、回调
 */
@Test
public void testDelete() throws Exception{
    this.client.delete().forPath("/app2");
}
@Test
public void testDeleteWithNode() throws Exception{
    this.client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDeleteMustSuccess() throws Exception{
    //一次删不成就重试
    this.client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDeleteWithCallback() throws Exception{
    this.client.delete().guaranteed().deletingChildrenIfNeeded()
            .inBackground(
            new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("我被删除了");
                    System.out.println(client); //可以放出去外部变量的
                    System.out.println(event);
                }
            })
    .forPath("/app4");
}

监听

/**
 * 监听
 */
@Test
public void testNodeCache() throws Exception{
    // 1、注册NodeCache对象
    NodeCache nodeCache = new NodeCache(this.client,"/app1");
    // 2、注册监听
    nodeCache.getListenable().addListener(
            new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    //获取变化状态(数据)
                    byte[] data = nodeCache.getCurrentData().getData();
                    System.out.println("结点数据变化了:"+new String(data));
                }
            }
    );
    // 3、开启监听
    nodeCache.start(true);
    while (true){}
}

监听包括子节点

/**
 * 监听某结点的所有子结点
 */
@Test
public void testNodeCacheWhole() throws Exception {
    // 1、创建监听对象
    PathChildrenCache pathChildrenCache = new PathChildrenCache(this.client, "/app4", true);
    // 2、绑定监听器
    pathChildrenCache.getListenable().addListener(
            new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    System.out.println(pathChildrenCacheEvent);
                    //1、获取类型
                    PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                    //2、判断类型是否是update
                    if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                        byte[] data = pathChildrenCacheEvent.getData().getData();
                        System.out.println(new String(data));
                    }
                }
            }
    );
    // 3、开启监听
    pathChildrenCache.start(true);
    while (true) { }
}

所有都监听

/**
 * TreeCache监听某个结点自己+子结点,比前两个全面
 */
@Test
public void testTreeCache() throws Exception {
    // 1、创建监听对象
    TreeCache treeCache = new TreeCache(this.client, "/app4");
    //2、注册监听
    treeCache.getListenable().addListener(
            new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                    System.out.println(new String(treeCacheEvent.getData().getData()));
                }
            }
    );
    treeCache.start();
    while (true) { }
}

案例展示:
12306售票对象,加了锁

public class Ticket12306 implements Runnable {

    private int tickets = 10;//数据库的票数
    private InterProcessMutex lock;//分布式锁
    private CuratorFramework client;

    public Ticket12306() { //无参构造,默认
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);

        //方式2
        this.client = CuratorFrameworkFactory
                .builder().connectString("47.115.203.188:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        this.client.start();

        this.lock = new InterProcessMutex(this.client, "/lock");
    }

    @Override
    public void run() {
        while (true) {
            try {
                //获取锁
                //时间、单位
                lock.acquire(6, TimeUnit.MILLISECONDS);//如果3秒钟了没获取到就稍后再来
                if (tickets > 0) {
                    System.out.println(Thread.currentThread() + ":" + tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    //释锁
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
public class LockTest {
    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();

        //创建客户端
        Thread t1 = new Thread(ticket12306, "携程");
        Thread t2 = new Thread(ticket12306, "飞猪");

        t1.start();
        t2.start();
    }
}

20210625053358047