linux rcu
linux rcu
RCU(Read-Copy Update)是一种同步机制,通过保存对象的多个副本来保障读操作的连续性,并保证在预定的读方临界区没有完成之前不会释放这个对象。传统的同步机制如spin lock,semaphore,rwlock等,并发线程不区分读写线程,或者并发线程允许同时读,但是读的时候不允许更新。RCU与这些机制最大的区别是允许在更新的同时读数据。RCU允许同时有一个更新线程和多个读线程并发;
RCU是如何做到上述的那种效果呢? RCU把更新操作分解为两个部分removal 和reclaimation;在removal阶段,删除对该数据结构的引用,因为CPU对单个指针的写入操作是原子的,因此删除过程可以与新的读线程并发执行;reclaimation阶段必须等待removal阶段所有的读线程结束后才可以回收该数据结构,对于removal阶段以后的读线程看到的是更新后的数据结构,因此只需要考虑在removal阶段已经存在的那些读线程;
RCU实现过程主要解决以下2个问题:
1. 在一个读线程遍历链表的过程中,另外一个更新线程对链表进行插入操作,RCU需要保证读线程要么能看见新的节点或者看不见新的节点;
2. 读线程读取了某个链表节点,更新线程可以从链表中删除这个节点,但是不能直接回收这个节点,必须等到所有的读线程完成后才进行回收操作;
经典RCU由三个基本机制组成:Publish-Subscribe Mechanism,Waiting for All Pre-existing RCU Readers to Complete,Maintain Multiple Version of Recently Updated Objects;
Publish-Subscribe Mechanism
订阅发布机制就是能够并发插入链表的能力,允许即使链表正被修改,读线程也可以安全的遍历链表;考虑以下例子:
foo *gp == kmalloc((*->a = ->b = ->c = = p;
对于不同的编译器,有可能不能保证最后4条语句的顺序执行。
RCU提供了rcu_assign_pointer用于发布新的数据结构;上面的代码就可以修改为
p->a = ->b = ->c =
rcu_assign_pointer封装了内存屏障,用于保证操作的顺序;
读线程考虑以下代码:
p = (p !=->a, p->b, p->
看上去好像不会有执行顺序问题,但是某些架构的CPU及其编译器可能会在取p的值之前直接取p的成员。编译器会先猜测p的值,然后取p的成员内容,然后才去取p的真实值来检查之前的猜测是否正确;
RCU提供了rcu_dereference用于订阅其他线程发布的值;
rcu_dereference(p) ({ \
(p) _________p1 =
读线程的代码就可以修改为
= (p !=->a, p->b, p->
Waiting for All Pre-existing RCU Readers to Complete:RCU把所有已存在的读线程完成的这段时间称为grace period,如下图所示:
notifier_block __cpuinitdata rcu_nb ==
为了支持热拔插的CPU,注册了一个CPU事件的回调,对于已经启动的CPU直接调用rcu_cpu_notify的CPU_UP_PREPARE事件 rcu_online_cpu中对每个CPU的rcu_data进行了初始化 这里顺便插入per_cpu的实现分析: 上述代码在kernel初始化过程中调用,首先分配一段内存,然后把.data..percpu段中的数据为每个CPU都拷贝一份数据,并把每个CPU引用自己的那一段副本的地址偏移记录下来; RELOC_HIDE(ptr, off) \= (unsigned (ptr)) (__ptr + per_cpu(var, cpu) (*({ \
simple_identifier_##(&per_cpu__##, __per_cpu_offset[cpu]); }))
回到rcu_init_percpu_data中, 其中第二个参数是全局rcu控制块,结构如下: rcu_data结构如下: DECLARE_PER_CPU(struct rcu_data, rcu_data);
一次线程调度表明该CPU已经经历了一次quiescent state, 在进程调度schedule中会调用rcu_qsctr_inc把rdp->passed_quiesc置为1。 另外在每次时钟中断都会检查是否有RCU相关工作需要处理 如果rcu_pending返回1,则进入rcu_check_callbacks,检查当前CPU是否已经通过一次quiescent state,并调用rcu_process_callbacks进行处理 最后执行donelist中的reclaimation
__init rcu_init(& *)(&
__cpuinit rcu_cpu_notify( notifier_block * action, * cpu = (
__devinit rcu_online_cpu( rcu_data *rdp = & rcu_data *bh_rdp = &&&&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks,
unsigned __init setup_per_cpu_areas( * nr_possible_cpus == ALIGN(__per_cpu_end - (size <== alloc_bootmem(size *= ptr --+=
rcu_init_percpu_data( cpu, rcu_ctrlblk * rcu_data *, (*->curtail = &rdp->->nxttail = &rdp->->donetail = &rdp->->quiescbatch = rcp->->qs_pending = ->cpu =->blimit =
cur;
completed;
next_pending;
quiescbatch;
passed_quiesc;
qs_pending;
batch;
rcu_head * rcu_head ** qlen;
rcu_head * rcu_head ** rcu_head * rcu_head ** blimit;
DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
inline rcu_qsctr_inc( rcu_data *rdp = &->passed_quiesc =
update_process_times( task_struct *p = cpu =
__rcu_pending( rcu_ctrlblk *rcp, rcu_data *
(rdp->curlist && !rcu_batch_before(rcp->completed, rdp->
(!rdp->curlist && rdp->
(rdp->
(rdp->quiescbatch != rcp->cur || rdp->
rcu_pending( __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||&rcu_bh_ctrlblk, &
rcu_check_callbacks( cpu,
(user ||&& !in_softirq() &&<= ( << (!&
__rcu_process_callbacks( rcu_ctrlblk * rcu_data *
(rdp->curlist && !rcu_batch_before(rcp->completed, rdp->
*rdp->donetail = rdp->->donetail = rdp->->curlist =->curtail = &rdp->
(rdp->nxtlist && !rdp->->curlist = rdp->->curtail = rdp->->nxtlist =->nxttail = &rdp->
->batch = rcp->cur +
(!rcp->&rcp->->next_pending = &rcp-> (rdp->
rcu_start_batch( rcu_ctrlblk *
(rcp->next_pending &&->completed == rcp->->next_pending = ->cur++->->signaled =
rcu_check_quiescent_state( rcu_ctrlblk * rcu_data *
(rdp->quiescbatch != rcp->->qs_pending = ->passed_quiesc = ->quiescbatch = rcp->
(!rdp->
(!rdp->->qs_pending = &rcp->
(likely(rdp->quiescbatch == rcp->->&rcp->
rcu_do_batch( rcu_data * rcu_head *next, * count = = rdp->= list->->=
(++count >= rdp->->donelist =->qlen -= (rdp->blimit == INT_MAX && rdp->qlen <=->blimit = (!rdp->->donetail = &rdp->&per_cpu(rcu_tasklet, rdp->
评论暂时关闭