Redis分布式锁设计和实现

Redlock.xmind

参考文章

Distributed locks with Redis

分布式锁需求

  1. 互斥
  2. 过期锁的释放
  3. 容错能力

算法设计(主要是获取锁)

  1. 获取当前服务时间
  2. 尝试从全部N个实例获取锁
  3. 每个实例计算锁的有效时间为: 需要加锁时间-获取锁消耗时间
  4. 只有大部分实例成功获取锁这次加锁才能成功
  5. 如果加锁成功,锁的有效时间为所有实例有效时间中最短时间
  6. 加锁失败就请求所有实例释放该锁(无论是否加锁)

Java代码实现

Redisson的RedLock实现–加锁部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime)*2;
}
}

long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);

int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 遍历所有的实例
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 尝试加锁
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 加锁失败全部释放
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}

// 获取锁成功加入acquiredLocks List
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}

if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}

if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
// 遍历循环结束

if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
// 循环全部成功获取锁的实例异步加锁
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}

for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}

return true;
}