linux rcu


RCU(Read-Copy Update)是一种同步机制,通过保存对象的多个副本来保障读操作的连续性,并保证在预定的读方临界区没有完成之前不会释放这个对象。传统的同步机制如spin lock,semaphore,rwlock等,并发线程不区分读写线程,或者并发线程允许同时读,但是读的时候不允许更新。RCU与这些机制最大的区别是允许在更新的同时读数据。RCU允许同时有一个更新线程和多个读线程并发;

RCU是如何做到上述的那种效果呢? RCU把更新操作分解为两个部分removal 和reclaimation;在removal阶段,删除对该数据结构的引用,因为CPU对单个指针的写入操作是原子的,因此删除过程可以与新的读线程并发执行;reclaimation阶段必须等待removal阶段所有的读线程结束后才可以回收该数据结构,对于removal阶段以后的读线程看到的是更新后的数据结构,因此只需要考虑在removal阶段已经存在的那些读线程;

 

RCU实现过程主要解决以下2个问题:

1. 在一个读线程遍历链表的过程中,另外一个更新线程对链表进行插入操作,RCU需要保证读线程要么能看见新的节点或者看不见新的节点;

2. 读线程读取了某个链表节点,更新线程可以从链表中删除这个节点,但是不能直接回收这个节点,必须等到所有的读线程完成后才进行回收操作;

 

经典RCU由三个基本机制组成:Publish-­Subscribe Mechanism,Waiting for All Pre-­existing RCU Readers to Complete,Maintain Multiple Version of Recently Updated Objects;

 

Publish-­Subscribe Mechanism

订阅发布机制就是能够并发插入链表的能力,允许即使链表正被修改,读线程也可以安全的遍历链表;考虑以下例子:

 

 foo *gp == kmalloc((*->a = ->b = ->c = = p;

对于不同的编译器,有可能不能保证最后4条语句的顺序执行。

RCU提供了rcu_assign_pointer用于发布新的数据结构;上面的代码就可以修改为

p->a = ->b = ->c = 

rcu_assign_pointer封装了内存屏障,用于保证操作的顺序;

 

读线程考虑以下代码:

p = (p !=-­>a, p-­>b, p-­>

看上去好像不会有执行顺序问题,但是某些架构的CPU及其编译器可能会在取p的值之前直接取p的成员。编译器会先猜测p的值,然后取p的成员内容,然后才去取p的真实值来检查之前的猜测是否正确;

RCU提供了rcu_dereference用于订阅其他线程发布的值;

 



 rcu_dereference(p)     ({ \
                (p) _________p1 =

读线程的代码就可以修改为

= (p !=->a, p->b, p->

 

Waiting for All Pre-­existing RCU Readers to Complete:RCU把所有已存在的读线程完成的这段时间称为grace period,如下图所示:

notifier_block __cpuinitdata rcu_nb ==

 


 __init rcu_init(& *)(&

为了支持热拔插的CPU,注册了一个CPU事件的回调,对于已经启动的CPU直接调用rcu_cpu_notify的CPU_UP_PREPARE事件

 

  __cpuinit rcu_cpu_notify( notifier_block * action,  * cpu = (

rcu_online_cpu中对每个CPU的rcu_data进行了初始化

  __devinit rcu_online_cpu( rcu_data *rdp = & rcu_data *bh_rdp = &&&&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 

 

这里顺便插入per_cpu的实现分析:

unsigned   __init setup_per_cpu_areas( * nr_possible_cpus == ALIGN(__per_cpu_end - (size <== alloc_bootmem(size *= ptr --+=

上述代码在kernel初始化过程中调用,首先分配一段内存,然后把.data..percpu段中的数据为每个CPU都拷贝一份数据,并把每个CPU引用自己的那一段副本的地址偏移记录下来;

 

RELOC_HIDE(ptr, off) \= (unsigned (ptr)) (__ptr + per_cpu(var, cpu) (*({ \ simple_identifier_##(&per_cpu__##, __per_cpu_offset[cpu]); }))

 

回到rcu_init_percpu_data中,

  rcu_init_percpu_data( cpu,  rcu_ctrlblk * rcu_data *, (*->curtail = &rdp->->nxttail = &rdp->->donetail = &rdp->->quiescbatch = rcp->->qs_pending = ->cpu =->blimit =

其中第二个参数是全局rcu控制块,结构如下:


    cur;        
        completed;    
        next_pending;    

    
                             

rcu_data结构如下:



            quiescbatch;     
            passed_quiesc;     
            qs_pending;     

    
                     batch;     
     rcu_head * rcu_head **            qlen;           
     rcu_head * rcu_head ** rcu_head * rcu_head **        blimit;         
    

DECLARE_PER_CPU(struct rcu_data, rcu_data);
DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);

 

一次线程调度表明该CPU已经经历了一次quiescent state, 在进程调度schedule中会调用rcu_qsctr_inc把rdp->passed_quiesc置为1。


 inline  rcu_qsctr_inc( rcu_data *rdp = &->passed_quiesc = 

另外在每次时钟中断都会检查是否有RCU相关工作需要处理


 update_process_times( task_struct *p = cpu =
    
  __rcu_pending( rcu_ctrlblk *rcp,  rcu_data *
    
     (rdp->curlist && !rcu_batch_before(rcp->completed, rdp-> 
    
     (!rdp->curlist && rdp-> 
    
     (rdp-> 
    
     (rdp->quiescbatch != rcp->cur || rdp-> 
     
 rcu_pending( __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||&rcu_bh_ctrlblk, &

如果rcu_pending返回1,则进入rcu_check_callbacks,检查当前CPU是否已经通过一次quiescent state,并调用rcu_process_callbacks进行处理

 rcu_check_callbacks( cpu, 
     (user ||&& !in_softirq() &&<= ( <<  (!&

  __rcu_process_callbacks( rcu_ctrlblk * rcu_data *
     (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->
        *rdp->donetail = rdp->->donetail = rdp->->curlist =->curtail = &rdp->
     (rdp->nxtlist && !rdp->->curlist = rdp->->curtail = rdp->->nxtlist =->nxttail = &rdp->

        
        ->batch = rcp->cur + 
         (!rcp->&rcp->->next_pending = &rcp-> (rdp->

 



  rcu_start_batch( rcu_ctrlblk *
     (rcp->next_pending &&->completed == rcp->->next_pending = ->cur++->->signaled = 

 


  rcu_check_quiescent_state( rcu_ctrlblk * rcu_data *
     (rdp->quiescbatch != rcp->->qs_pending = ->passed_quiesc = ->quiescbatch = rcp->
    
     (!rdp->
    
     (!rdp->->qs_pending = &rcp->
    
     (likely(rdp->quiescbatch == rcp->->&rcp->

最后执行donelist中的reclaimation


  rcu_do_batch( rcu_data * rcu_head *next, * count = = rdp->= list->->=
         (++count >= rdp->->donelist =->qlen -= (rdp->blimit == INT_MAX && rdp->qlen <=->blimit = (!rdp->->donetail = &rdp->&per_cpu(rcu_tasklet, rdp->

相关内容