rcu - rcu latency optimization
overflow
rcu callback 往往用来释放内存, 如果rcu callback调用的延迟比较高, 就会造成较高的 内存占用. 所以减少rcu callback的调用延迟也是一个很重要的优化方向。
ORG PATCH
在最初的版本中, rcu_data中存放了两个链表
rcu_data->nxtlist: 调用call_rcu()向该链表存放数据rcu_data->curlist: 当rcu_check_callbacks()检测到nxtlist有成员但是curlist没有成员时,会将nxtlist中的链表成员移动到cur_list中, 并且发起一个 新的宽限期用来处理curlist中的数据。
两个链表均为 list_head数据结构,详细请看:
list_head为双向链表,大小为2 * sizeof(struct list_head *), 大佬们感觉双向链表 有点浪费。于是将其修改为单链表。
MODIFY list_head to single link list
修改方法也很简单, rcu_head修改list_head 为struct rcu_head *
1
2
3
4
5
6
struct rcu_head {
- struct list_head list;
+ struct rcu_head *next;
void (*func)(void *obj);
void *arg;
};
rcu_data 中的链表头修改如下 :
1
2
3
4
5
6
7
8
9
10
@@ -99,8 +97,9 @@ struct rcu_data {
/* 2) batch handling */
long batch; /* Batch # for current RCU batch */
- struct list_head nxtlist;
- struct list_head curlist;
+ struct rcu_head *nxtlist;
+ struct rcu_head **nxttail;
+ struct rcu_head *curlist;
};
rcu_data中的nxtlist修改为nxtlist,nxttail, 为什么要增加nxttail呢, 因为调用call_rcu()添加 成员的时候,要访问tail. 这是单链表的常规操作。
修改后,数据结构图如下:
reduce the latency caused by RCU tasklets
但是随后大佬又发现, 如果call_rcu()调用比较频繁, 会造成rcu tasklet执行时间比 较长,从而造成其他流程较大延迟。于是大佬想给每次调用rcu tasklet设置一个最大处 理 callback的个数限制, 从而变相限制rcu tasklet的单次处理时长。
首先我们自己思考下, 如何实现这一目的:
- 既然要限制
rcu tasklet的处理个数, 也就意味着一次调用中,curlist会处理不完. 这时需要另一个链表来记录未处理完的curlist. - 另外, 为了减少延迟,我们需要及时清空
curlist. - 我们需要在因为处理个数限制而被动退出处理流程时,主动再次唤醒
rcu tasklet
OK, 我们来看下大佬实现:
donelist patch 细节展开
rcu_data 中增加额外链表,来记录未处理的rcu callback.
1
2
3
4
5
6
7
@@ -99,6 +99,8 @@ struct rcu_data {
struct rcu_head **nxttail;
struct rcu_head *curlist;
struct rcu_head **curtail;
+ struct rcu_head *donelist;
+ struct rcu_head **donetail;
};
curtail也是在该系列patch中加入的,因为donetail 要访问curtail,下面会说
当我们检测到RCU_curlist() 所处的宽限期已经完成时, 则将curlist中的成员移动到 donelist:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@@ -261,11 +271,11 @@ void rcu_restart_cpu(int cpu)
static void rcu_process_callbacks(unsigned long unused)
{
int cpu = smp_processor_id();
- struct rcu_head *rcu_list = NULL;
if (RCU_curlist(cpu) &&
!rcu_batch_before(rcu_ctrlblk.completed, RCU_batch(cpu))) {
- rcu_list = RCU_curlist(cpu);
+ *RCU_donetail(cpu) = RCU_curlist(cpu);
+ RCU_donetail(cpu) = RCU_curtail(cpu);
RCU_curlist(cpu) = NULL;
RCU_curtail(cpu) = &RCU_curlist(cpu);
}
并且在rcu_process_callbacks()函数中,只要donelist中有成员,都会调用 rcu_do_batch(cpu)处理下:
1
2
3
4
5
6
7
8
9
@@ -300,8 +310,8 @@ static void rcu_process_callbacks(unsigned long unused)
local_irq_enable();
}
rcu_check_quiescent_state();
- if (rcu_list)
- rcu_do_batch(rcu_list);
+ if (RCU_donelist(cpu))
+ rcu_do_batch(cpu);
}
另外, rcu_pending()也修改了逻辑, RCU_donelist()为真说明上次处理中,因为处理 数量限制而被动退出,所以也认为有rcu相关事务需要处理:
1
2
3
4
5
6
7
8
9
static inline int rcu_pending(int cpu)
{
@@ -127,6 +131,9 @@ static inline int rcu_pending(int cpu)
if (!RCU_curlist(cpu) && RCU_nxtlist(cpu))
return 1;
+ if (RCU_donelist(cpu))
+ return 1;
+
最主要的,在rcu_do_batch()中增加处理数量限制的逻辑,如果达到限制,强行break 循环,并且唤醒RCU_tasklet等待下个rcu tasklet再处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-static void rcu_do_batch(struct rcu_head *list)
+static void rcu_do_batch(int cpu)
{
- struct rcu_head *next;
+ struct rcu_head *next, *list;
+ int count = 0;
+ list = RCU_donelist(cpu);
while (list) {
- next = list->next;
+ next = RCU_donelist(cpu) = list->next;
list->func(list);
list = next;
+ if (++count >= maxbatch)
+ break;
}
+ if (!RCU_donelist(cpu))
+ RCU_donetail(cpu) = &RCU_donelist(cpu);
+ else
+ tasklet_schedule(&RCU_tasklet(cpu));
}
相关图示如下:
donelist 图示
初始状态, rcu_head 1, 2, 3 目前位于curlist,等待当前宽限期结束,而nxtlist 中新增了4,5,6.
宽限期结束,将curlist中的成员移动到donelist等待释放,但是由于设置的 maxbatch=1, 每次最多释放一个,所以最终释放rcu_head(1)
进入下个rcu tasklet时, 发现nxtlist中有成员,但是curlist中没有, 所以move nxtlist to curlist, 并发起下个宽限期,同时由于donelist中有成员, maxbatch=1, 释放rcu_head(2)
新的宽限期又结束了, 将curlist(rcu_head(4)(5)(6))串联到donelist上,并在4个 rcu tasklet周期中一次将rcu_head(3),(4),(5)(6)释放.
目前链表有三条了, 这三条链表还是相对独立的,我们再回忆下这三条链表的作用:
- curlist: 存放等待当前宽限期结束的object
- nxtlist: 调用
call_rcu()添加到该list, 还未等待宽限期. - donelist: 宽限期已经结束,等待释放
record rcu_head quiescent version earlier(本文重点)
回忆下,从调用call_rcu()到 object 释放需要经历哪些步骤:
call_rcu()链入nxtlist;- 该cpu进入
rcu tasklet, 此时curlist可能有成员,需要等待当前宽限期结束;释 放curlist成员; - 宽限期结束,释放
curlist, 同时, 将nxtlistmovetocurlist, 此时该object进入curlist, 记录curlist所需要等待的宽限期rcu_ctrlblk.cur + 1 - 并等待该宽限期结束。
从这个流程中可以看出, 从调用call_rcu() 到为该rcu 分配宽限期中间隔了一段时间。 并且, 往往会遇到下面的情况,造成callback将会在当前宽限期rcp->cur的下两个宽 限期才释放:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CPU CUR Quiescent
rcp->cur
call_rcu
add rcu_head to nxtlist
and wait rcp->cur Quiescent
over
rcp->cur quiescent over rcp->cur++
move curlist to donelist
move nxtlist to curlist
set RCU_batch(cpu) =
(rcp->cur++) + 1
wait rcp->cur++ quiescent over
wait ((rcp->cur++) + 1) quiescent over
可以看到等待了几个宽限期呢?
大约是2.5个宽限期(等待rcp->cur 算半个). 但实际上, 该rcu_head 只需要等待 rcp->cur++宽限期结束就可以释放,那也就是1.5个宽限期左右。
overflow 中提到过: 如果rcu callback调用延迟比较高,会造成内存处于较高的占用. 所以尽快释放rcu_head对于优化内存占用十分重要。
那怎么优化这段逻辑呢? 比较方便的做法是在rcu_head中增加字段,记录其所需要 等待宽限期的版本. 但是这样做有点浪费内存, 而且没有必要:
我们并不需要关心rcu_head 具体的rcu宽线期版本,只需要关心其可以在哪个宽限期 释放, 并且最早释放的rcu_head肯定是最早添加的(fifo)
如下图所示:
我们可以结合之前curlist,nxtlist, donelist来看:
- donelist: 最早添加的,其等待释放的宽限期为
< RCP->rcu - nxtlist:
call_rcu()次早添加的,其等待释放的宽限期为= RCP->rcu - curlist: 最新添加的,其等待释放的宽限期 为
RCP->rcu + 1
所以在记录是,我们也可以分为三个部分记录。
而Lai Jiangshan大佬想了一个方法。添加的时候仍向nxtlist链表添加, 不过。当随着宽限期inc,将某些object “降级”:
例如:
- nxtlist->curlist
- curlist->donelist
如下图所示:
在每一次宽限期结束时,需要做上面的降级动作.
我们来看下具体实现:
具体patch
数据结构改动:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct rcu_data {
/* 1) quiescent state handling : */
long quiescbatch; /* Batch # for grace period */
@@ -78,12 +74,24 @@ struct rcu_data {
int qs_pending; /* core waits for quiesc state */
/* 2) batch handling */
- long batch; /* Batch # for current RCU batch */
+ /*
+ * if nxtlist is not NULL, then:
+ * batch:
+ * The batch # for the last entry of nxtlist
+ * [*nxttail[1], NULL = *nxttail[2]):
+ * Entries that batch # <= batch
+ * [*nxttail[0], *nxttail[1]):
+ * Entries that batch # <= batch - 1
+ * [nxtlist, *nxttail[0]):
+ * Entries that batch # <= batch - 2
+ * The grace period for these entries has completed, and
+ * the other grace-period-completed entries may be moved
+ * here temporarily in rcu_process_callbacks().
+ */
+ long batch;
struct rcu_head *nxtlist;
- struct rcu_head **nxttail;
+ struct rcu_head **nxttail[3];
首先更改batch的逻辑: 原来batch表示当前curlist所需等待结束的宽限期版本。
而现在batch表示当前nxtlist 所有rcu_head所需等待的最大的宽限期版本. 也就是最 新加入nxtlist rcu_head object所等待的宽限期。
关于list部分,不再需要curlist, 而是将nxtlist拆成了三个数组:
- [nxtlist, *nxttail[0]) : 这些
rcu_head所需等待的宽限期<=batch - 2 - [*nxttail[0], *nxttail[1]): 这些
rcu_head所需等待的宽限期<=batch - 1 - [*nxttail[1], NULL = *nxttail[2]): 这些
rcu_head所需要等待的宽限期<=batch
而在call_rcu() 和__rcu_process_callbacks()都会观测宽限期是否更改,如果更改则 需要move链表:
首先看 call_rcu()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static void __call_rcu(struct rcu_head *head, struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
{
long batch;
smp_mb(); /* reads the most recently updated value of rcu->cur. */
/*
* Determine the batch number of this callback.
*
* Using ACCESS_ONCE to avoid the following error when gcc eliminates
* local variable "batch" and emits codes like this:
* 1) rdp->batch = rcp->cur + 1 # gets old value
* ......
* 2)rcu_batch_after(rcp->cur + 1, rdp->batch) # gets new value
* then [*nxttail[0], *nxttail[1]) may contain callbacks
* that batch# = rdp->batch, see the comment of struct rcu_data.
*/
//获取最后一个元素(新加入)的成员所等待的宽限期
batch = ACCESS_ONCE(rcp->cur) + 1;
//这个说明链表中的最后一个元素等待宽限期小于`<= batch 1`, 需要将其移动到
//[*nxttail[0], *nxttail[1]),
//同样的也需要将上面列表移动 [*nxtlist, *nxtlist[0]]
if (rdp->nxtlist && rcu_batch_after(batch, rdp->batch)) {
/* process callbacks */
rdp->nxttail[0] = rdp->nxttail[1];
rdp->nxttail[1] = rdp->nxttail[2];
//说明已经过期了两个宽限期,所以可以移动两次。
if (rcu_batch_after(batch - 1, rdp->batch))
//索性直接将nxttail[0], 指向nxttail[2]
rdp->nxttail[0] = rdp->nxttail[2];
}
//赋值rdp->batch
rdp->batch = batch;
//追加到nxttail
*rdp->nxttail[2] = head;
rdp->nxttail[2] = &head->next;
if (unlikely(++rdp->qlen > qhimark)) {
rdp->blimit = INT_MAX;
force_quiescent_state(rdp, &rcu_ctrlblk);
}
}
图示如下:
下图标记了nxttail[]指向的位置, 以及执行*nxttail[]所取得的值.
下图展示了调用call_rcu()添加rcu_head所带来的nxttail[2]的改动
下图展示了,在调用call_rcu()添加rcu_head所检测到宽限期前进, 而移动链表的流程:
再看下__rcu_process_callbacks()流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
{
- if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
- *rdp->donetail = rdp->curlist;
- rdp->donetail = rdp->curtail;
- rdp->curlist = NULL;
- rdp->curtail = &rdp->curlist;
- }
-
- if (rdp->nxtlist && !rdp->curlist) {
+ if (rdp->nxtlist) {
local_irq_disable();
- rdp->curlist = rdp->nxtlist;
- rdp->curtail = rdp->nxttail;
- rdp->nxtlist = NULL;
- rdp->nxttail = &rdp->nxtlist;
- local_irq_enable();
/*
- * start the next batch of callbacks
+ * move the other grace-period-completed entries to
+ * [rdp->nxtlist, *rdp->nxttail[0]) temporarily
+ */
//查看rdp->batch 是否已经完成,如果完成了,说明整个链表中的
//成员都完成了, 这种情况下nxttail[0], nxttail[1] 均指向末尾
+ if (!rcu_batch_before(rcp->completed, rdp->batch))
+ rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2];
//这种情况下说明`[*nxttail[0], *nxttail[1]) 成员已经完成, 只更改
//nxttail[0]
+ else if (!rcu_batch_before(rcp->completed, rdp->batch - 1))
+ rdp->nxttail[0] = rdp->nxttail[1];
+
+ /*
+ * the grace period for entries in
+ * [rdp->nxtlist, *rdp->nxttail[0]) has completed and
+ * move these entries to donelist
*/
//说明 `[curlist, *nxtlist[0])` 不为空,需要将其移动到donelist中
+ if (rdp->nxttail[0] != &rdp->nxtlist) {
//将donelist链接上nxtlist
+ *rdp->donetail = rdp->nxtlist;
//重新设置donetail
+ rdp->donetail = rdp->nxttail[0];
//nxtlist赋值为 *rdp->nxttail[0]
+ rdp->nxtlist = *rdp->nxttail[0];
//将其于原来的curlist断开
+ *rdp->donetail = NULL;
+
//我们需要重新赋值这些tail (如果这些tail 等于rdp->
//nxttail[0], 另外nxttail[0] 肯定重新赋值因为其被清
//空了)
+ if (rdp->nxttail[1] == rdp->nxttail[0])
+ rdp->nxttail[1] = &rdp->nxtlist;
+ if (rdp->nxttail[2] == rdp->nxttail[0])
+ rdp->nxttail[2] = &rdp->nxtlist;
+ rdp->nxttail[0] = &rdp->nxtlist;
+ }
- /* determine batch number */
//这里不再需要赋值rdp->batch, 因为rdp->batch 含义变了,需要在
//call_rcu()赋值(在call_rcu()中取rcp->cur+1)
- rdp->batch = rcp->cur + 1;
+ local_irq_enable();
if (rcu_batch_after(rdp->batch, rcp->pending)) {
/* and start it/schedule start if it's a new batch */
该函数判断, rdp->batch 或者rdp->batch - 1 宽限期是否complete,如果
rdp->batchcomplete: 链表中所有成员都要移动到donelist中, 此时三个链表都是空rdp->batch - 1complete: 只将[*nxtlist[0], *nxtlist[1])移动到[curlist, *nxtlist[0]), 移动后,nxtlist[0] = nxtlist[1]该链表为空
图示如下:
参考链接
- reduce rcu_head size - core
- b659a6fbb927a79acd606c4466d03cb615879f9f
- Dipankar Sarma dipankar@in.ibm.com
- Wed Jun 23 18:50:06 2004 -0700
- RCU: low latency rcu
- daf86b08a178f950c0e0ec073c25cc392dbbc789
- Dipankar Sarma dipankar@in.ibm.com
- Sun Aug 22 22:57:42 2004 -0700
- rcu classic: new algorithm for callbacks-processing(v2)
- 5127bed588a2f8f3a1f732de2a8a190b7df5dce3
- Lai Jiangshan laijs@cn.fujitsu.com
- Sun Jul 6 17:23:59 2008 +0800