Redlock.xmind

参考文章

Distributed locks with Redis

分布式锁需求

  1. 互斥
  2. 过期锁的释放
  3. 容错能力

算法设计(主要是获取锁)

  1. 获取当前服务时间
  2. 尝试从全部N个实例获取锁
  3. 每个实例计算锁的有效时间为: 需要加锁时间-获取锁消耗时间
  4. 只有大部分实例成功获取锁这次加锁才能成功
  5. 如果加锁成功,锁的有效时间为所有实例有效时间中最短时间
  6. 加锁失败就请求所有实例释放该锁(无论是否加锁)

Java代码实现

Redisson的RedLock实现–加锁部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime)*2;
}
}

long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);

int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 遍历所有的实例
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 尝试加锁
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 加锁失败全部释放
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}

// 获取锁成功加入acquiredLocks List
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}

if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}

if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
// 遍历循环结束

if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
// 循环全部成功获取锁的实例异步加锁
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}

for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}

return true;
}

前言

这篇blog主要是在看了Doug Lea的fork/join实现的paper之后的一个总结.

文章主要参考了:

  1. A Java Fork/Join Framework(PDF)
  2. Java Fork/Join框架–翻译
  3. Overview of package util.concurrent Release 1.3.4.(Doug Lea的fork/join代码实现)

整个paper笔记的思维导图就是上面的图片,这里也留一下xmind文件链接: fork-join.xmind

代码

由于paper以及翻译对实现的思路讲解已经很详细了,这里就不再赘述,主要还是结合代码实现再看一下整个框架.

就像论文中提到的,Java部分的实现主要为下面三个类:

  1. FJTask(implement Runnable)
  2. FJTaskRunner(extends Thread)
  3. FJTaskRunnerGroup

下面就分别看一下三个类的代码实现.

FJTask

fork/join代码都非常的简单,因为这只是一个实现了Runnable接口的轻量线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 下面贴出来的是论文中讲到的FJTask有的方法,其余的一些辅助的方法也就不加进来了

private volatile boolean done; // = false;

public final boolean isDone() { return done; }

public static FJTaskRunner getFJTaskRunner() {
return (FJTaskRunner)(Thread.currentThread());
}

public void fork() { getFJTaskRunner().push(this); }

public void join() { getFJTaskRunner().taskJoin(this); }

public static void coInvoke(FJTask task1, FJTask task2) {
getFJTaskRunner().coInvoke(task1, task2);
}

FJTaskRunner

这个类是整个fork/join框架的核心,代码也比较多

push

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Push a task onto DEQ.
* Called ONLY by current thread.
**/

protected final void push(final FJTask r) {
int t = top;

/*
This test catches both overflows and index wraps. It doesn't
really matter if base value is in the midst of changing in take.
As long as deq length is < 2^30, we are guaranteed to catch wrap in
time since base can only be incremented at most length times
between pushes (or puts).
*/
// 这里是重点,注释中也已经讲的很清楚了
if (t < (base & (deq.length-1)) + deq.length) {

deq[t & (deq.length-1)].put(r);
top = t + 1;
}

else // isolate slow case to increase chances push is inlined
slowPush(r); // check overflow and retry
}

// slowPush主要是为了应对数组的resize
/**
* Handle slow case for push
**/

protected synchronized void slowPush(final FJTask r) {
checkOverflow();
push(r); // just recurse -- this one is sure to succeed.
}

pop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* Return a popped task, or null if DEQ is empty.
* Called ONLY by current thread.
* <p>
* This is not usually called directly but is
* instead inlined in callers. This version differs from the
* cilk algorithm in that pop does not fully back down and
* retry in the case of potential conflict with take. It simply
* rechecks under synch lock. This gives a preference
* for threads to run their own tasks, which seems to
* reduce flailing a bit when there are few tasks to run.
**/

protected final FJTask pop() {
/*
Decrement top, to force a contending take to back down.
*/

int t = --top;

/*
To avoid problems with JVMs that do not properly implement
read-after-write of a pair of volatiles, we conservatively
grab without lock only if the DEQ appears to have at least two
elements, thus guaranteeing that both a pop and take will succeed,
even if the pre-increment in take is not seen by current thread.
Otherwise we recheck under synch.
*/

if (base + 1 < t)
return deq[t & (deq.length-1)].take();
else
// 这就是论文中提到的pop失败之后会重试,直到队列真的为空
return confirmPop(t);

}

/**
* Check under synch lock if DEQ is really empty when doing pop.
* Return task if not empty, else null.
**/

protected final synchronized FJTask confirmPop(int provisionalTop) {
if (base <= provisionalTop)
return deq[provisionalTop & (deq.length-1)].take();
else { // was empty
/*
Reset DEQ indices to zero whenever it is empty.
This both avoids unnecessary calls to checkOverflow
in push, and helps keep the DEQ from accumulating garbage
*/

top = base = 0;
return null;
}
}

take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/** 
* Take a task from the base of the DEQ.
* Always called by other threads via scan()
**/


protected final synchronized FJTask take() {

/*
Increment base in order to suppress a contending pop
*/

int b = base++;

if (b < top)
return confirmTake(b);
else {
// back out
// take的机制就是类似于fail fast, 会去尝试窃取其他线程的taks
base = b;
return null;
}
}


/**
* double-check a potential take
**/

protected FJTask confirmTake(int oldBase) {

/*
Use a second (guaranteed uncontended) synch
to serve as a barrier in case JVM does not
properly process read-after-write of 2 volatiles
*/

synchronized(barrier) {
if (oldBase < top) {
/*
We cannot call deq[oldBase].take here because of possible races when
nulling out versus concurrent push operations. Resulting
accumulated garbage is swept out periodically in
checkOverflow, or more typically, just by keeping indices
zero-based when found to be empty in pop, which keeps active
region small and constantly overwritten.
*/

return deq[oldBase & (deq.length-1)].get();
}
else {
base = oldBase;
return null;
}
}
}

剩下的代码就是对dqueue的resize和一些辅助的函数

FJTaskRunnerGroup

主要是对FJTaskRunner的管理的一些辅助函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/** The threads in this group **/
protected final FJTaskRunner[] threads;

/** Group-wide queue for tasks entered via execute() **/
protected final LinkedQueue entryQueue = new LinkedQueue();

/**
* Create a FJTaskRunnerGroup with the indicated number
* of FJTaskRunner threads. Normally, the best size to use is
* the number of CPUs on the system.
* <p>
* The threads in a FJTaskRunnerGroup are created with their
* isDaemon status set, so do not normally need to be
* shut down manually upon program termination.
**/

public FJTaskRunnerGroup(int groupSize) {
threads = new FJTaskRunner[groupSize];
initializeThreads();
initTime = System.currentTimeMillis();
}

/**
* Arrange for execution of the given task
* by placing it in a work queue. If the argument
* is not of type FJTask, it is embedded in a FJTask via
* <code>FJTask.Wrap</code>.
* @exception InterruptedException if current Thread is
* currently interrupted
**/

public void execute(Runnable r) throws InterruptedException {
if (r instanceof FJTask) {
entryQueue.put((FJTask)r);
}
else {
entryQueue.put(new FJTask.Wrap(r));
}
signalNewTask();
}

whats else

首先,Doug Lea的论文真的很牛逼,可以说把整个框架的实现细节都讲的很清楚了,而且在代码中无处不在的各种注释也对于框架的理解非常有用.在源码包里还有对应生产的JavaDoc可以说阅读Java基础代码的整体体验都是非常不错的(集合包中的注释也是非常的完整).

其次,最近开始尝试使用思维导图来整理知识,这算是第一次尝试,总体感觉还是很不错的.

前言

对于测试的重要性,虽然不想说什么漂亮话,但是作为敏捷开发实践的重要一环,以及本着对自己代码负责的理念,在项目中保证必要的单元测试和覆盖率还是非常重要的。

在研究怎么在SpringBoot中优雅的进行单元测试的过程中,主要参考了:

  1. 有赞单元测试实践:主要参考了工具选型的思路和整合的思路。
  2. SpringTest文档
  3. SpringBoot Test文档
  4. SpringBoot 中文翻译文档–测试

在文档中主要参考了MockMvc的使用,关于MockMvc之前也写过blog:翻译:Spring MVC Test Framework–MockMvc使用.在当时主要是尝试了对单个项目的端口测试,在这次研究中主要考虑如何方便的对单个服务中依赖的服务进行打桩(stub)和Mock.

续前言

原本是打算把每个框架的使用方法都写写的,但是在几天的整合实践之后发现官方的文档都很友善所以下面就当作一个内容的小结,把一些需要注意的点总结一下,详细的用法还是看官方文档就可以了.

PowerMock

PowerMock在单测中的应用场景主要是对单测服务所依赖静态类的Mock.

官方的文档地址(主要使用了Mockito接口)

使用方法官方也给出了例子,在静态方法Mock时主要需要@PrepareForTest注解:

1
2
3
4
5
6
7
8
9
10
// 1.类注解@PrepareForTest
@PrepareForTest(Static.class) // Static.class contains static methods
// 2.指定需要mock的静态方法
PowerMockito.mockStatic(Static.class);
// 3.使用Mockito接口来mock
// 在mock返回值为`void`的静态方法时可以使用`PowerMockito`的`when()`来进行mock
Mockito.when(Static.firstStaticMethod(param)).thenReturn(value);
// 4.校验mock结果
PowerMockito.verifyStatic(Static.class); // 1
Static.firstStaticMethod(param); // 2

SpringBoot集成Mock

整合主要参考了有赞单元测试实践中的整合方法,主要是需要整合PowerMock使得能够对静态类进行mock.

由于mock静态方法需要

@PrepareForTest({Static.class})

注解所以需要将UnitRunner指定为PowerMockRunner,而由于需要和Spring整合,则需要SpringRunner,整合结果:

@RunWith(PowerMockRunner.class)

@PowerMockRunnerDelegate(SpringRunner.class)

最终的整合结果为:

1
2
3
4
5
6
7
8
9
10
// powermock
@RunWith(PowerMockRunner.class)
// 整合springtest
@PowerMockRunnerDelegate(SpringRunner.class)
// 排除java.security
@PowerMockIgnore( {"javax.management.*", "javax.net.ssl.*"})
// mock静态类注解
@PrepareForTest(Static.class)
// SpringBoot测试的注解
@SpringBootTest

参照有赞的单元测试基类代码:
image-20200414201352164

SpringBoot自带MockBean

主要基于SpringBoot的文档(发现SpringBoot相关的东西还是得多看官方文档才行)介绍的@MockBean注解.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.junit.*;
import org.junit.runner.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.boot.test.context.*;
import org.springframework.boot.test.mock.mockito.*;
import org.springframework.test.context.junit4.*;

import static org.assertj.core.api.Assertions.*;
import static org.mockito.BDDMockito.*;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyTests {

@MockBean
private RemoteService remoteService;

@Autowired
private Reverser reverser;

@Test
public void exampleTest() {
// RemoteService has been injected into the reverser bean
given(this.remoteService.someCall()).willReturn("mock");
String reverse = reverser.reverseSomeCall();
assertThat(reverse).isEqualTo("kcom");
}

}

MockMvc

主要是有了一个注解方法来代替之前的手动创建mockMvc的形式,在这里记录一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.servlet.MockMvc;

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@SpringBootTest
@AutoConfigureMockMvc
class MockMvcExampleTests {

@Test
void exampleTest(@Autowired MockMvc mvc) throws Exception {
mvc.perform(get("/")).andExpect(status().isOk()).andExpect(content().string("Hello World"));
}

}

还有一个@WebMvcTest注解应该等价于之前的mockmvc的standalone启动模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.junit.*;
import org.junit.runner.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.boot.test.autoconfigure.web.servlet.*;
import org.springframework.boot.test.mock.mockito.*;

import static org.assertj.core.api.Assertions.*;
import static org.mockito.BDDMockito.*;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;

@RunWith(SpringRunner.class)
@WebMvcTest(UserVehicleController.class)
public class MyControllerTests {

@Autowired
private MockMvc mvc;

@MockBean
private UserVehicleService userVehicleService;

@Test
public void testExample() throws Exception {
given(this.userVehicleService.getVehicleDetails("sboot"))
.willReturn(new VehicleDetails("Honda", "Civic"));
this.mvc.perform(get("/sboot/vehicle").accept(MediaType.TEXT_PLAIN))
.andExpect(status().isOk()).andExpect(content().string("Honda Civic"));
}

}

Dubbo Service的Mock

// TODO

扩展

easy-random:

Easy Random is a library that generates random Java beans.

在项目中主要用在单测需要复杂的DTO的时候,如果项目中对现有的复杂DTO没用random工具的话还是非常不错的.

0%