<!-- curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
基础建立连接、创建
private CuratorFramework client;
/**
* 建立连接
*/
@Before
public void testConnect() {
/**
* connectString – 连接字符串,ip1:端口,ip1:端口。
* sessionTimeoutMs – 会话超时时间(有默认)
* connectionTimeoutMs – 连接超时时间(有默认)
* retryPolicy – 重试策略(模式、方式)
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//方式1
this.client = CuratorFrameworkFactory.newClient(
"47.115.203.188:2181",
60 * 1000,
15 * 1000,
retryPolicy);
this.client.start();
//集成ACL,以后认证登录与添加认证都不需另外设置,因为这里已经设置了
//创建权限管理器
ACLProvider aclProvider = new ACLProvider() {
private List<ACL> acl;
@Override
public List<ACL> getDefaultAcl() {
if (acl == null) {
ArrayList<ACL> acl = ZooDefs.Ids.CREATOR_ALL_ACL; //初始化
acl.clear();
acl.add(new ACL(ZooDefs.Perms.ALL, new Id("auth", "happylayga:123456")));
this.acl = acl;
}
return acl;
}
@Override
public List<ACL> getAclForPath(String path) {
return acl;
}
};
//方式2
this.client = CuratorFrameworkFactory
.builder().connectString("47.115.203.188:2181")
.aclProvider(aclProvider)//用权限管理器登录
.authorization("digest", ("happylayga:123456").getBytes()) //使用用户名/密码进行连接
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
this.client.start();
}
@After
public void close() {
if (this.client != null) {
this.client.close();
}
}
/**
* 创建结点:持久、临时、顺序、数据
*/
@Test
public void testCreate() throws Exception {
String path = this.client.create().forPath("/app1");//不指定则默认数据是ip地址
System.out.println(path);
String path2 = this.client.create().forPath("/apptestAuth", "appdata".getBytes(StandardCharsets.UTF_8));
System.out.println(path2);
/**
* PERSISTENT 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在
* PERSISTENT_SEQUENTIAL 同上,只是Zookeeper给该节点名称进行顺序编号
* EPHEMERAL 临时
* EPHEMERAL_SEQUENTIAL 同上,只是Zookeeper给该节点名称进行顺序编号
* CONTAINER 容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点。
* PERSISTENT_WITH_TTL 带TTL(time-to-live,存活时间)的永久节点,节点在TTL时间之内没有得到更新并且没有孩子节点,就会被自动删除。
* PERSISTENT_SEQUENTIAL_WITH_TTL 带TTL(time-to-live,存活时间)和单调递增序号的永久节点,节点在TTL时间之内没有得到更新并且没有孩子节点,就会被自动删除。
*/
String path3 = this.client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "调整结点类型".getBytes(StandardCharsets.UTF_8));
System.out.println(path3);
//while(true){}//这个状态下,EPHEMERAL将会持续不关闭会话,可以看到对应/app3的数据
String path4 = this.client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/app4/node", "多级结点".getBytes(StandardCharsets.UTF_8));
System.out.println(path4);
}
查
/**
* 查询结点
*/
@Test
public void testGet() throws Exception {
byte[] bytes = this.client.getData().forPath("/app1");
System.out.println(new String(bytes));
}
@Test
public void testLs() throws Exception {
List<String> ls = this.client.getChildren().forPath("/");
System.out.println(ls);
}
@Test
public void testState() throws Exception {
Stat stat = new Stat();
this.client.getData().storingStatIn(stat).forPath("/app4/node");
System.out.println(stat);
System.out.println(stat.getCversion());
}
改
/**
* 改
*/
@Test
public void testSet() throws Exception {
Stat stat = this.client.setData().forPath("/app1", "app1Data修改".getBytes(StandardCharsets.UTF_8));
byte[] bytes = this.client.getData().forPath("/app1");
System.out.println(new String(bytes));
}
@Test
public void testSetVersion() throws Exception {
Stat stat = new Stat();
this.client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat.getVersion());//查出版本显示是2
Stat statNew = this.client.setData().withVersion(2).forPath("/app1", "app1修改了版本2".getBytes(StandardCharsets.UTF_8));
System.out.println(statNew.getMtime());
}
删
/**
* 删。单删、删子、必须成功、回调
*/
@Test
public void testDelete() throws Exception{
this.client.delete().forPath("/app2");
}
@Test
public void testDeleteWithNode() throws Exception{
this.client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDeleteMustSuccess() throws Exception{
//一次删不成就重试
this.client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDeleteWithCallback() throws Exception{
this.client.delete().guaranteed().deletingChildrenIfNeeded()
.inBackground(
new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("我被删除了");
System.out.println(client); //可以放出去外部变量的
System.out.println(event);
}
})
.forPath("/app4");
}
监听
/**
* 监听
*/
@Test
public void testNodeCache() throws Exception{
// 1、注册NodeCache对象
NodeCache nodeCache = new NodeCache(this.client,"/app1");
// 2、注册监听
nodeCache.getListenable().addListener(
new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
//获取变化状态(数据)
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("结点数据变化了:"+new String(data));
}
}
);
// 3、开启监听
nodeCache.start(true);
while (true){}
}
监听包括子节点
/**
* 监听某结点的所有子结点
*/
@Test
public void testNodeCacheWhole() throws Exception {
// 1、创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(this.client, "/app4", true);
// 2、绑定监听器
pathChildrenCache.getListenable().addListener(
new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println(pathChildrenCacheEvent);
//1、获取类型
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//2、判断类型是否是update
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
}
);
// 3、开启监听
pathChildrenCache.start(true);
while (true) { }
}
所有都监听
/**
* TreeCache监听某个结点自己+子结点,比前两个全面
*/
@Test
public void testTreeCache() throws Exception {
// 1、创建监听对象
TreeCache treeCache = new TreeCache(this.client, "/app4");
//2、注册监听
treeCache.getListenable().addListener(
new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println(new String(treeCacheEvent.getData().getData()));
}
}
);
treeCache.start();
while (true) { }
}
案例展示:
12306售票对象,加了锁
public class Ticket12306 implements Runnable {
private int tickets = 10;//数据库的票数
private InterProcessMutex lock;//分布式锁
private CuratorFramework client;
public Ticket12306() { //无参构造,默认
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//方式2
this.client = CuratorFrameworkFactory
.builder().connectString("47.115.203.188:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
this.client.start();
this.lock = new InterProcessMutex(this.client, "/lock");
}
@Override
public void run() {
while (true) {
try {
//获取锁
//时间、单位
lock.acquire(6, TimeUnit.MILLISECONDS);//如果3秒钟了没获取到就稍后再来
if (tickets > 0) {
System.out.println(Thread.currentThread() + ":" + tickets);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//释锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//创建客户端
Thread t1 = new Thread(ticket12306, "携程");
Thread t2 = new Thread(ticket12306, "飞猪");
t1.start();
t2.start();
}
}
