zookeeper分布式锁

基于zookeeper 临时顺序节点实现分布式锁(不可重入)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.yahui.fan.zookeeper.distributedLock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class ZookeeperDistributeLock2 implements Lock {

private static final String HOST = "127.0.0.1:2181";

private ZkClient zkClient = new ZkClient(HOST,4000, 4000, new SerializableSerializer());

private static final String ROOT_PATH = "/root";
private static final String CHILDREN_PATH = "lock";

private Optional<String> beforePath = Optional.empty();
private Optional<String> currentPath = Optional.empty();

private CountDownLatch countDownLatch;


public ZookeeperDistributeLock2() {
if (!zkClient.exists(ROOT_PATH)) {
try {
zkClient.createPersistent(ROOT_PATH);
} catch (RuntimeException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName()+"----"+"根节点创建失败。。。");
}
}
}



@Override
public void lock() {
if(!tryLock()){
waitLock();
lock();
}

}



public boolean tryLock(){
//如果当前节点为空,创建临时顺序节点
if(!currentPath.isPresent()){
this.currentPath = Optional.of(zkClient.createEphemeralSequential(ROOT_PATH+"/",CHILDREN_PATH));
System.out.println(Thread.currentThread().getName()+"----"+"当前节点为空,创建节点:"+this.currentPath.get());
}
//获取根节点下所有自节点
List<String> children = zkClient.getChildren(ROOT_PATH);
TreeSet<String> nodes = new TreeSet<>();
for (String s : children) {
nodes.add(ROOT_PATH + "/" + s);
}
if(this.currentPath.get().equals(nodes.first())){
System.out.println(Thread.currentThread().getName()+"----"+"获取锁:"+this.currentPath.get());
//如果是最小节点,获取锁
return true;
}else {
System.out.println(Thread.currentThread().getName()+"----"+this.currentPath.orElse(null) + "未获得锁");
this.beforePath = Optional.ofNullable(nodes.lower(currentPath.get()));
return false;
}
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {
System.out.println(Thread.currentThread().getName()+"----"+"释放锁:"+this.currentPath.get());
this.zkClient.delete(this.currentPath.get());
}



public void waitLock(){
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {

}

@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println(Thread.currentThread().getName()+"----"+"前锁:"+s+"删除,触发计数器,启动等待线程。。。");
if(null != countDownLatch){
//节点删除时计数器减一
countDownLatch.countDown();
}

}
};
//给前节点增加监听
System.out.println(Thread.currentThread().getName()+"----"+"给前锁:"+this.beforePath.get()+",增加监听。。。");
this.zkClient.subscribeDataChanges(this.beforePath.get(),iZkDataListener);
//如果前节点存在,阻塞等待
if(this.zkClient.exists(this.beforePath.get())){
countDownLatch = new CountDownLatch(1);
try {
System.out.println(Thread.currentThread().getName()+"----"+"线程等待。。。");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName()+"----"+"线程等待异常:"+e.toString());
}
}
//释放监听
System.out.println(Thread.currentThread().getName()+"----"+"释放前锁:"+this.beforePath.get()+",监听。。。");
this.zkClient.unsubscribeDataChanges(this.beforePath.get(),iZkDataListener);
}



@Override
public void lockInterruptibly() throws InterruptedException {

}

@Override
public Condition newCondition() {
return null;
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.yahui.fan.zookeeper.distributedLock;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class DistrLockTest implements Runnable {


private static SimpleDateFormat simpleDateFormat;
private static int count;

// 同时并发的线程数
private static final int NUM = 5;
// 按照线程数初始化倒计数器,倒计数器
private static CountDownLatch cdl = new CountDownLatch(NUM);

// private DistrLock lock = new DistrLock();

private ZookeeperDistributeLock2 lock1 = new ZookeeperDistributeLock2();


// 创建订单接口
public void createOrder() {
//准备获取锁
lock1.lock();
try {
// 获取订单编号
simpleDateFormat = new SimpleDateFormat("yyyyMMddhhmmss");
System.err.println(Thread.currentThread().getName() + "----" + "订单号:" + simpleDateFormat.format(new Date()) + "" + count++);
} catch (Exception e) {
// TODO: handle exception
} finally {
//完成业务逻辑以后释放锁
lock1.unlock();
}
}


public void run() {
try {
// 等待其他线程初始化
cdl.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 创建订单
createOrder();
}

public static void main(String[] args) {
for (int i = 1; i <= NUM; i++) {
// 按照线程数迭代实例化线程
new Thread(new DistrLockTest()).start();
// 创建一个线程,倒计数器减1
cdl.countDown();
}
}
}

##执行过程
Thread-8—-当前节点为空,创建节点:/root/0000000025
Thread-2—-当前节点为空,创建节点:/root/0000000027
Thread-14—-当前节点为空,创建节点:/root/0000000028
Thread-5—-当前节点为空,创建节点:/root/0000000029
Thread-11—-当前节点为空,创建节点:/root/0000000026
Thread-8—-获取锁:/root/0000000025
Thread-11—-/root/0000000026未获得锁
Thread-2—-/root/0000000027未获得锁
Thread-5—-/root/0000000029未获得锁
Thread-14—-/root/0000000028未获得锁

Thread-8—-订单号:201908021008410

Thread-8—-释放锁:/root/0000000025
Thread-11—-给前锁:/root/0000000025,增加监听。。。
Thread-14—-给前锁:/root/0000000027,增加监听。。。
Thread-5—-给前锁:/root/0000000028,增加监听。。。
Thread-2—-给前锁:/root/0000000026,增加监听。。。
Thread-5—-线程等待。。。
Thread-2—-线程等待。。。
Thread-14—-线程等待。。。
Thread-11—-释放前锁:/root/0000000025,监听。。。
Thread-11—-获取锁:/root/0000000026

Thread-11—-订单号:201908021008411

Thread-11—-释放锁:/root/0000000026
ZkClient-EventThread-15-127.0.0.1:2181—-前锁:/root/0000000026删除,触发计数器,启动等待线程。。。
ZkClient-EventThread-36-127.0.0.1:2181—-前锁:/root/0000000025删除,触发计数器,启动等待线程。。。
Thread-2—-释放前锁:/root/0000000026,监听。。。
Thread-2—-获取锁:/root/0000000027

Thread-2—-订单号:201908021008412

Thread-2—-释放锁:/root/0000000027
ZkClient-EventThread-43-127.0.0.1:2181—-前锁:/root/0000000027删除,触发计数器,启动等待线程。。。
Thread-14—-释放前锁:/root/0000000027,监听。。。
Thread-14—-获取锁:/root/0000000028

Thread-14—-订单号:201908021008413

Thread-14—-释放锁:/root/0000000028
ZkClient-EventThread-22-127.0.0.1:2181—-前锁:/root/0000000028删除,触发计数器,启动等待线程。。。
Thread-5—-释放前锁:/root/0000000028,监听。。。
Thread-5—-获取锁:/root/0000000029

Thread-5—-订单号:201908021008414

Thread-5—-释放锁:/root/0000000029

不可重入锁可参考这篇文章

https://blog.csdn.net/u013278314/article/details/82715716