The leancloud.memo file generated by the leancloud_counter_security plugin

After setting up the leancloud_counter_security plugin I stopped paying attention to it, but recently I noticed I was no longer receiving CI build emails, so I went back over the CI configuration. Sure enough, it was full of problems.

Integrating leancloud_counter_security with Travis CI

When I first did the CI integration I did not understand the CI script very well and simply used it as-is. Now that I am more familiar with CI, revisiting the script shows plenty of places that can be improved.

First of all, I had completely forgotten to install the plugin … when I originally integrated it I only tested locally and never noticed that the CI script also needed to change.

npm install hexo-leancloud-counter-security

leancloud.memo

This file exists because once a blog has many posts, the free LeanCloud tier easily runs into Too Many Requests errors; as a mitigation, the plugin keeps a local backup of the counter data on every deploy. I had never considered this in the CI script, so the .memo file generated on the CI server was thrown away right after each build and effectively did nothing. The main task, then, is to persist leancloud.memo.

Approach

Since an access token was already authorized when configuring Travis CI, we can simply reuse that token here.

Push the file directly over HTTPS:

git push -u https://${Travis_CI}@github.com/TangMisaka23001/TangMisaka23001.github.io.git source

All that is needed is to push the updated memo file back to the source repository after every deploy.

The added script looks like this:

# LeanCloud counter related
# The checkout here looks odd to me, since the clone should already be on the source branch.
# It tripped me up for quite a while; my guess is that the deploy step switches the repository's branch.
- git checkout source
- git add source/leancloud.memo
# [skip ci] skips the CI build triggered by this commit, preventing recursive builds (a build currently runs on every change to the source branch)
- git commit -m "update leancloud.memo [skip ci]"
- git push -u https://${Travis_CI}@github.com/TangMisaka23001/TangMisaka23001.github.io.git source

So it really does matter to keep the CI script correct over time!!!

The current CI script

language: node_js   # language
node_js: stable     # Node.js version
notifications:      # enable email notifications
  email:
    recipients:
      - mikasatang@gmail.com
    on_success: always
    on_failure: always
cache:
  directories:
    - node_modules  # reportedly shortens Travis build times
before_install:
  - npm install -g hexo-cli
install:
  - npm install # install hexo and its plugins
  - npm install hexo-deployer-git --save
  - npm install hexo-git-backup --save
  - npm install hexo-leancloud-counter-security
script:
  - hexo clean
  - hexo g # generate the site
after_script:
  # Replace the gh_token placeholder in _config.yml with the variable configured in the Travis dashboard.
  # Note that the sed expression uses double quotes; single quotes would not expand the variable!
  - sed -i "s/gh_token/${Travis_CI}/g" ./_config.yml
  # Deploy the blog
  - echo "misakatang.cn" > ./public/CNAME
  - cp LICENSE ./public
  - cp README.md ./public
  - git config --global user.name "misakatang"
  - git config --global user.email "mikasatang@gmail.com"
  - hexo deploy
  # LeanCloud counter related
  - git checkout source
  - git add source/leancloud.memo
  - git commit -m "update leancloud.memo [skip ci]"
  - git push -u https://${Travis_CI}@github.com/TangMisaka23001/TangMisaka23001.github.io.git source
branches:
  only:
    - source # only watch this branch; any change triggers a build

Redlock.xmind

References

Distributed locks with Redis

Requirements for a distributed lock

  1. Mutual exclusion
  2. Release of expired locks
  3. Fault tolerance

Algorithm design (mainly lock acquisition; a rough code sketch follows the list)

  1. Get the current server time
  2. Try to acquire the lock on all N instances
  3. For each instance, compute the lock's validity time as: requested lock time minus the time spent acquiring it
  4. The acquisition as a whole succeeds only if a majority of instances grant the lock
  5. If acquisition succeeds, the lock's validity time is the shortest validity time across all instances
  6. If acquisition fails, ask every instance to release the lock (whether or not it was acquired there)
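
Put into code, the flow above looks roughly like the sketch below. This is not the Redisson implementation (that follows in the next section); RedisInstance and its tryAcquire/release methods are hypothetical placeholders standing in for N independent Redis clients.

import java.util.List;

public class RedlockSketch {

    interface RedisInstance {
        boolean tryAcquire(String key, String token, long ttlMillis); // e.g. SET key token NX PX ttl
        void release(String key, String token);                       // DEL only if token matches
    }

    /** Returns the remaining validity in ms if the lock was acquired, or -1 on failure. */
    static long tryLock(List<RedisInstance> instances, String key, String token, long ttlMillis) {
        long start = System.currentTimeMillis();                  // step 1: current time
        int acquired = 0;
        for (RedisInstance instance : instances) {                 // step 2: try every instance
            if (instance.tryAcquire(key, token, ttlMillis)) {
                acquired++;
            }
        }
        long elapsed = System.currentTimeMillis() - start;
        long validity = ttlMillis - elapsed;                       // step 3: requested time minus time spent
        boolean majority = acquired >= instances.size() / 2 + 1;   // step 4: need a quorum
        if (majority && validity > 0) {
            return validity;                                       // step 5: remaining validity
        }
        for (RedisInstance instance : instances) {                 // step 6: release on every instance
            instance.release(key, token);
        }
        return -1;
    }
}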

Java implementation

Redisson's RedLock implementation (the lock acquisition part)

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
        if (waitTime == -1) {
            newLeaseTime = unit.toMillis(leaseTime);
        } else {
            newLeaseTime = unit.toMillis(waitTime) * 2;
        }
    }

    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);

    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    // iterate over all instances
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            // try to acquire the lock on this instance
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // on a response timeout, release this instance to be safe
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            lockAcquired = false;
        }

        // on success, remember the lock in the acquiredLocks list
        if (lockAcquired) {
            acquiredLocks.add(lock);
        } else {
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }

        if (remainTime != -1) {
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }
    // end of the loop over instances

    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        // asynchronously reset the lease on every successfully acquired lock
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }

        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }

    return true;
}
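
For context, here is a minimal usage sketch of the tryLock shown above via Redisson's RedissonRedLock. The addresses and key name are placeholders, and the three clients would normally point at independent Redis masters.

import java.util.concurrent.TimeUnit;

import org.redisson.Redisson;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedLockUsage {
    public static void main(String[] args) throws InterruptedException {
        RedissonClient c1 = Redisson.create(configFor("redis://127.0.0.1:6379"));
        RedissonClient c2 = Redisson.create(configFor("redis://127.0.0.1:6380"));
        RedissonClient c3 = Redisson.create(configFor("redis://127.0.0.1:6381"));

        RLock lock1 = c1.getLock("order:42");
        RLock lock2 = c2.getLock("order:42");
        RLock lock3 = c3.getLock("order:42");

        RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
        // waitTime = 500 ms, leaseTime = 10 s; this calls the tryLock shown above
        if (redLock.tryLock(500, 10_000, TimeUnit.MILLISECONDS)) {
            try {
                // critical section
            } finally {
                redLock.unlock();
            }
        }
    }

    private static Config configFor(String address) {
        Config config = new Config();
        config.useSingleServer().setAddress(address);
        return config;
    }
}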

Preface

This post is mainly a summary written after reading Doug Lea's paper on the fork/join implementation.

It mainly draws on:

  1. A Java Fork/Join Framework (PDF)
  2. Java Fork/Join Framework (Chinese translation)
  3. Overview of package util.concurrent Release 1.3.4 (Doug Lea's fork/join implementation)

The mind map above summarizes my notes on the whole paper; the xmind file is also available here: fork-join.xmind

The code

Since the paper and its translation already explain the design in detail, I will not repeat that here; the goal is to walk through the framework alongside its actual code.

As the paper notes, the Java implementation consists mainly of the following three classes:

  1. FJTask (implements Runnable)
  2. FJTaskRunner (extends Thread)
  3. FJTaskRunnerGroup

Let's look at each of the three classes in turn.

FJTask

The fork/join code here is very simple, because FJTask is just a lightweight, thread-like task class that implements the Runnable interface.

// Only the FJTask methods discussed in the paper are shown below; the remaining helper methods are omitted.

private volatile boolean done; // = false;

public final boolean isDone() { return done; }

public static FJTaskRunner getFJTaskRunner() {
    return (FJTaskRunner)(Thread.currentThread());
}

public void fork() { getFJTaskRunner().push(this); }

public void join() { getFJTaskRunner().taskJoin(this); }

public static void coInvoke(FJTask task1, FJTask task2) {
    getFJTaskRunner().coInvoke(task1, task2);
}
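
To see how fork, join and coInvoke compose, here is a sketch of the classic Fib example from the paper; the threshold value and field names follow the paper but are illustrative.

class Fib extends FJTask {
    static final int THRESHOLD = 13;   // below this, just compute sequentially
    volatile int number;               // argument on the way in, result on the way out

    Fib(int n) { number = n; }

    public void run() {
        int n = number;
        if (n <= THRESHOLD) {
            number = seqFib(n);
        } else {
            Fib f1 = new Fib(n - 1);
            Fib f2 = new Fib(n - 2);
            coInvoke(f1, f2);          // fork both subtasks, then join them
            number = f1.number + f2.number;
        }
    }

    static int seqFib(int n) { return n <= 1 ? n : seqFib(n - 1) + seqFib(n - 2); }
}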

FJTaskRunner

This class is the heart of the fork/join framework, and it has the most code.

push

/**
 * Push a task onto DEQ.
 * Called ONLY by current thread.
 **/

protected final void push(final FJTask r) {
    int t = top;

    /*
      This test catches both overflows and index wraps. It doesn't
      really matter if base value is in the midst of changing in take.
      As long as deq length is < 2^30, we are guaranteed to catch wrap in
      time since base can only be incremented at most length times
      between pushes (or puts).
    */
    // This check is the key point; the comment above already explains it well.
    if (t < (base & (deq.length-1)) + deq.length) {

        deq[t & (deq.length-1)].put(r);
        top = t + 1;
    }

    else // isolate slow case to increase chances push is inlined
        slowPush(r); // check overflow and retry
}

// slowPush mainly deals with resizing the array.
/**
 * Handle slow case for push
 **/

protected synchronized void slowPush(final FJTask r) {
    checkOverflow();
    push(r); // just recurse -- this one is sure to succeed.
}
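
As a small aside (not from the original source), the index masking above relies on deq.length being a power of two: top and base grow without bound, while (index & (deq.length - 1)) always maps back into the array. A tiny standalone demo:

public class MaskDemo {
    public static void main(String[] args) {
        int length = 16;                       // deq.length, always a power of two
        for (int t = 12; t < 20; t++) {
            // indices 16..19 wrap around to slots 0..3
            System.out.println("top=" + t + " -> slot " + (t & (length - 1)));
        }
    }
}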

pop

/**
 * Return a popped task, or null if DEQ is empty.
 * Called ONLY by current thread.
 * <p>
 * This is not usually called directly but is
 * instead inlined in callers. This version differs from the
 * cilk algorithm in that pop does not fully back down and
 * retry in the case of potential conflict with take. It simply
 * rechecks under synch lock. This gives a preference
 * for threads to run their own tasks, which seems to
 * reduce flailing a bit when there are few tasks to run.
 **/

protected final FJTask pop() {
    /*
      Decrement top, to force a contending take to back down.
    */

    int t = --top;

    /*
      To avoid problems with JVMs that do not properly implement
      read-after-write of a pair of volatiles, we conservatively
      grab without lock only if the DEQ appears to have at least two
      elements, thus guaranteeing that both a pop and take will succeed,
      even if the pre-increment in take is not seen by current thread.
      Otherwise we recheck under synch.
    */

    if (base + 1 < t)
        return deq[t & (deq.length-1)].take();
    else
        // As described in the paper, a seemingly failed pop rechecks under the lock until the queue is confirmed empty.
        return confirmPop(t);

}

/**
 * Check under synch lock if DEQ is really empty when doing pop.
 * Return task if not empty, else null.
 **/

protected final synchronized FJTask confirmPop(int provisionalTop) {
    if (base <= provisionalTop)
        return deq[provisionalTop & (deq.length-1)].take();
    else { // was empty
        /*
          Reset DEQ indices to zero whenever it is empty.
          This both avoids unnecessary calls to checkOverflow
          in push, and helps keep the DEQ from accumulating garbage
        */

        top = base = 0;
        return null;
    }
}

take

/**
 * Take a task from the base of the DEQ.
 * Always called by other threads via scan()
 **/


protected final synchronized FJTask take() {

    /*
      Increment base in order to suppress a contending pop
    */

    int b = base++;

    if (b < top)
        return confirmTake(b);
    else {
        // back out
        // take is fail-fast: on contention it simply backs out, and the caller goes on to try stealing tasks from other threads
        base = b;
        return null;
    }
}


/**
 * double-check a potential take
 **/

protected FJTask confirmTake(int oldBase) {

    /*
      Use a second (guaranteed uncontended) synch
      to serve as a barrier in case JVM does not
      properly process read-after-write of 2 volatiles
    */

    synchronized(barrier) {
        if (oldBase < top) {
            /*
              We cannot call deq[oldBase].take here because of possible races when
              nulling out versus concurrent push operations. Resulting
              accumulated garbage is swept out periodically in
              checkOverflow, or more typically, just by keeping indices
              zero-based when found to be empty in pop, which keeps active
              region small and constantly overwritten.
            */

            return deq[oldBase & (deq.length-1)].get();
        }
        else {
            base = oldBase;
            return null;
        }
    }
}

The rest of the class deals with resizing the deque plus a few helper functions.

FJTaskRunnerGroup

Mostly helper methods for managing the FJTaskRunner threads.

/** The threads in this group **/
protected final FJTaskRunner[] threads;

/** Group-wide queue for tasks entered via execute() **/
protected final LinkedQueue entryQueue = new LinkedQueue();

/**
 * Create a FJTaskRunnerGroup with the indicated number
 * of FJTaskRunner threads. Normally, the best size to use is
 * the number of CPUs on the system.
 * <p>
 * The threads in a FJTaskRunnerGroup are created with their
 * isDaemon status set, so do not normally need to be
 * shut down manually upon program termination.
 **/

public FJTaskRunnerGroup(int groupSize) {
    threads = new FJTaskRunner[groupSize];
    initializeThreads();
    initTime = System.currentTimeMillis();
}

/**
 * Arrange for execution of the given task
 * by placing it in a work queue. If the argument
 * is not of type FJTask, it is embedded in a FJTask via
 * <code>FJTask.Wrap</code>.
 * @exception InterruptedException if current Thread is
 * currently interrupted
 **/

public void execute(Runnable r) throws InterruptedException {
    if (r instanceof FJTask) {
        entryQueue.put((FJTask)r);
    }
    else {
        entryQueue.put(new FJTask.Wrap(r));
    }
    signalNewTask();
}
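
Putting the pieces together, a usage sketch (assuming the util.concurrent API quoted above and the Fib sketch from the FJTask section) would look like this: size the group to the number of CPUs and hand it the root task, where invoke() blocks until the task has finished.

public class FibMain {
    public static void main(String[] args) throws InterruptedException {
        FJTaskRunnerGroup group =
            new FJTaskRunnerGroup(Runtime.getRuntime().availableProcessors());
        Fib root = new Fib(35);        // the Fib sketch from the FJTask section
        group.invoke(root);            // run the task tree and wait for it
        System.out.println(root.number);
    }
}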

What else

First, Doug Lea's paper is genuinely impressive: it explains essentially every implementation detail of the framework, and the comments throughout the code are just as valuable for understanding it. The source package even ships with the generated JavaDoc, so the overall experience of reading this foundational Java code is very pleasant (the collections package is equally well commented).

Second, I have recently started using mind maps to organize what I learn; this was my first attempt, and overall it felt quite worthwhile.
