ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

《C++ Concurrency in Action》笔记

2022-01-28 17:58:30  阅读:253  来源: 互联网

标签:std thread lock void C++ 线程 Concurrency Action data


《C++ Concurrency in Action》笔记

英文书籍《C++ Concurrency in Action》国内有不同人翻译,有一版是4个译者翻译的,豆瓣知乎风评一般。陈晓伟翻译的不错,下面是我对该译者书籍的读书笔记。

1 你好,C++的并发世界

1.1 何谓并发

最简单和最基本的并发,是指两个或更多独立的活动同时发生。
并发在生活中随处可见,我们可以一边走路一边说话,也可以两只手同时作不同的动作,还有我们每个人都过着相互独立的生活——当我在游泳的时候,你可以看球赛,等等。

1.1.1 计算机系统中的并发

计算机领域的并发指的是在单个系统里同时执行多个独立的任务,而非顺序的进行一些活动。
系统通过任务调度,在时间片微粒度下,做任务切换,让用户感觉不到间隙,这种也被称为并发。
多处理器计算机可实现硬件并发。
在这里插入图片描述
图1.2显示了四个任务在双核处理器上的任务切换,仍然是将任务整齐地划分为同等大小块的理想情况。实际上,许多因素会使得分割不均和调度不规则。
在这里插入图片描述

1.1.2 并发的途径

途径一:一个进程内含一个线程,交互是进程间通信;
途径二:一个进程内含多个线程,交互是线程间通信。

多进程并发

如图1.3所示,独立的进程可以通过进程间常规的通信渠道传递讯息(信号、套接字、文件、管道等等)。不过,这种进程之间的通信通常不是设置复杂,就是速度慢,这是因为操作系统会在进程间提供了一定的保护措施,以避免一个进程去修改另一个进程的数据。还有一个缺点是,运行多个进程所需的固定开销:需要时间启动进程,操作系统需要内部资源来管理进程,等等。
当然,以上的机制也不是一无是处:操作系统在进程间提供附加的保护操作和更高级别的通信机制,意味着可以更容易编写安全的并发代码。实际上,在类似于Erlang的编程环境中,将进程作为并发的基本构造块。
使用多进程实现并发还有一个额外的优势———可以使用远程连接(可能需要联网)的方式,在不同的机器上运行独立的进程。虽然,这增加了通信成本,但在设计精良的系统上,这可能是一个提高并行可用行和性能的低成本方式。
在这里插入图片描述

多线程并发

并发的另一个途径,在单个进程中运行多个线程。线程很像轻量级的进程:每个线程相互独立运行,且线程可以在不同的指令序列中运行。但是,进程中的所有线程都共享地址空间,并且所有线程访问到大部分数据———全局变量仍然是全局的,指针、对象的引用或数据可以在线程之间传递。虽然,进程之间通常共享内存,但是这种共享通常是难以建立和管理的。因为,同一数据的内存地址在不同的进程中是不相同。图1.4展示了一个进程中的两个线程通过共享内存进行通信。
在这里插入图片描述
地址空间共享,以及缺少线程间数据的保护,使得操作系统的记录工作量减小,所以使用多线程相关的开销远远小于使用多个进程。不过,共享内存的灵活性是有代价的:如果数据要被多个线程访问,那么程序员必须确保每个线程所访问到的数据是一致的(在本书第3、4、5和8章中会涉及,线程间数据共享可能会遇到的问题,以及如何使用工具来避免这些问题)。问题并非无解,只要在编写代码时适当地注意即可,这同样也意味着需要对线程通信做大量的工作。

1.2 为什么使用并发?

主要原因有两个:关注点分离(SOC)和性能

1.2.1 为了分离关注点

编写软件时,分离关注点是个好主意;通过将相关的代码与无关的代码分离,可以使程序更容易理解和测试,从而减少出错的可能性。即使一些功能区域中的操作需要在同一时刻发生的情况下,依旧可以使用并发分离不同的功能区域;若不显式地使用并发,就得编写一个任务切换框架,或者在操作中主动地调用一段不相关的代码。

1.2.2 为了性能

多处理器系统,现在越来越普遍。如果想要利用日益增长的计算能力,那就必须设计多任务并发式软件。
两种方式利用并发提高性能。一是业务逻辑并行,即任务并行(task parallelism),二是数据并行(data parallelism),如图像处理。

1.2.3 什么时候不使用并发

基本上,不使用并发的唯一原因就是,收益比不上成本。并发代码增加开发成本,更高复杂度增加出错风险。除非潜在的性能增益足够大或关注点分离地足够清晰,否则,别用并发。
此外,线程是有限的资源。如果让太多的线程同时运行,则会消耗很多操作系统资源,从而使得操作系统整体上运行得更加缓慢。不仅如此,因为每个线程都需要一个独立的堆栈空间,所以运行太多的线程也会耗尽进程的可用内存或地址空间。对于一个可用地址空间为4GB(32bit)的平坦架构的进程来说,这的确是个问题:如果每个线程都有一个1MB的堆栈(很多系统都会这样分配),那么4096个线程将会用尽所有地址空间,不会给代码、静态数据或者堆数据留有任何空间。即便64位(或者更大)的系统不存在这种直接的地址空间限制,但其他资源有限:如果你运行了太多的线程,最终也是出会问题的。尽管线程池(参见第9章)可以用来限制线程的数量,但这也并不是什么灵丹妙药,它也有自己的问题。
客户端/服务器(C/S)应用在服务器端为每一个链接启动一个独立的线程,对于少量的链接是可以正常工作的,但当同样的技术用于需要处理大量链接的高需求服务器时,也会因为线程太多而耗尽系统资源。在这种场景下,使用线程池可以对性能产生优化(参见第9章)。

1.3 C++中的并发和多线程

通过多线程为C++并发提供标准化支持是件新鲜事。只有在C++11标准下,才能编写不依赖平台扩展的多线程代码。了解C++线程库中的众多规则前,先来了解一下其发展的历史。

1.3.1 C++多线程历史

C++98(1998)标准不承认线程的存在,并且各种语言要素的操作效果都以顺序抽象机的形式编写。不仅如此,内存模型也没有正式定义,所以在C++98标准下,没办法在缺少编译器相关扩展的情况下编写多线程应用程序。
当然,编译器供应商可以自由地向语言添加扩展,添加C语言中流行的多线程API———POSIX标准中的C标准和Microsoft Windows API中的那些———这就使得很多C++编译器供应商通过各种平台相关扩展来支持多线程。所以C++本身不支持并行语法时,程序员们已经编写了大量的C++多线程程序了。

1.3.2 新标准支持并发

C++11新标准中不仅有了一个全新的线程感知内存模型,C++标准库也扩展了:包含了用于管理线程(参见第2章)、保护共享数据(参见第3章)、线程间同步操作(参见第4章),以及低级原子操作(参见第5章)的各种类。
新C++线程库很大程度上,是基于上文提到的C++类库的经验积累。特别是,Boost线程库作为新类库的主要模型,很多类与Boost库中的相关类有着相同名称和结构。随着C++标准的进步,Boost线程库也配合着C++标准在许多方面做出改变,因此之前使用Boost的用户将会发现自己非常熟悉C++11的线程库。
新的C++标准直接支持原子操作,允许程序员通过定义语义的方式编写高效的代码,从而无需了解与平台相关的汇编指令。

1.3.3 C++线程库的效率

为了效率,C++类整合了一些底层工具。这样就需要了解相关使用高级工具和使用低级工具的开销差,这个开销差就是抽象代价(abstraction penalty)。该类库在大部分主流平台上都能实现高效(带有非常低的抽象代价)。

1.3.4 平台相关的工具

在C++线程库中提供一个native_handle()成员函数,允许通过使用平台相关API直接操作底层实现。就其本质而言,任何使用native_handle()执行的操作都是完全依赖于平台的。

1.4 开始入门

#include <iostream>
#include <thread>  //①
void hello()  //②
{
  std::cout << "Hello Concurrent World\n";
}
int main()
{
  std::thread t(hello);  //③
  t.join();  //④
}

2 线程管理

2.1 线程管理的基础

2.1.1 启动线程

void do_some_work();
std::thread my_thread(do_some_work);
class background_task
{
public:
  void operator()() const
  {
    do_something();
    do_something_else();
  }
};

background_task f;
std::thread my_thread(f);

有件事需要注意,当把函数对象传入到线程构造函数中时,需要避免“最令人头痛的语法解析”(C++’s most vexing parse, 中文简介)。如果你传递了一个临时变量,而不是一个命名的变量;C++编译器会将其解析为函数声明,而不是类型对象的定义。

例如:

std::thread my_thread(background_task());

这里相当与声明了一个名为my_thread的函数,这个函数带有一个参数(函数指针指向没有参数并返回background_task对象的函数),返回一个std::thread对象的函数,而非启动了一个线程。
使用在前面命名函数对象的方式,或使用多组括号①,或使用新统一的初始化语法②,可以避免这个问题。

如下所示:

std::thread my_thread((background_task()));  // 1
std::thread my_thread{background_task()};    // 2

使用lambda表达式也能避免这个问题。lambda表达式是C++11的一个新特性,它允许使用一个可以捕获局部变量的局部函数(可以避免传递参数,参见2.2节)。想要具体的了解lambda表达式,可以阅读附录A的A.5节。之前的例子可以改写为lambda表达式的类型:

std::thread my_thread([]{
  do_something();
  do_something_else();
});

启动了线程,你需要明确是要等待线程结束(加入式——参见2.1.2节),还是让其自主运行(分离式——参见2.1.3节)。如果std::thread对象销毁之前还没有做出决定,程序就会终止(std::thread的析构函数会调用std::terminate())。因此,即便是有异常存在,也需要确保线程能够正确的_加入_(joined)或_分离_(detached)。

清单2.1 函数已经结束,线程依旧访问局部变量

struct func
{
  int& i;
  func(int& i_) : i(i_) {}
  void operator() ()
  {
    for (unsigned j=0 ; j<1000000 ; ++j)
    {
      do_something(i);           // 1. 潜在访问隐患:悬空引用
    }
  }
};

void oops()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread my_thread(my_func);
  my_thread.detach();          // 2. 不等待线程结束
}                              // 3. 新线程可能还在运行

这个例子中,已经决定不等待线程结束(使用了detach()②),所以当oops()函数执行完成时③,新线程中的函数可能还在运行。如果线程还在运行,它就会去调用do_something(i)函数①,这时就会访问已经销毁的变量。如同一个单线程程序——允许在函数完成后继续持有局部变量的指针或引用;当然,这从来就不是一个好主意——这种情况发生时,错误并不明显,会使多线程更容易出错。

处理这种情况的常规方法:使线程函数的功能齐全,将数据复制到线程中,而非复制到共享数据中。如果使用一个可调用的对象作为线程函数,这个对象就会复制到线程中,而后原始对象就会立即销毁。但对于对象中包含的指针和引用还需谨慎,例如清单2.1所示。使用一个能访问局部变量的函数去创建线程是一个糟糕的主意(除非十分确定线程会在函数完成前结束)。此外,可以通过join()函数来确保线程在函数完成前结束。

2.1.2 等待线程完成

如果需要等待线程,相关的std::thread实例需要使用join()。在这种情况下,因为原始线程在其生命周期中并没有做什么事,使得用一个独立的线程去执行函数变得收益甚微,但在实际编程中,原始线程要么有自己的工作要做;要么会启动多个子线程来做一些有用的工作,并等待这些线程结束。
这意味着,只能对一个线程使用一次join();一旦已经使用过join(),std::thread对象就不能再次加入了,当对其使用joinable()时,将返回false。

2.1.3 特殊情况下的等待

如果想要分离一个线程,可以在线程启动后,直接使用detach()进行分离。如果打算等待对应线程,通常,当倾向于在无异常的情况下使用join()时,需要在异常处理过程中调用join(),从而避免应用被抛出的异常所终止。
清单 2.2 等待线程完成

struct func; // 定义在清单2.1中
void f()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread t(my_func);
  try
  {
    do_something_in_current_thread();
  }
  catch(...)
  {
    t.join();  // 1
    throw;
  }
  t.join();  // 2
}

代码使用了try/catch块确保访问本地状态的线程退出后,函数才结束。

一种方式是使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization),并且提供一个类,在析构函数中使用join(),如同下面清单中的代码。看它如何简化f()函数。

清单 2.3 使用RAII等待线程完成

class thread_guard
{
  std::thread& t;
public:
  explicit thread_guard(std::thread& t_):
    t(t_)
  {}
  ~thread_guard()
  {
    if(t.joinable()) // 1
    {
      t.join();      // 2
    }
  }
  thread_guard(thread_guard const&)=delete;   // 3
  thread_guard& operator=(thread_guard const&)=delete;
};

struct func; // 定义在清单2.1中

void f()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread t(my_func);
  thread_guard g(t);
  do_something_in_current_thread();
}    // 4

2.1.4 后台运行线程

使用detach()会让线程在后台运行,这就意味着主线程不能与之产生直接交互。也就是说,不会等待这个线程结束;如果线程分离,那么就不可能有std::thread对象能引用它,分离线程的确在后台运行,所以分离线程不能被加入。不过C++运行库保证,当线程退出时,相关资源的能够正确回收,后台线程的归属和控制C++运行库都会处理。
通常称分离线程为_守护线程_(daemon threads),UNIX中守护线程是指,没有任何显式的用户接口,并在后台运行的线程。这种线程的特点就是长时间运行;线程的生命周期可能会从某一个应用起始到结束,可能会在后台监视文件系统,还有可能对缓存进行清理,亦或对数据结构进行优化。

std::thread t(do_background_work);
t.detach();
assert(!t.joinable());

清单2.4 使用分离线程去处理其他文档

void edit_document(std::string const& filename)
{
  open_document_and_display_gui(filename);
  while(!done_editing())
  {
    user_command cmd=get_user_input();
    if(cmd.type==open_new_document)
    {
      std::string const new_name=get_filename_from_user();
      std::thread t(edit_document,new_name);  // 1
      t.detach();  // 2
    }
    else
    {
       process_user_input(cmd);
    }
  }
}

2.2 向线程函数传递参数

std::thread构造函数中的可调用对象,或函数传递一个参数很简单。需要注意的是,默认参数要拷贝到线程独立内存中,即使参数是引用的形式,也可以在新线程中进行访问。

void f(int i, std::string const& s);
std::thread t(f, 3, "hello");

代码创建了一个调用f(3, “hello”)的线程。注意,函数f需要一个std::string对象作为第二个参数,但这里使用的是字符串的字面值,也就是char const *类型。之后,在线程的上下文中完成字面值向std::string对象的转化。需要特别要注意,当指向动态变量的指针作为参数传递给线程的情况,代码如下:

void f(int i,std::string const& s);
void oops(int some_param)
{
  char buffer[1024]; // 1
  sprintf(buffer, "%i",some_param);
  std::thread t(f,3,buffer); // 2
  t.detach();
}

这种情况下,buffer②是一个指针变量,指向本地变量,然后本地变量通过buffer传递到新线程中②。并且,函数有很有可能会在字面值转化成std::string对象之前崩溃(oops),从而导致一些未定义的行为。并且想要依赖隐式转换将字面值转换为函数期待的std::string对象,但因std::thread的构造函数会复制提供的变量,就只复制了没有转换成期望类型的字符串字面值。

解决方案就是在传递到std::thread构造函数之前就将字面值转化为std::string对象:

void f(int i,std::string const& s);
void not_oops(int some_param)
{
  char buffer[1024];
  sprintf(buffer,"%i",some_param);
  std::thread t(f,3,std::string(buffer));  // 使用std::string,避免悬垂指针
  t.detach();
}

还可能遇到相反的情况:期望传递一个引用,但整个对象被复制了。当线程更新一个引用传递的数据结构时,这种情况就可能发生,比如:

void update_data_for_widget(widget_id w,widget_data& data); // 1
void oops_again(widget_id w)
{
  widget_data data;
  std::thread t(update_data_for_widget,w,data); // 2
  display_status();
  t.join();
  process_widget_data(data); // 3
}

虽然update_data_for_widget①的第二个参数期待传入一个引用,但是std::thread的构造函数②并不知晓;构造函数无视函数期待的参数类型,并盲目的拷贝已提供的变量。当线程调用update_data_for_widget函数时,传递给函数的参数是data变量内部拷贝的引用,而非数据本身的引用。因此,当线程结束时,内部拷贝数据将会在数据更新阶段被销毁,且process_widget_data将会接收到没有修改的data变量③。对于熟悉std::bind的开发者来说,问题的解决办法是显而易见的:可以使用std::ref将参数转换成引用的形式,从而可将线程的调用改为以下形式:

std::thread t(update_data_for_widget,w,std::ref(data));

在这之后,update_data_for_widget就会接收到一个data变量的引用,而非一个data变量拷贝的引用。

如果你熟悉std::bind,就应该不会对以上述传参的形式感到奇怪,因为std::thread构造函数和std::bind的操作都在标准库中定义好了,可以传递一个成员函数指针作为线程函数,并提供一个合适的对象指针作为第一个参数:

class X
{
public:
  void do_lengthy_work();
};
X my_x;
std::thread t(&X::do_lengthy_work,&my_x); // 1

这段代码中,新线程将my_x.do_lengthy_work()作为线程函数;my_x的地址①作为指针对象提供给函数。也可以为成员函数提供参数:std::thread构造函数的第三个参数就是成员函数的第一个参数,以此类推(代码如下,译者自加)。

class X
{
public:
  void do_lengthy_work(int);
};
X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);

有趣的是,提供的参数可以移动,但不能拷贝。"移动"是指:原始对象中的数据转移给另一对象,而转移的这些数据就不再在原始对象中保存了(译者:比较像在文本编辑的"剪切"操作)。std::unique_ptr就是这样一种类型(译者:C++11中的智能指针),这种类型为动态分配的对象提供内存自动管理机制(译者:类似垃圾回收)。同一时间内,只允许一个std::unique_ptr实现指向一个给定对象,并且当这个实现销毁时,指向的对象也将被删除。移动构造函数(move constructor)和移动赋值操作符(move assignment operator)允许一个对象在多个std::unique_ptr实现中传递(有关"移动"的更多内容,请参考附录A的A.1.1节)。使用"移动"转移原对象后,就会留下一个空指针(NULL)。移动操作可以将对象转换成可接受的类型,例如:函数参数或函数返回的类型。当原对象是一个临时变量时,自动进行移动操作,但当原对象是一个命名变量,那么转移的时候就需要使用std::move()进行显示移动。下面的代码展示了std::move的用法,展示了std::move是如何转移一个动态对象到一个线程中去的:

void process_big_object(std::unique_ptr<big_object>);

std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object,std::move(p));

std::thread的构造函数中指定std::move(p),big_object对象的所有权就被首先转移到新创建线程的的内部存储中,之后传递给process_big_object函数。

标准线程库中和std::unique_ptr在所属权上有相似语义类型的类有好几种,std::thread为其中之一。虽然,std::thread实例不像std::unique_ptr那样能占有一个动态对象的所有权,但是它能占有其他资源:每个实例都负责管理一个执行线程。执行线程的所有权可以在多个std::thread实例中互相转移,这是依赖于std::thread实例的可移动不可复制性。不可复制保性证了在同一时间点,一个std::thread实例只能关联一个执行线程;可移动性使得程序员可以自己决定,哪个实例拥有实际执行线程的所有权。

2.3 转移线程所有权

假设要写一个在后台启动线程的函数,想通过新线程返回的所有权去调用这个函数,而不是等待线程结束再去调用;或完全与之相反的想法:创建一个线程,并在函数中转移所有权,都必须要等待线程结束。总之,新线程的所有权都需要转移。
这就是移动引入std::thread的原因,C++标准库中有很多_资源占有_(resource-owning)类型,比如std::ifstream,std::unique_ptr还有std::thread都是可移动,但不可拷贝。这就说明执行线程的所有权可以在std::thread实例中移动,下面将展示一个例子。例子中,创建了两个执行线程,并且在std::thread实例之间(t1,t2和t3)转移所有权:

void some_function();
void some_other_function();
std::thread t1(some_function);            // 1
std::thread t2=std::move(t1);            // 2
t1=std::thread(some_other_function);    // 3
std::thread t3;                            // 4
t3=std::move(t2);                        // 5
t1=std::move(t3);                        // 6 赋值操作将使程序崩溃

首先,新线程开始与t1相关联。当显式使用std::move()创建t2后②,t1的所有权就转移给了t2。之后,t1和执行线程已经没有关联了;执行some_function的函数现在与t2关联。

然后,与一个临时std::thread对象相关的线程启动了③。为什么不显式调用std::move()转移所有权呢?因为,所有者是一个临时对象——移动操作将会隐式的调用。

t3使用默认构造方式创建④,与任何执行线程都没有关联。调用std::move()将与t2关联线程的所有权转移到t3中⑤。因为t2是一个命名对象,需要显式的调用std::move()。移动操作⑤完成后,t1与执行some_other_function的线程相关联,t2与任何线程都无关联,t3与执行some_function的线程相关联。

最后一个移动操作,将some_function线程的所有权转移⑥给t1。不过,t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()终止程序继续运行。

std::thread支持移动,就意味着线程的所有权可以在函数外进行转移,就如下面程序一样。

清单2.5 函数返回std::thread对象

std::thread f()
{
  void some_function();
  return std::thread(some_function);
}

std::thread g()
{
  void some_other_function(int);
  std::thread t(some_other_function,42);
  return t;
}

当所有权可以在函数内部传递,就允许std::thread实例可作为参数进行传递,代码如下:

void f(std::thread t);
void g()
{
  void some_function();
  f(std::thread(some_function));
  std::thread t(some_function);
  f(std::move(t));
}

std::thread对象的容器,如果这个容器是移动敏感的(比如,标准中的std::vector<>),那么移动操作同样适用于这些容器。了解这些后,就可以写出类似清单2.7中的代码,代码量产了一些线程,并且等待它们结束。

清单2.7 量产线程,等待它们结束

void do_work(unsigned id);

void f()
{
  std::vector<std::thread> threads;
  for(unsigned i=0; i < 20; ++i)
  {
    threads.push_back(std::thread(do_work,i)); // 产生线程
  } 
  std::for_each(threads.begin(),threads.end(),
  std::mem_fn(&std::thread::join)); // 对每个线程调用join()
}

2.4 运行时决定线程数量

std::thread::hardware_concurrency()在新版C++标准库中是一个很有用的函数。这个函数将返回能同时并发在一个程序中的线程数量。例如,多核系统中,返回值可以是CPU核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。

2.5 标识线程

线程标识类型是std::thread::id,可以通过两种方式进行检索。第一种,可以通过调用std::thread对象的成员函数get_id()来直接获取。如果std::thread对象没有与任何执行线程相关联,get_id()将返回std::thread::type默认构造值,这个值表示“无线程”。第二种,当前线程中调用std::this_thread::get_id()(这个函数定义在<thread>头文件中)也可以获得线程标识。

std::thread::id对象可以自由的拷贝和对比,因为标识符就可以复用。如果两个对象的std::thread::id相等,那它们就是同一个线程,或者都“无线程”。如果不等,那么就代表了两个不同线程,或者一个有线程,另一没有线程。

线程库不会限制你去检查线程标识是否一样,std::thread::id类型对象提供相当丰富的对比操作;比如,提供为不同的值进行排序。这意味着允许程序员将其当做为容器的键值,做排序,或做其他方式的比较。按默认顺序比较不同值的std::thread::id,所以这个行为可预见的:当a<bb<c时,得a<c,等等。标准库也提供std::hash<std::thread::id>容器,所以std::thread::id也可以作为无序容器的键值。

std::thread::id master_thread;
void some_core_part_of_algorithm()
{
  if(std::this_thread::get_id()==master_thread)
  {
    do_master_thread_work();
  }
  do_common_work();
}

3 线程间共享数据

3.1 共享数据带来的问题

当涉及到共享数据时,问题很可能是因为共享数据修改所导致。如果共享数据是只读的,那么只读操作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。但是,当一个或多个线程要修改共享数据时,就会产生很多麻烦。这种情况下,就必须小心谨慎,才能确保一切所有线程都工作正常。
不变量(invariants)的概念对程序员们编写的程序会有一定的帮助——对于特殊结构体的描述;比如,“变量包含列表中的项数”。不变量通常会在一次更新中被破坏,特别是比较复杂的数据结构,或者一次更新就要改动很大的数据结构。
双链表中每个节点都有一个指针指向列表中下一个节点,还有一个指针指向前一个节点。其中不变量就是节点A中指向“下一个”节点B的指针,还有前向指针。为了从列表中删除一个节点,其两边节点的指针都需要更新。当其中一边更新完成时,不变量就被破坏了,直到另一边也完成更新;在两边都完成更新后,不变量就又稳定了。

从一个列表中删除一个节点的步骤如下图:

1. 找到要删除的节点N
2. 更新前一个节点指向N的指针,让这个指针指向N的下一个节点
3. 更新后一个节点指向N的指针,让这个指正指向N的前一个节点
4. 删除节点N

请添加图片描述
图中b和c在相同的方向上指向和原来已经不一致了,这就破坏了不变量。线程间潜在问题就是修改共享数据,致使不变量遭到破坏。这就是并行代码常见错误:条件竞争。

3.1.1 条件竞争

C++标准中也定义了数据竞争这个术语,一种特殊的条件竞争:并发的去修改一个独立对象,数据竞争是(可怕的)未定义行为的起因。
恶性条件竞争通常发生于完成对多于一个的数据块的修改时,例如,对两个连接指针的修改。因为操作要访问两个独立的数据块,独立的指令将会对数据块将进行修改,并且其中一个线程可能正在进行时,另一个线程就对数据块进行了访问。因为出现的概率太低,条件竞争很难查找,也很难复现。

3.1.2 避免恶性条件竞争

这里提供一些方法来解决恶性条件竞争,最简单的办法就是对数据结构采用某种保护机制,确保只有进行修改的线程才能看到不变量被破坏时的中间状态。从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。C++标准库提供很多类似的机制。
另一个选择是对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分割的变化,也就是保证每个不变量保持稳定的状态,这就是所谓的无锁编程。不过,这种方式很难得到正确的结果。如果到这个级别,无论是内存模型上的细微差异,还是线程访问数据的能力,都会让工作变的复杂。
另一种处理条件竞争的方式是,使用事务的方式去处理数据结构的更新(这里的"处理"就如同对数据库进行更新一样)。所需的一些数据和读取都存储在事务日志中,然后将之前的操作合为一步,再进行提交。当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为“软件事务内存”。理论研究中,这是一个很热门的研究领域。这个概念将不会在本书中再进行介绍,因为在C++中没有对STM进行直接支持。但是,基本思想会在后面提及。

保护共享数据结构的最基本的方式,是使用C++标准库提供的互斥量。

3.2 使用互斥量保护共享数据

互斥量是C++中一种最通用的数据保护机制,但它不是“银弹”;精心组织代码来保护正确的数据,并在接口内部避免竞争条件是非常重要的。但互斥量自身也有问题,也会造成死锁,或是对数据保护的太多(或太少)。

3.2.1 C++中使用互斥量

C++中通过实例化std::mutex创建互斥量,通过调用成员函数lock()进行上锁,unlock()进行解锁。不过,不推荐实践中直接去调用成员函数,因为调用成员函数就意味着,必须记住在每个函数出口都要去调用unlock(),也包括异常的情况。C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard,其会在构造的时候提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个已锁的互斥量总是会被正确的解锁。下面的程序清单中,展示了如何在多线程程序中,使用std::mutex构造的std::lock_guard实例,对一个列表进行访问保护。std::mutexstd::lock_guard都在<mutex>头文件中声明。
清单3.1 使用互斥量保护列表

#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list;    // 1
std::mutex some_mutex;    // 2

void add_to_list(int new_value)
{
  std::lock_guard<std::mutex> guard(some_mutex);    // 3
  some_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
  std::lock_guard<std::mutex> guard(some_mutex);    // 4
  return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}

当所有成员函数都会在调用时对数据上锁,结束时对数据解锁,那么就保证了数据访问时不变量不被破坏。
当然,也不是总是那么理想,聪明的你一定注意到了:当其中一个成员函数返回的是保护数据的指针或引用时,会破坏对数据的保护。具有访问能力的指针或引用可以访问(并可能修改)被保护的数据,而不会被互斥锁限制。互斥量保护的数据需要对接口的设计相当谨慎,要确保互斥量能锁住任何对保护数据的访问,并且不留后门。

3.2.2 精心组织代码来保护共享数据

使用互斥量来保护数据,并不是仅仅在每一个成员函数中都加入一个std::lock_guard对象那么简单;一个迷失的指针或引用,将会让这种保护形同虚设。
清单3.2 无意中传递了保护数据的引用

class some_data
{
  int a;
  std::string b;
public:
  void do_something();
};

class data_wrapper
{
private:
  some_data data;
  std::mutex m;
public:
  template<typename Function>
  void process_data(Function func)
  {
    std::lock_guard<std::mutex> l(m);
    func(data);    // 1 传递“保护”数据给用户函数
  }
};

some_data* unprotected;

void malicious_function(some_data& protected_data)
{
  unprotected=&protected_data;
}

data_wrapper x;
void foo()
{
  x.process_data(malicious_function);    // 2 传递一个恶意函数
  unprotected->do_something();    // 3 在无保护的情况下访问保护数据
}

例子中process_data看起来没有任何问题,std::lock_guard对数据做了很好的保护,但调用用户提供的函数func①,就意味着foo能够绕过保护机制将函数malicious_function传递进去②,在没有锁定互斥量的情况下调用do_something()

3.2.3 发现接口内在的条件竞争

清单3.3 std::stack容器的实现

template<typename T,typename Container=std::deque<T> >
class stack
{
public:
  explicit stack(const Container&);
  explicit stack(Container&& = Container());
  template <class Alloc> explicit stack(const Alloc&);
  template <class Alloc> stack(const Container&, const Alloc&);
  template <class Alloc> stack(Container&&, const Alloc&);
  template <class Alloc> stack(stack&&, const Alloc&);
  
  bool empty() const;
  size_t size() const;
  T& top();
  T const& top() const;
  void push(T const&);
  void push(T&&);
  void pop();
  void swap(stack&&);
};

虽然empty()和size()可能在被调用并返回时是正确的,但其的结果是不可靠的;当它们返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的结果就有问题了。

特别地,当栈实例是非共享的,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。如下代码所示:

stack<int> s;
if (! s.empty()){    // 1
  int const value = s.top();    // 2
  s.pop();    // 3
  do_something(value);
}

以上是单线程安全代码:对一个空栈使用top()是未定义行为。对于共享的栈对象,这样的调用顺序就不再安全了,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。
怎么解决呢?问题发生在接口设计上,所以解决的方法也就是改变接口设计。
表3.1 一种可能执行顺序

Thread AThread B
if (!s.empty);
if(!s.empty);
int const value = s.top();
int const value = s.top();
s.pop();
do_something(value);s.pop();
do_something(value);

当线程运行时,调用两次top(),栈没被修改,所以每个线程能得到同样的值。值已被处理了两次。这种条件竞争,因为看起来没有任何错误,就会让这个Bug很难定位。
这就需要接口设计上有较大的改动,提议之一就是使用同一互斥量来保护top()和pop()。
选项1: 传入一个引用
第一个选项是将变量的引用作为参数,传入pop()函数中获取想要的“弹出值”:

std::vector<int> result;
some_stack.pop(result);

大多数情况下,这种方式还不错,但有明显的缺点:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看,都是不划算。对于其他的类型,这样也不总能行得通,因为构造函数需要的一些参数,在代码的这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
选项2:无异常抛出的拷贝构造函数或移动构造函数
对于有返回值的pop()函数来说,只有“异常安全”方面的担忧(当返回值时可以抛出一个异常)。很多类型都有拷贝构造函数,它们不会抛出异常,并且随着新标准中对“右值引用”的支持(详见附录A,A.1节),很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,它也不会抛出异常。一个有用的选项可以限制对线程安全的栈的使用,并且能让栈安全的返回所需的值,而不会抛出异常。
选项3:返回指向弹出值的指针
第三个选择是返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常。缺点就是返回一个指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。对于选择这个方案的接口,使用std::shared_ptr是个不错的选择;不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,也就不需要new和delete操作。这种优化是很重要的:因为堆栈中的每个对象,都需要用new进行独立的内存分配,相较于非线程安全版本,这个方案的开销相当大。
选项4:“选项1 + 选项2”或 “选项1 + 选项3”
对于通用的代码来说,灵活性不应忽视。当你已经选择了选项2或3时,再去选择1也是很容易的。这些选项提供给用户,让用户自己选择对于他们自己来说最合适,最经济的方案。
例:定义线程安全的堆栈
清单3.4 线程安全的堆栈类定义(概述)

#include <exception>
#include <memory>  // For std::shared_ptr<>

struct empty_stack: std::exception
{
  const char* what() const throw();
};

template<typename T>
class threadsafe_stack
{
public:
  threadsafe_stack();
  threadsafe_stack(const threadsafe_stack&);
  threadsafe_stack& operator=(const threadsafe_stack&) = delete; // 1 赋值操作被删除

  void push(T new_value);
  std::shared_ptr<T> pop();
  void pop(T& value);
  bool empty() const;
};

削减接口可以获得最大程度的安全,甚至限制对栈的一些操作。使用std::shared_ptr可以避免内存分配管理的问题,并避免多次使用new和delete操作。
清单3.5 扩充(线程安全)堆栈

#include <exception>
#include <memory>
#include <mutex>
#include <stack>

struct empty_stack: std::exception
{
  const char* what() const throw() {
	return "empty stack!";
  };
};

template<typename T>
class threadsafe_stack
{
private:
  std::stack<T> data;
  mutable std::mutex m;
  
public:
  threadsafe_stack()
	: data(std::stack<T>()){}
  
  threadsafe_stack(const threadsafe_stack& other)
  {
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data; // 1 在构造函数体中的执行拷贝
  }

  threadsafe_stack& operator=(const threadsafe_stack&) = delete;

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lock(m);
    data.push(new_value);
  }
  
  std::shared_ptr<T> pop()
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空
	
    std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值
    data.pop();
    return res;
  }
  
  void pop(T& value)
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack();
	
    value=data.top();
    data.pop();
  }
  
  bool empty() const
  {
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
};

之前对top()和pop()函数的讨论中,恶性条件竞争已经出现,因为锁的粒度太小,需要保护的操作并未全覆盖到。不过,锁住的颗粒过大同样会有问题。还有一个问题,一个全局互斥量要去保护全部共享数据,在一个系统中存在有大量的共享数据时,因为线程可以强制运行,甚至可以访问不同位置的数据,抵消了并发带来的性能提升。在第一版为多处理器系统设计Linux内核中,就使用了一个全局内核锁。虽然这个锁能正常工作,但在双核处理系统的上的性能要比两个单核系统的性能差很多,四核系统就更不能提了。太多请求去竞争占用内核,使得依赖于处理器运行的线程没有办法很好的工作。随后修正的Linux内核加入了一个细粒度锁方案,因为少了很多内核竞争,这时四核处理系统的性能就和单核处理的四倍差不多了。
一个给定操作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:死锁。与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做。

3.2.4 死锁:问题描述及解决方案

试想有一个玩具,这个玩具由两部分组成,必须拿到这两个部分,才能够玩。例如,一个玩具鼓,需要一个鼓锤和一个鼓才能玩。现在有两个小孩,他们都很喜欢玩这个玩具。当其中一个孩子拿到了鼓和鼓锤时,那就可以尽情的玩耍了。当另一孩子想要玩,他就得等待另一孩子玩完才行。再试想,鼓和鼓锤被放在不同的玩具箱里,并且两个孩子在同一时间里都想要去敲鼓。之后,他们就去玩具箱里面找这个鼓。其中一个找到了鼓,并且另外一个找到了鼓锤。现在问题就来了,除非其中一个孩子决定让另一个先玩,他可以把自己的那部分给另外一个孩子;但当他们都紧握着自己所有的部分而不给予,那么这个鼓谁都没法玩。
现在没有孩子去争抢玩具,但线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。这样没有线程能工作,因为他们都在等待对方释放互斥量。这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个操作。
避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。
std::lock——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。下面的程序清单中,就来看一下怎么在一个简单的交换操作中使用std::lock
清单3.6 交换操作中使用std::lock()std::lock_guard

// 这里的std::lock()需要包含<mutex>头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
  some_big_object some_detail;
  std::mutex m;
public:
  X(some_big_object const& sd):some_detail(sd){}

  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs==&rhs)
      return;
    std::lock(lhs.m,rhs.m); // 1
    std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2
    std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3
    swap(lhs.some_detail,rhs.some_detail);
  }
};

使用std::lock去锁lhs.m或rhs.m时,可能会抛出异常;这种情况下,异常会传播到std::lock之外。当std::lock成功的获取一个互斥量上的锁,并且当其尝试从另一个互斥量上再获取锁时,就会有异常抛出,第一个锁也会随着异常的产生而自动释放,所以std::lock要么将两个锁都锁住,要不一个都不锁。
虽然std::lock可以在这情况下(获取两个以上的锁)避免死锁,但它没办法帮助你获取其中一个锁。

3.2.5 避免死锁的进阶指导

虽然锁是产生死锁的一般原因,但也不排除死锁出现在其他地方。无锁的情况下,仅需要每个std::thread对象调用join(),两个线程就能产生死锁。这种情况很常见,一个线程会等待另一个线程,其他线程同时也会等待第一个线程结束,所以三个或更多线程的互相等待也会发生死锁。为了避免死锁,这里的指导意见为:当机会来临时,不要拱手让人。以下提供一些个人的指导建议,如何识别死锁,并消除其他线程的等待。
避免嵌套锁
第一个建议往往是最简单的:一个线程已获得一个锁时,再别去获取第二个。如果能坚持这个建议,因为每个线程只持有一个锁,锁上就不会产生死锁。即使互斥锁造成死锁的最常见原因,也可能会在其他方面受到死锁的困扰(比如:线程间的互相等待)。当你需要获取多个锁,使用一个std::lock来做这件事(对获取锁的操作上锁),避免产生死锁。
避免在持有锁时调用用户提供的代码
:因为代码是用户提供的,你没有办法确定用户要做什么;用户程序可能做任何事情,包括获取锁。你在持有锁的情况下,调用用户提供的代码;如果用户代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时,这是无法避免的)。
使用固定顺序获取锁
当硬性条件要求你获取两个以上(包括两个)的锁,并且不能使用std::lock单独操作来获取它们;那么最好在每个线程上,用固定的顺序获取它们获取它们(锁)。
为了遍历链表,线程必须保证在获取当前节点的互斥锁前提下,获得下一个节点的锁,要保证指向下一个节点的指针不会同时被修改。一旦下一个节点上的锁被获取,那么第一个节点的锁就可以释放了,因为没有持有它的必要性了。
这种“手递手”锁的模式允许多个线程访问列表,为每一个访问的线程提供不同的节点。但是,为了避免死锁,节点必须以同样的顺序上锁:如果两个线程试图用互为反向的顺序,使用“手递手”锁遍历列表,他们将执行到列表中间部分时,发生死锁。当节点A和B在列表中相邻,当前线程可能会同时尝试获取A和B上的锁。另一个线程可能已经获取了节点B上的锁,并且试图获取节点A上的锁——经典的死锁场景。
当A、C节点中的B节点正在被删除时,如果有线程在已获取A和C上的锁后,还要获取B节点上的锁时,当一个线程遍历列表的时候,这样的情况就可能发生死锁。这样的线程可能会试图首先锁住A节点或C节点(根据遍历的方向),但是后面就会发现,它无法获得B上的锁,因为线程在执行删除任务的时候,已经获取了B上的锁,并且同时也获取了A和C上的锁。

这里提供一种避免死锁的方式,定义遍历的顺序,所以一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。这将消除死锁发生的可能性,在不允许反向遍历的列表上。类似的约定常被用来建立其他的数据结构。
使用锁的层次结构
清单3.7 使用层次锁来避免死锁

hierarchical_mutex high_level_mutex(10000); // 1
hierarchical_mutex low_level_mutex(5000);  // 2

int do_low_level_stuff();

int low_level_func()
{
  std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // 3
  return do_low_level_stuff();
}

void high_level_stuff(int some_param);

void high_level_func()
{
  std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // 4
  high_level_stuff(low_level_func()); // 5
}

void thread_a()  // 6
{
  high_level_func();
}

hierarchical_mutex other_mutex(100); // 7
void do_other_stuff();

void other_stuff()
{
  high_level_func();  // 8
  do_other_stuff();
}

void thread_b() // 9
{
  std::lock_guard<hierarchical_mutex> lk(other_mutex); // 10
  other_stuff();
}

例子也展示了另一点,std::lock_guard<>模板与用户定义的互斥量类型一起使用。虽然hierarchical_mutex不是C++标准的一部分,但是它写起来很容易;一个简单的实现在列表3.8中展示出来。尽管它是一个用户定义类型,它可以用于std::lock_guard<>模板中,因为它的实现有三个成员函数为了满足互斥量操作:lock(), unlock() 和 try_lock()。虽然你还没见过try_lock()怎么使用,但是其使用起来很简单:当互斥量上的锁被一个线程持有,它将返回false,而不是等待调用的线程,直到能够获取互斥量上的锁为止。在std::lock()的内部实现中,try_lock()会作为避免死锁算法的一部分。
列表3.8 简单的层级互斥量实现

class hierarchical_mutex
{
  std::mutex internal_mutex;
  
  unsigned long const hierarchy_value;
  unsigned long previous_hierarchy_value;
  
  static thread_local unsigned long this_thread_hierarchy_value;  // 1
  
  void check_for_hierarchy_violation()
  {
    if(this_thread_hierarchy_value <= hierarchy_value)  // 2
    {
      throw std::logic_error(“mutex hierarchy violated”);
    }
  }
  
  void update_hierarchy_value()
  {
    previous_hierarchy_value=this_thread_hierarchy_value;  // 3
    this_thread_hierarchy_value=hierarchy_value;
  }
  
public:
  explicit hierarchical_mutex(unsigned long value):
      hierarchy_value(value),
      previous_hierarchy_value(0)
  {}
  
  void lock()
  {
    check_for_hierarchy_violation();
    internal_mutex.lock();  // 4
    update_hierarchy_value();  // 5
  }
  
  void unlock()
  {
    this_thread_hierarchy_value=previous_hierarchy_value;  // 6
    internal_mutex.unlock();
  }
  
  bool try_lock()
  {
    check_for_hierarchy_violation();
    if(!internal_mutex.try_lock())  // 7
      return false;
    update_hierarchy_value();
    return true;
  }
};
thread_local unsigned long
     hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);  // 8

超越锁的延伸扩展
当代码已经能规避死锁,std::lock()std::lock_guard能组成简单的锁覆盖大多数情况,但是有时需要更多的灵活性。在这些情况,可以使用标准库提供的std::unique_lock模板。如std::lock_guard,这是一个参数化的互斥量模板类,并且它提供很多RAII类型锁用来管理std::lock_guard类型,可以让代码更加灵活。

3.2.6 std::unique_lock——灵活的锁

std::unqiue_lock使用更为自由的不变量,这样std::unique_lock实例不会总与互斥量的数据类型相关,使用起来要比std:lock_guard更加灵活。首先,可将std::adopt_lock作为第二个参数传入构造函数,对互斥量进行管理;也可以将std::defer_lock作为第二个参数传递进去,表明互斥量应保持解锁状态。这样,就可以被std::unique_lock对象(不是互斥量)的lock()函数的所获取,或传递std::unique_lock对象到std::lock()中。清单3.6可以轻易的转换为清单3.9,使用std::unique_lockstd::defer_lock①,而非std::lock_guardstd::adopt_lock。代码长度相同,几乎等价,唯一不同的就是:std::unique_lock会占用比较多的空间,并且比std::lock_guard稍慢一些。保证灵活性要付出代价,这个代价就是允许std::unique_lock实例不带互斥量:信息已被存储,且已被更新。
清单3.9 交换操作中std::lock()std::unique_lock的使用

class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
  some_big_object some_detail;
  std::mutex m;
public:
  X(some_big_object const& sd):some_detail(sd){}
  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs==&rhs)
      return;
    std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock); // 1 
    std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock); // 1 std::def_lock 留下未上锁的互斥量
    std::lock(lock_a,lock_b); // 2 互斥量在这里上锁
    swap(lhs.some_detail,rhs.some_detail);
  }
};

列表3.9中,因为std::unique_lock支持lock(), try_lock()和unlock()成员函数,所以能将std::unique_lock对象传递到std::lock()②。这些同名的成员函数在低层做着实际的工作,并且仅更新std::unique_lock实例中的标志,来确定该实例是否拥有特定的互斥量,这个标志是为了确保unlock()在析构函数中被正确调用。如果实例拥有互斥量,那么析构函数必须调用unlock();但当实例中没有互斥量时,析构函数就不能去调用unlock()。这个标志可以通过owns_lock()成员变量进行查询。

可能如你期望的那样,这个标志被存储在某个地方。因此,std::unique_lock对象的体积通常要比std::lock_guard对象大,当使用std::unique_lock替代std::lock_guard,因为会对标志进行适当的更新或检查,就会做些轻微的性能惩罚。当std::lock_guard已经能够满足你的需求,那么还是建议你继续使用它。当需要更加灵活的锁时,最好选择std::unique_lock,因为它更适合于你的任务。你已经看到一个递延锁的例子,另外一种情况是锁的所有权需要从一个域转到另一个域。

3.2.7 不同域中互斥量所有权的传递

std::unique_lock是可移动,但不可赋值的类型。

std::unique_lock<std::mutex> get_lock()
{
  extern std::mutex some_mutex;
  std::unique_lock<std::mutex> lk(some_mutex);
  prepare_data();
  return lk;  // 1
}
void process_data()
{
  std::unique_lock<std::mutex> lk(get_lock());  // 2
  do_something();
}

std::unique_lock的灵活性同样也允许实例在销毁之前放弃其拥有的锁。可以使用unlock()来做这件事,如同一个互斥量:std::unique_lock的成员函数提供类似于锁定和解锁互斥量的功能。std::unique_lock实例在销毁前释放锁的能力,当锁没有必要在持有的时候,可以在特定的代码分支对其进行选择性的释放。这对于应用性能来说很重要,因为持有锁的时间增加会导致性能下降,其他线程会等待这个锁的释放,避免超越操作。

3.2.8 锁的粒度

选择粒度对于锁来说很重要,为了保护对应的数据,保证锁有能力保护这些数据也很重要。
如果很多线程正在等待同一个资源,当有线程持有锁的时间过长,这就会增加等待的时间。在可能的情况下,锁住互斥量的同时只能对共享数据进行访问;试图对锁外数据进行处理。特别是做一些费时的动作,比如:对文件的输入/输出操作进行上锁。文件输入/输出通常要比从内存中读或写同样长度的数据慢成百上千倍,所以除非锁已经打算去保护对文件的访问,要么执行输入/输出操作将会将延迟其他线程执行的时间,这很没有必要(因为文件锁阻塞住了很多操作),这样多线程带来的性能效益会被抵消。

这能表示只有一个互斥量保护整个数据结构时的情况,不仅可能会有更多对锁的竞争,也会增加锁持锁的时间。较多的操作步骤需要获取同一个互斥量上的锁,所以持有锁的时间会更长。成本上的双重打击也算是为向细粒度锁转移提供了双重激励和可能。
列表3.10 比较操作符中一次锁住一个互斥量

class Y
{
private:
  int some_detail;
  mutable std::mutex m;
  int get_detail() const
  {
    std::lock_guard<std::mutex> lock_a(m);  // 1
    return some_detail;
  }
public:
  Y(int sd):some_detail(sd){}

  friend bool operator==(Y const& lhs, Y const& rhs)
  {
    if(&lhs==&rhs)
      return true;
    int const lhs_value=lhs.get_detail();  // 2
    int const rhs_value=rhs.get_detail();  // 3
    return lhs_value==rhs_value;  // 4
  }
};

3.3 保护共享数据的替代设施

互斥量是最通用的机制,但其并非保护共享数据的唯一方式。这里有很多替代方式可以在特定情况下,提供更加合适的保护。
一个特别极端(但十分常见)的情况就是,共享数据在并发访问和初始化时(都需要保护),但是之后需要进行隐式同步。这可能是因为数据作为只读方式创建,所以没有同步问题;或者因为必要的保护作为对数据操作的一部分,所以隐式的执行。任何情况下,数据初始化后锁住一个互斥量,纯粹是为了保护其初始化过程(这是没有必要的),并且这会给性能带来不必要的冲击。出于以上的原因,C++标准提供了一种纯粹保护共享数据初始化过程的机制。

3.3.1 保护共享数据的初始化过程

假设你与一个共享源,构建代价很昂贵,可能它会打开一个数据库连接或分配出很多的内存。
延迟初始化(Lazy initialization)在单线程代码很常见——每一个操作都需要先对源进行检查,为了了解数据是否被初始化,然后在其使用前决定,数据是否需要初始化:

std::shared_ptr<some_resource> resource_ptr;
void foo()
{
  if(!resource_ptr)
  {
    resource_ptr.reset(new some_resource);  // 1
  }
  resource_ptr->do_something();
}

清单 3.11 使用一个互斥量的延迟初始化(线程安全)过程

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;

void foo()
{
  std::unique_lock<std::mutex> lk(resource_mutex);  // 所有线程在此序列化 
  if(!resource_ptr)
  {
    resource_ptr.reset(new some_resource);  // 只有初始化过程需要保护 
  }
  lk.unlock();
  resource_ptr->do_something();
}

这段代码相当常见了,也足够表现出没必要的线程化问题,很多人能想出更好的一些的办法来做这件事,包括声名狼藉的双重检查锁模式:

void undefined_behaviour_with_double_checked_locking()
{
  if(!resource_ptr)  // 1
  {
    std::lock_guard<std::mutex> lk(resource_mutex);
    if(!resource_ptr)  // 2
    {
      resource_ptr.reset(new some_resource);  // 3
    }
  }
  resource_ptr->do_something();  // 4
}

指针第一次读取数据不需要获取锁①,并且只有在指针为NULL时才需要获取锁。然后,当获取锁之后,指针会被再次检查一遍② (这就是双重检查的部分),避免另一的线程在第一次检查后再做初始化,并且让当前线程获取锁。
这个模式为什么声名狼藉呢?因为这里有潜在的条件竞争,未被锁保护的读取操作①没有与其他线程里被锁保护的写入操作③进行同步。因此就会产生条件竞争,这个条件竞争不仅覆盖指针本身,还会影响到其指向的对象;即使一个线程知道另一个线程完成对指针进行写入,它可能没有看到新创建的some_resource实例,然后调用do_something()④后,得到不正确的结果。这个例子是在一种典型的条件竞争——数据竞争,C++标准中这就会被指定为“未定义行为”。这种竞争肯定是可以避免的。可以阅读第5章,那里有更多对内存模型的讨论,包括数据竞争的构成。
C++标准委员会也认为条件竞争的处理很重要,所以C++标准库提供了std::once_flagstd::call_once来处理这种情况。比起锁住互斥量,并显式的检查指针,每个线程只需要使用std::call_once,在std::call_once的结束时,就能安全的知道指针已经被其他的线程初始化了。使用std::call_once比显式使用互斥量消耗的资源更少,特别是当初始化完成后。

std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;  // 1

void init_resource()
{
  resource_ptr.reset(new some_resource);
}

void foo()
{
  std::call_once(resource_flag,init_resource);  // 可以完整的进行一次初始化
  resource_ptr->do_something();
}

清单3.12 使用std::call_once作为类成员的延迟初始化(线程安全)

class X
{
private:
  connection_info connection_details;
  connection_handle connection;
  std::once_flag connection_init_flag;

  void open_connection()
  {
    connection=connection_manager.open(connection_details);
  }
public:
  X(connection_info const& connection_details_):
      connection_details(connection_details_)
  {}
  void send_data(data_packet const& data)  // 1
  {
    std::call_once(connection_init_flag,&X::open_connection,this);  // 2
    connection.send_data(data);
  }
  data_packet receive_data()  // 3
  {
    std::call_once(connection_init_flag,&X::open_connection,this);  // 2
    return connection.receive_data();
  }
};

值得注意的是,std::mutexstd::one_flag的实例就不能拷贝和移动,所以当你使用它们作为类成员函数,如果你需要用到他们,你就得显示定义这些特殊的成员函数。

3.3.2 保护很少更新的数据结构

试想,为了将域名解析为其相关IP地址,我们在缓存中的存放了一张DNS入口表。通常,给定DNS数目在很长的一段时间内保持不变。虽然,在用户访问不同网站时,新的入口可能会被添加到表中,但是这些数据可能在其生命周期内保持不变。所以定期检查缓存中入口的有效性,就变的十分重要了;但是,这也需要一次更新,也许这次更新只是对一些细节做了改动。
使用std::mutex来保护数据结构,显的有些反应过度,比起使用std::mutex实例进行同步,不如使用boost::shared_mutex来做同步。
清单3.13 使用boost::shared_mutex对数据结构进行保护

#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>

class dns_entry;

class dns_cache
{
  std::map<std::string,dns_entry> entries;
  mutable boost::shared_mutex entry_mutex;
public:
  dns_entry find_entry(std::string const& domain) const
  {
    boost::shared_lock<boost::shared_mutex> lk(entry_mutex);  // 1
    std::map<std::string,dns_entry>::const_iterator const it=
       entries.find(domain);
    return (it==entries.end())?dns_entry():it->second;
  }
  void update_or_add_entry(std::string const& domain,
                           dns_entry const& dns_details)
  {
    std::lock_guard<boost::shared_mutex> lk(entry_mutex);  // 2
    entries[domain]=dns_details;
  }
};

3.3.3 嵌套锁

当一个线程已经获取一个std::mutex时(已经上锁),并对其再次上锁,这个操作就是错误的,并且继续尝试这样做的话,就会产生未定义行为。然而,在某些情况下,一个线程尝试获取同一个互斥量多次,而没有对其进行一次释放是可以的。之所以可以,是因为C++标准库提供了std::recursive_mutex类。互斥量锁住其他线程前,你必须释放你拥有的所有锁,所以当你调用lock()三次时,你也必须调用unlock()三次。正确使用std::lock_guard<std::recursive_mutex>std::unique_lock<std::recursive_mutex>可以帮你处理这些问题。

4 同步并发操作

4.1 等待一个事件或其他条件

晚间在列车上等待凌晨到站出站,要么一直不睡等着,要么等待被列车员唤醒。
第二个选择是在等待线程在检查间隙,使用std::this_thread::sleep_for()进行周期性的间歇(详见4.3节):

bool flag;
std::mutex m;

void wait_for_flag()
{
  std::unique_lock<std::mutex> lk(m);
  while(!flag)
  {
    lk.unlock();  // 1 解锁互斥量
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 2 休眠100ms
    lk.lock();   // 3 再锁互斥量
  }
}

这里博主理解为是一个轮询。

4.1.1 等待条件达成

清单4.1 使用std::condition_variable处理数据等待

std::mutex mut;
std::queue<data_chunk> data_queue;  // 1
std::condition_variable data_cond;

void data_preparation_thread()
{
  while(more_data_to_prepare())
  {
    data_chunk const data=prepare_data();
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(data);  // 2
    data_cond.notify_one();  // 3
  }
}

void data_processing_thread()
{
  while(true)
  {
    std::unique_lock<std::mutex> lk(mut);  // 4
    data_cond.wait(
         lk,[]{return !data_queue.empty();});  // 5
    data_chunk data=data_queue.front();
    data_queue.pop();
    lk.unlock();  // 6
    process(data);
    if(is_last_chunk(data))
      break;
  }
}

4.1.2 使用条件变量构建线程安全队列

清单4.2 std::queue接口

template <class T, class Container = std::deque<T> >
class queue {
public:
  explicit queue(const Container&);
  explicit queue(Container&& = Container());
  template <class Alloc> explicit queue(const Alloc&);
  template <class Alloc> queue(const Container&, const Alloc&);
  template <class Alloc> queue(Container&&, const Alloc&);
  template <class Alloc> queue(queue&&, const Alloc&);

  void swap(queue& q);

  bool empty() const;
  size_type size() const;

  T& front();
  const T& front() const;
  T& back();
  const T& back() const;

  void push(const T& x);
  void push(T&& x);
  void pop();
  template <class... Args> void emplace(Args&&... args);
};

当你忽略构造、赋值以及交换操作时,你就剩下了三组操作:1. 对整个队列的状态进行查询(empty()和size());2.查询在队列中的各个元素(front()和back());3.修改队列的操作(push(), pop()和emplace())。这就和3.2.3中的栈一样了,因此你也会遇到在固有接口上的条件竞争。因此,你需要将front()和pop()合并成一个函数调用,就像之前在栈实现时合并top()和pop()一样。与清单4.1中的代码不同的是:当使用队列在多个线程中传递数据时,接收线程通常需要等待数据的压入。这里我们提供pop()函数的两个变种:try_pop()和wait_and_pop()。try_pop() ,尝试从队列中弹出数据,总会直接返回(当有失败时),即使没有值可检索;wait_and_pop(),将会等待有值可检索的时候才返回。当你使用之前栈的方式来实现你的队列,你实现的队列接口就可能会是下面这样:
清单4.3 线程安全队列的接口

#include <memory> // 为了使用std::shared_ptr

template<typename T>
class threadsafe_queue
{
public:
  threadsafe_queue();
  threadsafe_queue(const threadsafe_queue&);
  threadsafe_queue& operator=(
      const threadsafe_queue&) = delete;  // 不允许简单的赋值

  void push(T new_value);

  bool try_pop(T& value);  // 1
  std::shared_ptr<T> try_pop();  // 2

  void wait_and_pop(T& value);
  std::shared_ptr<T> wait_and_pop();

  bool empty() const;
};

清单4.4 从清单4.1中提取push()和wait_and_pop()

#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
  std::mutex mut;
  std::queue<T> data_queue;
  std::condition_variable data_cond;
public:
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }

  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=data_queue.front();
    data_queue.pop();
  }
};
threadsafe_queue<data_chunk> data_queue;  // 1

void data_preparation_thread()
{
  while(more_data_to_prepare())
  {
    data_chunk const data=prepare_data();
    data_queue.push(data);  // 2
  }
}

void data_processing_thread()
{
  while(true)
  {
    data_chunk data;
    data_queue.wait_and_pop(data);  // 3
    process(data);
    if(is_last_chunk(data))
      break;
  }
}

清单4.5 使用条件变量的线程安全队列(完整版)

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
  mutable std::mutex mut;  // 1 互斥量必须是可变的 
  std::queue<T> data_queue;
  std::condition_variable data_cond;
public:
  threadsafe_queue()
  {}
  threadsafe_queue(threadsafe_queue const& other)
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue=other.data_queue;
  }

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }

  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=data_queue.front();
    data_queue.pop();
  }

  std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return false;
    value=data_queue.front();
    data_queue.pop();
    return true;
  }

  std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

4.2 使用期望等待一次性事件

5 C++内存模型和原子类型操作

6 基于锁的并发数据结构设计

7 无锁并发数据结构设计

8 并发代码设计

9 高级线程管理

10 多线程程序的测试和调试

重点

线程std::thread对象必须调用join或detach

Destroys the thread object. If *this has an associated thread (joinable() == true), std::terminate() is called.
创建的std::thread对象会在构造函数结束时销毁,因为它是一个局部变量。如果调用了std::thread的析构函数,而该线程是joinable,则调用std::terminate。即程序会调用Abort(),不进行任何清理工作,然后abnormal program termination(程序异常终止)。

shared_mutex也是基于操作系统底层的读写锁pthread_rwlock_t的封装

参考

1、《C++ Concurrency in Action》[陈晓伟 译]
2、github 翻译地址
3、gitbook 在线阅读
4、书中源码
5、学习C++11/14
6、CPP-Concurrency-In-Action-2ed
7、thread析构
8、C++11多线程 - Part 2: 连接和分离线程
9、Starting thread causing abort()
10、Boost Mutex详细解说

标签:std,thread,lock,void,C++,线程,Concurrency,Action,data
来源: https://blog.csdn.net/qq_38880380/article/details/122679267

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有