Linux下多线程编程互斥锁和条件变量的简单使用


Linux下的多线程遵循POSIX线程接口,称为pthread。编写Linux下的多线程程序,需要使用头文件pthread.h,链接时需要使用库libpthread.a。线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器、一组寄存器和栈),但是它可与同属一个进程的其它的线程共享进程所拥有的全部资源。当多个任务可以并行执行时,可以为每个任务启动一个线程。

线程是并发运行的。在串行程序基础上引入线程和进程是为了提供程序的并发度,从而提高程序运行效率和响应时间。

与进程相比,线程的优势:(1)、线程共享相同的内存空间,不同的线程可以存取内存中的同一个变量;(2)、与标准fork()相比,线程带来的开销很小,节省了CPU时间,使得线程创建比新进程创建快上十到一百倍。

适应多线程的理由:(1)、和进程相比,它是一种非常“节俭”的多任务操作方式,在Linux系统下,启动一个新的进程必须分配给它独立的地址空间,建立众多的数据表来维护它的代码段、堆栈段和数据段,这是一种“昂贵”的多任务工作方式。而运行一个进程中的多个线程,它们彼此之间使用相同的地址空间,共享大部分数据,启动一个线程花费的空间远远小于启动一个进程所花费的空间,而且,线程间彼此切换所需的时间也远远小于进程间切换所需要的时间;(2)、线程间方便的通信机制。对不同的进程来说,它们具有独立的数据空间,要进行数据的传输只能通过通信的方式进行,这种方式不仅费时,而且很不方便。线程则不然,同一进程下的线程之间共享数据空间,所以一个线程的数据可以直接为其它线程所用,这不仅快捷,而且方便。

多线程程序作为一种多任务、并发的工作方式,其优点包括:(1)、提供应用程序响应;(2)、使多CPU系统更加有效:操作系统会保证当线程数不大于CPU数目时,不同的线程运行在不同的CPU上;(3)、改善程序结构:一个既长又复杂的进程可以考虑分为多个线程,成为几个独立或半独立的运行部分,这样的程序利于理解和修改。

pthread_create:用于在调用的进程中创建一个新的线程。它有四个参数,第一个参数为指向线程标识符指针;第二个参数用来设置线程属性;第三个参数是线程运行函数的起始地址;第四个参数是运行函数的参数。

在一个线程中调用pthread_create函数创建新的线程后,当前线程从pthread_create处继续往下执行。pthread_create函数的第三个参数为新创建线程的入口函数的起始地址,此函数接收一个参数,是通过第四个参数传递给它的,该参数的类型是void*,这个指针按什么类型解释由调用者自己定义,入口函数的返回值类型也是void*,这个指针的含义同样由调用者自己定义,入口函数返回时,这个线程就退出了,其它线程可以调用pthread_join函数得到入口函数的返回值。

pthread_join:线程阻塞函数,用于阻塞当前的线程,直到另外一个线程运行结束;使一个线程等待另一个线程结束;让主线程阻塞在这个地方等待子线程结束;代码中如果没有pthread_join主线程会很快结束从而使整个进程结束,从而使创建的线程没有机会开始执行就结束了,加入pthread_join后,主线程会一直等待直到等待的线程结束自己才结束,使创建的线程有机会执行。

pthread_create将一个线程拆分为两个,pthread_join()将两个线程合并为一个线程。

一个线程实际上就是一个函数,创建后,立即被执行,当函数返回时该线程也就结束了。

线程终止时,一个需要注意的问题是线程间的同步问题。一般情况下,进程中各个线程的运行是相互独立的,线程的终止并不会相互通知,也不会影响其它线程,终止的线程所占用的资源不会随着线程的终止而归还系统,而是仍然为线程所在的进程持有。一个线程仅允许一个线程使用pthread_join等待它的终止,并且被等待的线程应该处于可join状态,而非DETACHED状态。一个可”join”的线程所占用的内存仅当有线程对其执行了pthread_join()后才会释放,因此为了避免内存泄露,所有线程终止时,要么设为DETACHED,要么使用pthread_join来回收资源。一个线程不能被多个线程等待。

所有线程都有一个线程号,也就是threadid,其类型为pthread_t,通过调用pthread_self函数可以获得自身的线程号。

Linux线程同步的几种基本方式:join、互斥锁(mutex)、读写锁(read-writelock)、条件变量(condition variables)。mutex的本质是锁,而条件变量的本质是等待。

互斥:简单的理解就是,一个线程进入工作区后,如果有其它线程想要进入工作区,它就会进入等待状态,要等待工作区内的线程结束后才可以进入。

互斥提供线程间资源的独占访问控制。它是一个简单的锁,只有持有它的线程才可以释放那个互斥。它确保了它们正在访问的共享资源的完整性,因为在同一时刻只允许一个线程访问它。

互斥操作,就是对某段代码或某个变量修改的时候只能有一个线程在执行这段代码,其它线程不能同时进入这段代码或同时修改该变量。这个代码或变量称为临界资源。

通过锁机制实现线程间的同步,同一时刻只允许一个线程执行一个关键部分的代码。

有两种方式创建互斥锁,静态方式和动态方式。

在默认情况下,Linux下的同一线程无法对同一互斥锁进行递归加锁,否则将发生死锁。所谓递归加锁,就是在同一线程中试图对互斥锁进行两次或两次以上的行为。解决问题的方法就是显示地在互斥变量初始化时将其设置成recursive属性。

互斥量是一种用于多线程中的同步访问的方法,它允许程序锁住某个对象或者某段代码,使得每次只能有一个线程访问它。为了控制对关键对象或者代码的访问,必须在进入这段代码之前锁住一个互斥量,然后在完成操作之后解锁。

互斥量用pthread_mutex_t数据类型来表示,在使用互斥变量之前,必须首先对它进行初始化,可以把它置为常量PTHREAD_MUTEX_INITIALIZER(只对静态分配的互斥量),也可以通过调用pthread_mutex_init函数进行初始化。如果动态地分配互斥量(如调用malloc)函数,那么释放内存前(free)需要使用pthread_mutex_destroy函数。

对共享资源的访问,要对互斥量进行加锁,如果互斥量已经上了锁,调用线程会阻塞,直到互斥量被解锁。在完成了对共享资源的访问后,要对互斥量进行解锁。

pthread_mutex_init函数:主要用于多线程中互斥锁的初始化。如果要用默认的属性初始化互斥量,只需把第二个参数设置为NULL。互斥量的属性可以分为四种:(1)、PTHREAD_MUTEX_TIMED_NP,这是缺省值,也就是普通锁,当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁,这种锁策略保证了资源分配的公平性;(2)、PTHREAD_MUTEX_RECURSIVE_NP,嵌套锁,允许线程多次加锁,不同线程,解锁后重新竞争;(3)、PTHREAD_MUTEX_ERRORCHECK_NP,检错,如果该互斥量已经被上锁,那么后续的上锁将会失败而不会阻塞,否则与PTHREAD_MUTEX_TIMED_NP类型相同,这样就保证当不允许多次加锁时不会出现最简单情况下的死锁;(4)、PTHREAD_MUTEX_ADAPTIVE_NP,适应锁,动作最简单的锁类型,仅等待解锁后重新竞争。

pthread_mutex_destroy函数:销毁(注销)线程互斥锁;销毁一个互斥锁即意味着释放它所占用的资源,且要求锁当前处于开放状态。

pthread_mutex_lock:占有互斥锁(阻塞操作);互斥锁被锁定,如果这个互斥锁被一个线程锁定和拥有,那么另一个线程要调用这个函数就会进入阻塞状态(即等待状态),直到互斥锁被释放为止;互斥量一旦被上锁后,其它线程如果想给该互斥量上锁,那么就会阻塞在这个操作上,如果在此之前该互斥量已经被其它线程上锁,那么该操作将会一直阻塞在这个地方,直到获得该锁为止。

pthread_mutex_unlock:释放互斥锁;在操作完成后,必须调用该函数给互斥量解锁,这样其它等待该锁的线程才有机会获得该锁,否则其它线程将会永远阻塞。

与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。条件变量分为两部分:条件和变量。条件本身是由互斥量保护的。线程在改变条件状态前先要锁住互斥量。条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待“条件变量的条件成立”而挂起;另一个线程使“条件成立”(给出条件成立信号)。条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两线程共享可读写的内存,条件变量可以被用来实现这两线程间的线程同步。

互斥锁一个明显的缺点是它只有两种状态,锁定和非锁定。而条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,它常和互斥锁一起使用。使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件发生变化。一旦其它的某个线程改变了条件变量,它将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程。这些线程将重新锁定互斥锁并重新测试条件是否满足。一般来说,条件变量被用来进行线程间的同步。条件变量只是起阻塞和唤醒线程的作用,具体的判断条件还需用户给出。线程被唤醒后,它将重新检查判断条件是否满足,如果还不满足,一般说来线程应该仍阻塞在这里,被等待被下一次唤醒。这个过程一般用while语句实现。

条件变量用pthread_cond_t结构体来表示。

pthread_cond_init:初始化一个条件变量,当第二个参数属性为空指针时,函数创建的是一个缺省的条件变量,否则条件变量的属性将由第二个参数的属性值来决定。不能由多个线程同时初始化一个条件变量。当需要重新初始化或释放一个条件变量时,应用程序必须保证这个条件变量未被使用。

pthread_cond_wait:阻塞在条件变量上,函数将解锁第二个参数指向的互斥锁,并使当前线程阻塞在第一个参数指向的条件变量上。被阻塞的线程可以被pthread_cond_signal、pthread_cond_broadcast函数唤醒,也可能在被信号中断后被唤醒。

一般一个条件表达式都是在一个互斥锁的保护下被检查。当条件表达式未被满足时,线程将仍然阻塞在这个条件变量上。当另一个线程改变了条件的值并向条件变量发出信号时,等待在这个条件变量上的一个线程或所有线程被唤醒,接着都试图再次占有相应的互斥锁。阻塞在条件变量上的线程被唤醒以后,直到pthread_cond_wait函数返回之前,条件的值都有可能发生变化。所以函数返回以后,在锁定相应的互斥锁之前,必须重新测试条件值。最好的测试方法是循环调用pthread_cond_wait函数,并把满足条件的表达式置为循环的终止条件。阻塞在同一个条件变量上的不同线程被释放的次序是不一定的。

pthread_cond_wait函数是退出点,如果在调用这个函数时,已有一个挂起的退出请求,且线程允许退出,这个线程将被终止并开始执行善后处理函数,而这时和条件变量相关的互斥锁仍将处在锁定状态。

pthread_cond_signal:解除在条件变量上的阻塞。此函数被用来释放被阻塞在指定条件变量上的一个线程。一般在互斥锁的保护下使用相应的条件变量,否则对条件变量的解锁有可能发生在锁定条件变量之前,从而造成死锁。唤醒阻塞在条件变量上的所有线程的顺序由调度策略决定。

pthread_cond_timewait:阻塞直到指定时间。函数到了一定的时间,即使条件未发生也会解除阻塞。这个时间由第三个参数指定。

pthread_cond_broadcast:释放阻塞的所有线程。函数唤醒所有被pthread_cond_wait函数阻塞在某个条件变量上的线程。当没有线程阻塞在这个条件变量上时,此函数无效。此函数唤醒所有阻塞在某个条件变量上的线程,这些线程被唤醒后将再次竞争相应的互斥锁。

pthread_cond_destroy:释放条件变量。条件变量占用的空间未被释放。

pthread_cond_wait和pthread_cond_timewait一定要在mutex的锁定区域内使用;而pthread_cond_signal和pthread_cond_broadcoast无需考虑调用线程是否是mutex的拥有者,可以在lock与unlock以外的区域调用。

一个特定条件只能有一个互斥对象,而且条件变量应该表示互斥数据“内部”的一种特殊的条件更改。一个互斥对象可以有许多条件变量,但每个条件变量只能有一个互斥对象。

以上所有线程相关函数,函数执行成功时返回0,返回其它非0值表示错误。

以下是一些测试例子:

1. test_create_thread.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

void* run(void* para)
{
 cout<<"start new thread!"<<endl;
 
 //sleep(5);//suspend 5 s,在正式的代码中,一般不要用sleep函数
 int* iptr = (int*)((void**)para)[0];
 float* fptr = (float*)((void**)para)[1];
 char* str = (char*)((void**)para)[2];
 cout << *iptr << "    " << *fptr << "    " << str << endl;

 cout<<"end new thread!"<<endl;
 
 return ((void *)0);
}

int main()
{
 pthread_t pid;//thread handle
 int err = -1;
 int ival = 1;
 float fval = 10.0F;
 char buf[] = "func";
 void* para[3] = { &ival, &fval, buf };

 err = pthread_create(&pid, NULL, run, para);
 if (err != 0) {
  cout << "can't create thread!" << endl;
  return -1;
 }

 //新线程创建之后主线程如何运行----主线程按顺序继续执行下一行程序
 cout << "main thread!" << endl;
 
 //新线程结束时如何处理----新线程先停止,然后作为其清理过程的一部分,等待与另一个线程合并或“连接”
 pthread_join(pid, NULL);

 cout << "ok!" << endl;

 return 0;
}

//终端执行:$ g++ -o test_create_thread test_create_thread.cpp -lpthread
//    $ ./test_create_thread

2. test_thread_mutex.cpp:

#include <pthread.h>
#include <iostream>

using namespace std;

pthread_t tid[2];
int counter = 0;
pthread_mutex_t lock;

void* run(void* arg)
{
 pthread_mutex_lock(&lock);

 unsigned long i = 0;
 counter += 1;
 cout << "Job " << counter << " started!" << endl;
 for (i = 0; i<(0xFFFFFFFF); i++);
 cout << "Job " << counter << " finished!" << endl;

 pthread_mutex_unlock(&lock);

 return NULL;
}

int main()
{
 int i = 0, err = -1;

 if (pthread_mutex_init(&lock, NULL) != 0) {
  cout << "mutex init failed" << endl;
  return -1;
 }

 while (i < 2) {
  err = pthread_create(&(tid[i]), NULL, &run, NULL);
  if (err != 0)
   cout << "can't create thread!" << endl;
  i++;
 }

 pthread_join(tid[0], NULL);
 pthread_join(tid[1], NULL);
 pthread_mutex_destroy(&lock);

 cout << "ok!" << endl;
 return 0;
}

//终端执行:$ g++ -o test_thread_mutex test_thread_mutex.cpp -lpthread
//    $ ./test_thread_mutex

3. test_thread_cond.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

pthread_mutex_t count_lock;
pthread_cond_t count_nonzero;
unsigned count = 0;

void* decrement_count(void* arg)
{
 pthread_mutex_lock(&count_lock);
 cout << "decrement_count get count_lock" << endl;

 while (count == 0) {
  cout << "decrement_count count == 0" << endl;

  cout << "decrement_count before cond_wait" << endl;
  pthread_cond_wait(&count_nonzero, &count_lock);
  cout << "decrement_count after cond_wait" << endl;
 }

 count = count + 1;

 pthread_mutex_unlock(&count_lock);

 return NULL;
}

void* increment_count(void* arg)
{
 pthread_mutex_lock(&count_lock);
 cout << "increment_count get count_lock" << endl;

 if (count == 0) {
  cout << "increment_count before cond_signal" << endl;
  pthread_cond_signal(&count_nonzero);
  cout << "increment_count after cond_signal" << endl;
 }

 count = count + 1;

 pthread_mutex_unlock(&count_lock);

 return NULL;
}

int main()
{
 pthread_t tid1, tid2;

 pthread_mutex_init(&count_lock, NULL);
 pthread_cond_init(&count_nonzero, NULL);

 pthread_create(&tid1, NULL, decrement_count, NULL);
 sleep(2);

 pthread_create(&tid2, NULL, increment_count, NULL);
 sleep(2);

 pthread_join(tid1, NULL);
 pthread_join(tid2, NULL);
 pthread_mutex_destroy(&count_lock);
 pthread_cond_destroy(&count_nonzero);

 cout << "ok!" << endl;
}

//终端执行:$ g++ -o test_thread_cond test_thread_cond.cpp -lpthread
//    $ ./test_thread_cond

4. test_thread_cond1.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

pthread_mutex_t counter_lock;
pthread_cond_t counter_nonzero;
int counter = 0;

void* decrement_counter(void* argv);
void* increment_counter(void* argv);

int main()
{
 cout << "counter: " << counter << endl;
 pthread_mutex_init(&counter_lock, NULL);
 pthread_cond_init(&counter_nonzero, NULL);

 pthread_t thd1, thd2;
 int ret = -1;

 ret = pthread_create(&thd1, NULL, decrement_counter, NULL);
 if (ret){
  cout << "create thread1 fail" << endl;
  return -1;
 }

 ret = pthread_create(&thd2, NULL, increment_counter, NULL);
 if (ret){
  cout << "create thread2 fail" << endl;
  return -1;
 }

 int counter = 0;
 while (counter != 10) {
  cout << "counter(main): " << counter << endl;
  sleep(1);
  counter++;
 }

 pthread_join(thd1, NULL);
 pthread_join(thd2, NULL);
 pthread_mutex_destroy(&counter_lock);
 pthread_cond_destroy(&counter_nonzero);

 cout << "ok!" << endl;
}

void* decrement_counter(void* argv)
{
 cout << "counter(decrement): " << counter << endl;

 pthread_mutex_lock(&counter_lock);
 while (counter == 0)
  pthread_cond_wait(&counter_nonzero, &counter_lock); //进入阻塞(wait),等待激活(signal)

 cout << "counter--(decrement, before): " << counter << endl;
 counter--; //等待signal激活后再执行 
 cout << "counter--(decrement, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock);

 return NULL;
}

void* increment_counter(void* argv)
{
 cout << "counter(increment): " << counter << endl;

 pthread_mutex_lock(&counter_lock);
 if (counter == 0)
  pthread_cond_signal(&counter_nonzero); //激活(signal)阻塞(wait)的线程(先执行完signal线程,然后再执行wait线程) 

 cout << "counter++(increment, before): " << counter << endl;
 counter++;
 cout << "counter++(increment, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock);

 return NULL;
}

//终端执行:$ g++ -o test_thread_cond1 test_thread_cond1.cpp -lpthread
//    $ ./test_thread_cond1

5. test_thread_cond2.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

pthread_mutex_t counter_lock1, counter_lock2;
pthread_cond_t counter_nonzero1, counter_nonzero2;
int counter = 0;

void* decrement_increment_counter(void* argv);

int main()
{
 cout << "counter: " << counter << endl;
 pthread_mutex_init(&counter_lock1, NULL);
 pthread_mutex_init(&counter_lock2, NULL);
 pthread_cond_init(&counter_nonzero1, NULL);
 pthread_cond_init(&counter_nonzero2, NULL);

 pthread_t thd;
 int ret = -1;

 ret = pthread_create(&thd, NULL, decrement_increment_counter, NULL);
 if (ret){
  cout << "create thread1 fail" << endl;
  return -1;
 }

 int counter = 0;
 while (counter != 10) {
  cout << "counter(main): " << counter << endl;
  sleep(1);
  counter++;
 }

 pthread_join(thd, NULL);
 pthread_mutex_destroy(&counter_lock1);
 pthread_mutex_destroy(&counter_lock2);
 pthread_cond_destroy(&counter_nonzero1);
 pthread_cond_destroy(&counter_nonzero2);

 cout << "ok!" << endl;
}

void* decrement_increment_counter(void* argv)
{
 cout << "start counter: " << counter << endl;

 pthread_mutex_lock(&counter_lock1);
 cout << "counter(decrement): " << counter << endl;
 while (counter == 1)
  pthread_cond_wait(&counter_nonzero1, &counter_lock1); //进入阻塞(wait),等待激活(signal)

 cout << "counter--(decrement, before): " << counter << endl;
 counter--; //等待signal激活后再执行 
 cout << "counter--(decrement, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock1);

 pthread_mutex_lock(&counter_lock2);
 cout << "counter(increment): " << counter << endl;
 if (counter == 0)
  pthread_cond_signal(&counter_nonzero2); //激活(signal)阻塞(wait)的线程(先执行完signal线程,然后再执行wait线程) 

 cout << "counter++(increment, before): " << counter << endl;
 counter++;
 cout << "counter++(increment, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock2);

 return NULL;
}

//终端执行:$ g++ -o test_thread_cond2 test_thread_cond2.cpp -lpthread
//    $ ./test_thread_cond2

本文永久更新链接地址

相关内容