C++/STL/ConditionVariable

< C++

condition_variable標準程式庫中的一個头文件,定义了C++11标准中的一些用于并发编程时表示条件变量的类与方法等。从g++ 4.8.1、Visual C++ 2012都已经支持了C++11标准中定义的condition_variable头文件。

背景简介

编辑

条件变量并发程序设计中的一种控制结构。多个线程访问一个共享资源(或称临界区)时,不但需要用互斥锁实现独享访问以避免并发错误(称为竞争危害),在获得互斥锁进入临界区后还需要检验特定条件是否成立:

  • 如果不满足该条件,拥有互诉锁的线程应该释放该互斥锁、把自身阻塞(block)并挂到(suspend)条件变量的线程队列中;
  • 如果满足该条件,拥有互诉锁的线程在临界区内访问共享资源,在退出临界区时通知(notify)在条件变量的线程队列中处于阻塞状态的线程,被通知的线程必须重新申请对该互斥锁加锁。

如上,实际上使用了两个处于阻塞状态的线程的队列,分别为条件变量与互斥锁所拥有。

C++11的标准库中新增加的条件变量的实现,与pthread的实现语义完全一致。

使用条件变量做并发控制时,某一时刻阻塞在一个条件变量上的各个线程应该在调用wait操作时指明同一个互斥锁,此时该条件变量与该互斥锁绑定;否则程序的行为未定义。条件变量必须与互斥锁配合使用,其理由是程序需要判定某个条件(condition或称predict)是否成立,该条件可以是任意复杂。通行的编程样例为(伪代码):

 mutex.lock();//互斥锁加锁
 while(predict()!=true) // predict可以任意复杂,但必须互斥独占访问
    conditionVariable.wait();   
 //退出while循环执行至此时,既有predict()为真且获得了mutex加锁 

离开临界区的线程用notify操作解除阻塞(unblock)在条件变量上的各个线程时,按照公平性(fairness)这些线程应该有平等的获得互斥锁的机会,不应让某个线程始终难以获得互斥锁被饿死(starvation),并且比后来到临界区的其它线程更为优先(即基本上FIFO)。一种办法是调用了notify_all的线程保持互斥锁,直到所有从条件变量上解除阻塞的线程都已经挂起(suspend)到互斥锁上,然后发起了notify_all的线程再释放互斥锁。[1]互斥锁上一般都有比较完善的阻塞线程调度算法,一般会按照线程优先级调度,相同优先级按照FIFO调度。

发起notify的线程不需要拥有互斥锁。

即将离开临界区的线程是先释放互斥锁还是先notify操作解除在条件变量上挂起线程的阻塞?表面看两种顺序都可以。但一般建议是先notify操作,后对互斥锁解锁。因为这既有利于上述的公平性,同时还避免了相反顺序时可能的w:优先级倒置。这种先notify后解锁的做法是悲观的(pessimization),因为被通知(notified)线程将立即被阻塞,等待通知(notifying)线程释放互斥锁。很多实现(特别是pthreads的很多实现)为了避免这种“匆忙与等待”(hurry up and wait)情形,把在条件变量的线程队列上处于等待的被通知线程直接移到互斥锁的线程队列上,而不唤醒这些线程。

std::condition_variable类

编辑

std::condition_variable类表示w:条件变量。效果上相当于包装了w:pthread库中的pthread_cond_*()系列的函数。

  • 构造函数
    • condition_variable();缺省构造函数
    • condition_variable (const condition_variable&) = delete;禁止拷贝构造函数
  • 成员函数
    • void wait (unique_lock<mutex>& lck); 无条件被阻塞。调用该函数前,当前线程应该已经对unique_lock<mutex> lck完成了加锁。所有使用同一个条件变量的线程必须在wait函数中使用同一个unique_lock<mutex>。该wait函数内部会自动调用lck.unlock()对互斥锁解锁,使得其他被阻塞在互斥锁上的线程恢复执行。使用本函数被阻塞的当前线程在获得通知(notified,通过别的线程调用 notify_*系列的函数)而被唤醒后,wait()函数恢复执行并自动调用lck.lock()对互斥锁加锁。
    • template <class Predicate> void wait (unique_lock<mutex>& lck, Predicate pred);带条件的被阻塞。wait函数设置了谓词(Predicate),只有当pred条件为false时调用该wait函数才会阻塞当前线程,并且在收到其他线程的通知后只有当pred为true时才会被解除阻塞。因此,等效于while (!pred()) wait(lck);
    • template <class Rep, class Period> cv_status wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time);指定一个时间段,在当前线程收到通知(notify)或者超过指定时间段,wait_for 返回
    • template <class Rep, class Period, class Predicate> bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred); 有条件阻塞且超时返回。
    • template <class Clock, class Duration> cv_status wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time);指定一个绝对时间点,超时wait_until返回。
    • template <class Clock, class Duration, class Predicate> bool wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time, Predicate pred);有条件阻塞且超时返回。
    • notify_one():唤醒某个等待线程,该线程是通过该条件变量的某个wait函数阻塞在该条件变量的线程队列上。如果当前没有等待线程,则该函数什么也不做
    • notify_all():唤醒所有的等待(wait)线程。如果当前没有等待线程,则该函数什么也不做。

std::condition_variable_any类

编辑

与std::condition_variable用法一样,区别仅在于std::condition_variable_any 的 wait 函数可以接受任何 lockable 参数,而 std::condition_variable 只能接受 std::unique_lock<std::mutex> 类型的参数。

std::cv_status枚举类型

编辑
  • std::cv_status::no_timeout: wait_for 或者 wait_until 没有超时即返回,即在规定的时间段内线程收到了通知。
  • std::cv_status::timeout: wait_for 或者 wait_until 超时后返回。

函数std::notify_all_at_thread_exit()

编辑

函数原型为:

void notify_all_at_thread_exit (condition_variable& cond, unique_lock<mutex> lck);

当调用该函数的线程退出时,所有在 cond 条件变量上等待的线程都会收到通知。Microsoft Visual C++ 2013已经支持了该函数;但GCC 4.9.3尚未支持该函数。

例子程序

编辑
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
 
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
 
void worker_thread()
{
    // 等待主线程设置好ready变量为真
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;}); //或者为 cv.wait(lk);
 
    // 现在拥有了互斥锁m,变量ready为真,已进入了临界区
    std::cout << "Worker thread is processing data\n";
    data += " after processing";
 
    // Send data back to main()
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";
 
    // 手工解锁,并通知阻塞在cv上的某个线程。  
    cv.notify_one();
    lk.unlock();
}
 
int main()
{
    std::thread worker(worker_thread); //启动工作线程
 
    data = "Example data";
    //把ready变量由false变为true,这使得工作线程进入临界区
    {
        std::lock_guard<std::mutex> lk(m); //由于工作线程不可能更早得到ready为真,所以主线程很快就会获得互斥锁m
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one(); //通知已经阻塞在cv上的某个线程;如果没有线程被阻塞,则什么也不做
 
    // 等待工作线程——主线程需要获得互斥锁m且processed变量变为真
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';
 
    worker.join();
}

示例:实现信号量

编辑
#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};

参考文献

编辑
  1. Douglas C. Schmidt and Irfan Pyarali:《Strategies for Implementing POSIX Condition Variables on Win32》§3.4. The SignalObjectAndWait Solution