线程之线程同步


前言

当多个控制线程共享相同的内存时,需要确保每个线程看到一致的数据视图。如果每个线程使用的变量都是其他线程不会读取或修改的,那么就不会存在一致性问题。同样地,如果变量是只读的,多个线程同时读取该变量也不会有一致性问题。但是,当某个线程可以修改变量,而其他线程也可以读取或修改这个变量的时候,就需要对这些线程进行同步,以确保它们在访问变量的存储内容时不会访问到无效的数值。

当一个线程修改变量时,其他线程在读取这个变量的值时就可能会看到不一致的数据。在变量修改时间多于一个存储器访问周期的处理器结构中,当存储器读与存储器写这两个周期交叉时,这种潜在的不一致性就会出现。当然,这种行为是与处理器结构相关的,但是可移植性程序并不能对使用何种处理器结果作出假设。

图11-2描述了两个线程读写相同变量的假设例子。在这个例子中,线程A读取变量然后给这个变量赋予一个新的值,但写操作需要两个存储器周期。当线程B在这两个存储器写周期中间读取这个相同的变量时,它就会得到不一致的值。

pthread_mutex_init(pthread_mutex_t * pthread_mutexattr_t * pthread_mutex_destroy(pthread_mutex_t *

要用默认的属性初始化互斥量,只需把attr设置为NULL。

对互斥量进行加锁,需要调用pthread_mutex_lock,如果互斥量已经上锁,调用线程将阻塞直到互斥量被解锁。对互斥量解锁,需要调用pthread_mutex_unlock。

#include <pthread.h>
 pthread_mutex_lock(pthread_mutex_t * pthread_mutex_trylock(pthread_mutex_t * pthread_mutex_unlock(pthread_mutex_t *

如果线程不希望被阻塞,它可以使用pthread_mutex_trylock尝试对互斥量进行加锁。如果调用pthread_mutex_trylock时互斥量处于未锁住状态,那么pthread_mutex_trylock将锁住互斥量,不会出现阻塞并返回0,否则pthread_mutex_trylock就会失败,不能锁住互斥量,而返回EBUSY。

实例

程序清单11-5描述了用于保护某个数据结构的互斥量。当多个线程需要访问动态分配的对象时,可以在对象中嵌入引用计数,确保在所有使用该对象的线程完成数据访问之前,该对象内存空间不会被释放。

程序清单11-5 使用互斥量保护数据结构

#include <stdlib.h><pthread.h>

 foo *)     foo *((fp = malloc(( foo))) !=->f_count = (pthread_mutex_init(&fp->f_clock, NULL) !=  foo *fp) &fp->->f_count++&fp-> foo *fp) &fp->(--fp->f_count == )    &fp->&fp->&fp->

在使用该对象前,线程需要对这个对象的引用计数加1,当对象使用完毕时,需要对引用计数减1。当最后一个引用被释放时,对象所占的内存空间就被释放。在对引用计数加1、减1以及检查引用计数是否为0这些操作之前需要锁住互斥量。

2、避免死锁

如果线程试图对同一个互斥量加锁两次,那么它自身就会陷入死锁状态,使用互斥量时,还有其他更不明显的方式也能产生死锁。例如,程序中使用多个互斥量时,如果允许一个线程一直占有第一个互斥量,并且在试图锁住第二个互斥量时处于阻塞状态,但是拥有第二个互斥量的线程也在试图锁住第一个互斥量,这时就会发生死锁。因为两个线程都在相互请求另一个线程拥有的资源,所以这两个线程都无法向前运行,于是就产生死锁。

可以通过小心地控制互斥量加锁的顺序来避免死锁的发生。例如,假设需要对两个互斥量A和B同时加锁,如果所有线程总是在对互斥量B加锁之前锁住互斥量A,那么使用这两个互斥量不会产生死锁(当然在其他资源上仍可能出现死锁);类似地,如果所有的线程总是在锁住互斥量A之前锁住互斥量B,那么也不会发生死锁。只有在一个线程试图以与另一个线程相反的顺序锁住互斥量时,才可能出现死锁。

有时候应用程序的结果使得对互斥量加锁进行排序是很困难的,如果涉及了太多的锁和数据结构,可用的函数并不能把它转换成简单的层次,那么就需要采用另外的方法。可以先释放占有的锁,然后过一段时间再试。这种情况可以使用pthread_mutex_trylock接口避免死锁。如果已经占有某些锁而且pthread_mutex_trylock接口返回成功,那么就可以前进;但是,如果不能获取锁,可以先释放已经占有的锁,做好清理工作,然后过一段时间重新尝试。

实例

程序清单11-6(修改自程序清单11-5)用以描述两个互斥量的使用方法。当同时使用两个互斥量时,总是让它们以相同的顺序加锁,以避免死锁。第二个互斥量维护着一个用于跟踪foo数据结构的散列列表。这样hashlock互斥量保护foo数据结构中的fh散列表和f_next散列链字段。foo结构中的f_lock互斥量保护对foo结构中的其他字段的访问。

程序清单11-6 使用两个互斥量

#include <stdlib.h><pthread.h>

 NHASH    29
    HASH(fp)    (((unsigned long)fp)%NHASH)
 foo *= foo    *f_next;    
     foo *)     foo *((fp = malloc(( foo))) !=->f_count = (pthread_mutex_init(&fp->f_lock, NULL) != =&->f_next ==&fp->&&fp-> foo *fp)    &fp->->f_count++&fp-> foo * id)     foo    *=&(fp = fh[idx]; fp != NULL; fp = fp->(fp->f_id ==& foo *fp)     foo     *&fp->(fp->f_count == ) &fp->&&fp->
        (fp->f_count != ->f_count--&fp->&==(tfp === fp->(tfp->f_next !== tfp->->f_next = fp->&&fp->&fp->->f_count--&fp->

比较程序清单11-6和程序清单11-5,可以看出分配函数现在锁住散列列表锁,把新的结构添加到散列存储桶中,在对散列列表的锁解锁之前,先锁住新结构中的互斥量。因为新的结构是放在全局列表中的,其他线程可以找到它,所以在完成初始化之前,需要阻塞其他试图访问新结构的线程。

foo_find函数锁住散列列表锁,然后搜索被请求的结构。如果找到了,就增加其引用计数并返回指向该结构的指针。注意加锁的顺序是先在foo_find函数中锁定散列列表锁,然后再在foo_hold函数中锁定foo结构中的f_clock互斥量。

现在有了两个锁以后,foo_rele函数变得更加复杂。如果这是最后一个引用,因为将需要从散列列表中删除这个结构,就要先对这个结构互斥量进行解锁,才可以获取散列列表锁。然后重新获取结构互斥量。从上一次获得结构互斥量以来可能处于被阻塞状态,所以需要重新检查条件,判断是否还需要释放这个结构。如果其他线程在我们为满足锁顺序而阻塞时发现了这个结构并对其引用计数加1,那么只需要简单地对引用计数减1,对所有的东西解锁然后返回。

如此加、解锁太复杂,所以需要重新审视原来的设计。也可以使用散列列表锁来保护引用计数,使事情大大简化,结构互斥量可以用于保护foo结构中的其他任何东西。程序清单11-7反应了这种变化。

程序清单11-7 简化的加、解锁

#include <stdlib.h><pthread.h>

 NHASH 29
 HASH(fp)    (((unsigned long)fp)%NHASH)

 foo *=            f_count;     foo        *f_next;    
     foo *)     foo     *((fp = malloc(( foo))) !=->f_count = (pthread_mutex_init(&fp->f_lock, NULL) != =&->f_next ==&fp->& foo *fp)    &->f_count++& foo * id)     foo     *=&(fp = fh[idx]; fp != NULL; fp = fp->(fp->f_id ==->f_count++& foo *fp)     foo     *&(--fp->f_count == )    ==(tfp === fp->(tfp->f_next !== tfp->->f_next = fp->&&fp->&

注意,与程序清单11-6中的程序相比,程序清单11-7中的程序简单得多。两种用途使用相同的锁时,围绕散列列表和引用计数的锁的排序问题就随之不见了。多线程的软件设计经常要考虑这类折中处理方案。如果锁的粒度太粗,就会出现很多线程阻塞等待相同的锁,源自并发性的改善微乎其微。如果锁的粒度太细,那么过多的锁开销会使系统性能受到影响,而且代码变得相当复杂。作为一个程序员,需要在满足锁需求的情况下,在代码复杂性和优化性能之间找到好平衡点。

->f_next ==http://bbs.csdn.net/topics/330092120

3、读写锁

读写锁与互斥量类似,不过读写锁允许更高的并行性。互斥量要么是锁住状态要么是不加锁状态,而且一次只有一个线程可以对其加锁。读写锁可以有三种状态:读模式下加锁状态,写模式下加锁状态,不加锁状态。一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁

当读写锁是写加锁状态时,在这个锁被解锁之前,所有试图对这个锁加锁的线程都会被阻塞当读写锁在读加锁状态时,所有试图以读模式对它进行加锁的线程都可以得到访问权,但是如果线程希望以写模式对此锁进行加锁,它必须阻塞直到所有的线程释放读锁。虽然读写锁的实现各不相同,但当读写锁处于读模式锁住状态时,如果有另外的线程试图以写模式加锁,读写锁通常会阻塞随后的读模式锁请求。这样可以避免读模式锁长期占用,而等待的写模式锁请求一直得不到满足

读写锁非常适合于对数据结构读的次数远大于写的情况。当读写锁在写模式下时,它所保护的数据结构就可以被安全地修改,因为当前只有一个线程可以在写模式下拥有这个锁。当读写锁在读模式下时,只要线程获取了读模式下的读写锁,该锁所保护的数据结构可以被多个获得读模式锁的线程读取。

读写锁也叫做共享-独占锁,当读写锁以读模式锁住时,它是以共享模式锁住的;当它以写模式锁住时,它是以独占模式锁住的。

与互斥量一样,读写锁在使用之前必须初始化,在释放它们底层的内存前必须销毁。

#include <pthread.h>
 pthread_rwlock_init(pthread_rwlock_t * pthread_rwlockattr_t * pthread_rwlock_destroy(pthread_rwlock_t *

读写锁通过调用pthread_rwlock_init进行初始化。如果希望读写锁有默认的属性,可以传一个空指针给attr。

在释放读写锁占用的内存之前,需要调用pthread_rwlock_destroy做清理工作。如果pthread_rwlock_init为读写锁分配了资源,pthread_rwlock_destroy将释放这些资源。如果在调用pthread_rwlock_destroy之前就释放了读写锁占用的内存空间,那么分配给这个锁的资源就丢失了。

要在读模式下锁定读写锁,需要调用pthread_rwlock_rdlock要在写模式下锁定读写锁,需要调用pthread_rwlock_wrlock。不管以何种方式锁住读写锁,都可以调用pthread_rwlock_unlock进行解锁

#include <pthread.h>
 pthread_rwlock_rdlock(pthread_rwlock_t * pthread_rwlock_wrlock(pthread_rwlock_t * pthread_rwlock_unlock(pthread_rwlock_t *

在实现读写锁的时候可能会对共享模式下可获取的锁的数量进行限制,所以需要检查pthread_rwlock_rdlock的返回值。即使pthread_rwlock_wrlock和pthread_rwlock_unlock有错误的返回值,如果锁设计合理的话,也不需要检查其返回值。错误返回值的定义只是针对不正确地使用读写锁的情况,例如未经初始化的锁,或者试图获取已拥有的锁从而可能产生死锁这样的错误返回等。

Single UNIX Specification同样定义了有条件的读写锁原语的版本。

#include <pthread.h>
 pthread_rwlock_tryrdlock(pthread_rwlock_t * pthread_rwlock_trywrlock(pthread_rwlock_t *

可以获取锁时,函数返回0,否则,返回错误EBUSY。这些函数可以用于遵循某种锁层次但还不能完全避免死锁的情况。

 实例

程序清单11-8中的程序解释了读写锁的使用。作业请求队列由单个读写锁保护。实现多个工作线程获取由单个主线程分配给它们的作业。

程序清单11-8 使用读写锁

#include <stdlib.><pthread.h>

 job * job *
     job        * job             *
 queue *->q_head =->q_tail == pthread_rwlock_init(&qp->(err != 

    (
 queue *qp,  job *&qp->->j_next = qp->->j_prev =(qp->q_head !=->q_head->j_prev =->q_tail = jp;    ->q_head =&qp->
 queue *qp,  job *&qp->->j_next =->j_prev = qp->(qp->q_tail !=->q_tail->j_next =->q_head = jp; ->q_tail =&qp->
 queue *qp,  job *&qp->(jp == qp->->q_head = jp->(qp->q_tail ==->q_tail =  (jp == qp-->q_tail = jp->(qp->q_head ==->q_head =->j_prev->j_next = jp->->j_next->j_prev = jp->&qp->
 job * queue * job *(pthread_rwlock_rdlock(&qp->q_lock) != (jp = qp->q_head; jp != NULL; jp = jp->(pthread_equal(jp->&qp->

在这个例子中,不管什么时候需要增加一个作业到队列中或者从队列中删除作业,都用写模式锁住队列的读写锁。不管何时搜索队列,首先需要获取读模式下的锁,允许所有的工作线程并发地搜索队列。在这种情况下,只有线程搜索队列的频率远远高于增加或删除作业时,使用读写锁才能改善性能。

工作线程只能从队列中读取与它们的线程ID匹配的作业。既然作业结构同一时间只能由一个线程使用,所以不需要额外加锁。

4、条件变量

条件变量是线程可用的另一种同步机制(http://www.cnblogs.com/feisky/archive/2010/03/08/1680950.html)。条件变量给多个线程提供了一个会合的场所。条件变量与互斥量一起使用时,允许线程以无竞争的方式等待特定的条件发生。(条件变量基本概念和原理:http://hipercomer.blog.51cto.com/4415661/914841)

(线程同步:条件变量的使用细节分析:http://blog.chinaunix.net/uid-28852942-id-3757186.html)

条件本身是由互斥量保护的线程在改变条件状态前必须首先锁住互斥量,其他线程在获得互斥量之前不会察觉到这种改变,因为必须锁定互斥量以后才能计算条件

条件变量使用之前必须首先进行初始化pthread_cond_t数据类型代表的条件变量可以用两种方式进行初始化,可以把常量PTHREAD_COND_INITIALIZER赋给静态分配的条件变量,但是如果条件变量是动态分配的,可以使用pthread_cond_init函数进行初始化。

在释放底层的内存空间之前,可以使用pthread_cond_destroy函数对条件变量进行去除初始化(deinitialize)。

#include <pthread.h>

 pthread_cond_init( pthread_cond_t ** pthread_cond_destroy( pthread_cond_t *

除非需要创建一个非默认属性的条件变量,否则pthread_cond_init函数的attr参数可以设置为NULL。

使用pthread_cond_wait等待条件变为真,如果在给定的时间内条件不能满足,那么会生成一个代码出错码的返回值。

#include <pthread.h>

 pthread_cond_wait( pthread_cond_t ** pthread_cond_timedwait( pthread_cond_t **  timespec *

传递给pthread_cond_wait的互斥量对条件进行保护,调用者把锁住的互斥量传给函数函数把调用线程放到等待条件的线程列表上,然后对互斥量解锁,这两个操作是原子操作。这样就关闭了条件检查和线程进入休眠状态等待条件改变这两个操作之间的时间通道,这样线程就不会错过条件的任何变化。pthread_cond_wait返回时,互斥量再次被锁住。

pthread_cond_timedwait函数的工作方式与pthread_cond_wait函数相似,只是多了一个timeout。timeout值指定了等待的时间,它是通过timespec结构指定。时间值用秒数或者分秒数来表示,分秒数的单位是纳秒。


          tv_nsec;   

使用这个结构时,需要指定愿意等待多长时间,时间值是一个绝对数而不是相对数。例如,如果能等待3分钟,就需要把当前时间加上3分钟再转换到timespec结构,而不是把3分钟转换成timespec结构。

可以使用gettimeofday(http://www.cnblogs.com/nufangrensheng/p/3507715.html)获取用timeval结构表示的当前时间,然后把这个时间转换成timespec结构。要得到timeout值的绝对时间,可以使用下面的函数:

 timespec *tsp, &->tv_sec =->tv_nsec = now.tv_usec * ;    
    ->tv_sec += minutes * 

如果时间值到了但是条件还是没有出现,pthread_cond_timedwait将重新获取互斥量,然后返回错误ETIMEDOUT从pthread_cond_wait或者pthread_cond_timedwait调用成功返回时,线程需要重新计算条件,因为其他的线程可能已经在运行并改变了条件。

有两个函数可以用于通知线程条件已经满足。pthread_cond_signal函数将唤醒等待该条件的某个线程,而pthread_cond_broadcast函数将唤醒等待该条件的所有线程。

POSIX规范为了简化实现,允许pthread_cond_signal在实现的时候可以唤醒不止一个线程。

#include <pthread.h>

 pthread_cond_signal( pthread_cond_t * pthread_cond_broadcast( pthread_cond_t *

调用pthread_cond_signal或者pthread_cond_broadcast,也称为向线程或条件发送信号。必须注意一定要在改变条件状态以后再给线程发送信号

实例

程序清单11-9 使用条件变量

#include <pthread.h>

 msg * msg *== msg *&(workq ==从pthread_cond_wait或者pthread_cond_timedwait调用成功返回时,线程需要重新计算条件 */
        {
            pthread_cond_wait(&qready, &== mp->& msg *&->m_next ==&&

本篇博文内容摘自《UNIX环境高级编程》(第二版),仅作个人学习记录所用。关于本书可参考:http://www.apuebook.com/

相关内容