条件变量(condition variable)是常用的线程同步技术,通常用于让线程阻塞并等待某个条件满足。它需要与互斥锁搭配使用,但是你有想过为什么需要搭配一个互斥锁呢?

1 条件变量的标准范式

条件变量常用于生产者消费者模式。以下是条件变量的伪代码范例,适用于单消费者模型。

thread:
    initialise.
    lock mutex.
    while thread not told to stop working:
	wait on condvar using mutex.
	if work is available to be done:
	    do the work.
    unlock mutex.
    clean up.
    exit thread.

在上述伪代码中,互斥量是用来保护条件变量的,这就是在条件变量调用等待 wait 之前,需要互斥量上锁的原因。等待 wait 操作会释放互斥锁,这样其他线程就有机会操作条件变量了。当条件变量被唤醒后,会自动对互斥量加锁。

条件变量省去了轮询是否达到某个条件的操作,可以等待其他线程在条件满足时通知等待的线程,伪代码如下。

lock mutex.
flag work as available.
signal condition variable.
unlock mutex.

对于多消费者模型,在上述伪代码中,当其中一个消费者满足工作条件时,互斥量仍是锁定状态,这样其他线程在这时就会阻塞。所以,需要在消费者线程开始工作之前解锁互斥量以允许其他消费者线程可以正常运行。示例代码如下:

thread:
    initialise.
    lock mutex.
    while thread not told to stop working:
	wait on condvar using mutex.
	if work is available to be done:
	    copy work to thread local storage.
	    unlock mutex.
	    do the work.
	    lock mutex.
    unlock mutex.
    clean up.
    exit thread.

2 条件变量的虚假唤醒(spurious wakeup)

虚假唤醒是指在等待 wait 条件变量时,会在没有被其他线程通知的情况下提前结束等待。虚假唤醒是偶发的,所以就需要在条件变量等待结束时判断是否怎的满足了执行条件。

This means that when you wait on a condition variable, the wait may (occasionally) return when no thread specifically broadcast or signaled that condition variable. Spurious wakeups may sound strange, but on some multiprocessor systems, making condition wakeup completely predictable might substantially slow all condition variable operations. The race conditions that cause spurious wakeups should be considered rare.

3 C++11 中的条件变量

以下代码是C++11标准下的线程和条件变量示例代码,可以看到条件变量搭配了互斥量一起使用,用于保护条件变量和两个标志变量( readyprocessed )。

注意 : 在C++标准库条件变量的 notify 操作之前是不需要互斥量的锁定状态的,这点与POSIX标准的 pthread 接口有些不同。可以理解为在C++11的互斥锁只负责保护条件变量配套的布尔值;而POSIX的互斥锁既要保护布尔值,也要保护条件变量。

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;

void worker_thread()
{
    // Wait until main() sends data
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;});

    // after the wait, we own the lock.
    std::cout << "Worker thread is processing data\n";
    data += " after processing";

    // Send data back to main()
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";

    // Manual unlocking is done before notifying, to avoid waking up
    // the waiting thread only to block again (see notify_one for details)
    lk.unlock();
    cv.notify_one();
}

int main()
{
    std::thread worker(worker_thread);

    data = "Example data";
    // send data to the worker thread
    {
	std::lock_guard<std::mutex> lk(m);
	ready = true;
	std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one();

    // wait for the worker
    {
	std::unique_lock<std::mutex> lk(m);
	cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';

    worker.join();
}

4 boost 中的条件变量

以下代码是boost库的条件变量示例代码。

#include <iostream>
#include <boost/thread.hpp>

boost::mutex io_mutex;
boost::condition_variable condition;
bool worker_is_done = false;

void workFunction()
{
    std::cout << "Waiting a little..." << std::endl;
    boost::this_thread::sleep(boost::posix_time::seconds(1));

    {
	boost::lock_guard<boost::mutex> guard(io_mutex);
	worker_is_done = true;
    }

    std::cout << "Notifying condition..." << std::endl;
    condition.notify_one();

    std::cout << "Waiting a little more..." << std::endl;
    boost::this_thread::sleep(boost::posix_time::seconds(1));
}

int main()
{
    boost::thread workThread(&workFunction);

    boost::unique_lock<boost::mutex> lock(io_mutex);
    while (!worker_is_done)
	 condition.wait(lock);

    std::cout << "Condition notified." << std::endl;
    workThread.join();
    std::cout << "Thread finished." << std::endl;

    return 0;
}