ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

具有C 11多线程的特征库

2019-10-06 06:15:46  阅读:164  来源: 互联网

标签:c c11 multithreading eigen parallel-processing


我有一个代码来计算具有期望最大化的高斯混合模型,以便从给定的输入数据样本中识别聚类.

一段代码重复计算这样的模型,用于许多试验Ntrial(一个独立的,但使用相同的输入数据),以便最终获得最佳解决方案(从模型中最大化总可能性的那个) ).该概念可以推广到许多其他聚类算法(例如k均值).

我希望通过C 11的多线程并行化必须重复Ntrial次数的代码部分,以便每个线程执行一次试验.

一个代码示例,假设(Ndimensions x Npoints)的输入Eigen :: ArrayXXd样本可以是以下类型:

    double bestTotalModelProbability = 0;
    Eigen::ArrayXd clusterIndicesFromSample(Npoints);
    clusterIndicesFromSample.setZero();

    for (int i=0; i < Ntrials; i++)
    {
         totalModelProbability = computeGaussianMixtureModel(sample);


         // Check if this trial is better than the previous one.
         // If so, update the results (cluster index for each point
         // in the sample) and keep them.

         if totalModelProbability > bestTotalModelProbability
         {
             bestTotalModelProbability = totalModelProbability;
             ...
             clusterIndicesFromSample = obtainClusterMembership(sample);
         }
    }

其中我传递样本的参考值(Eigen :: Ref),而不是将其自身采样到函数computeGaussianMixtureModel()和obtainClusterMembership().

我的代码很大程度上基于Eigen数组,我采用的N维问题可以解释10-100维度和500-1000个不同的样本点.我正在寻找一些例子,使用Eigen数组和std:C 11的线程创建这个代码的多线程版本,但是找不到任何东西,我正在努力制作一些简单的例子来操作Eigen数组.

我甚至不确定Eigen可以在C 11中的std :: thread中使用.
有人可以用一些简单的例子来帮助我理解synthax吗?
我在具有6个内核(12个线程)的CPU上使用clang作为Mac OSX中的编译器.

解决方法:

OP的问题引起了我的注意,因为通过多线程获得的加速数字运算是我个人名单上的顶级待办事项之一.

我必须承认,我对Eigen库的经验非常有限. (我曾经将3×3旋转矩阵的分解用于欧拉角,这在特征库中以一般方式非常巧妙地解决.)

因此,我定义了另一个样本任务,包括对样本数据集中的值进行愚蠢的计数.

这是多次完成的(使用相同的评估函数):

>单线程(获取比较值)
>在一个额外的线程中开始每个子任务(以一种公认的相当愚蠢的方式)
>启动线程,交叉访问样本数据
>启动具有分区访问样本数据的线程.

test-multi-threading.cc:

#include <cstdint>
#include <cstdlib>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <limits>
#include <thread>
#include <vector>

// a sample function to process a certain amount of data
template <typename T>
size_t countFrequency(
  size_t n, const T data[], const T &begin, const T &end)
{
  size_t result = 0;
  for (size_t i = 0; i < n; ++i) result += data[i] >= begin && data[i] < end;
  return result;
}

typedef std::uint16_t Value;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::microseconds MuSecs;
typedef decltype(std::chrono::duration_cast<MuSecs>(Clock::now() - Clock::now())) Time;

Time duration(const Clock::time_point &t0)
{
  return std::chrono::duration_cast<MuSecs>(Clock::now() - t0);
}

std::vector<Time> makeTest()
{
  const Value SizeGroup = 4, NGroups = 10000, N = SizeGroup * NGroups;
  const size_t NThreads = std::thread::hardware_concurrency();
  // make a test sample
  std::vector<Value> sample(N);
  for (Value &value : sample) value = (Value)rand();
  // prepare result vectors
  std::vector<size_t> results4[4] = {
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0)
  };
  // make test
  std::vector<Time> times{
    [&]() { // single threading
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[0];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment single-threaded
      for (size_t i = 0; i < NGroups; ++i) {
        results[i] = countFrequency(data.size(), data.data(),
          (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
      }
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - stupid aproach
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[1];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value i = 0; i < NGroups;) {
        size_t nT = 0;
        for (; nT < NThreads && i < NGroups; ++nT, ++i) {
          threads[nT] = std::move(std::thread(
            [i, &results, &data, SizeGroup]() {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }));
        }
        for (size_t iT = 0; iT < nT; ++iT) threads[iT].join();
      }
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - interleaved
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[2];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value iT = 0; iT < NThreads; ++iT) {
        threads[iT] = std::move(std::thread(
          [iT, &results, &data, NGroups, SizeGroup, NThreads]() {
            for (Value i = iT; i < NGroups; i += NThreads) {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }
          }));
      }
      for (std::thread &threadI : threads) threadI.join();
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - grouped
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[3];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value iT = 0; iT < NThreads; ++iT) {
        threads[iT] = std::move(std::thread(
          [iT, &results, &data, NGroups, SizeGroup, NThreads]() {
            for (Value i = iT * NGroups / NThreads,
              iN = (iT + 1) * NGroups / NThreads; i < iN; ++i) {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }
          }));
      }
      for (std::thread &threadI : threads) threadI.join();
      // done
      return duration(t0);
    }()
  };
  // check results (must be equal for any kind of computation)
  const unsigned nResults = sizeof results4 / sizeof *results4;
  for (unsigned i = 1; i < nResults; ++i) {
    size_t nErrors = 0;
    for (Value j = 0; j < NGroups; ++j) {
      if (results4[0][j] != results4[i][j]) {
        ++nErrors;
#ifdef _DEBUG
        std::cerr
          << "results4[0][" << j << "]: " << results4[0][j]
          << " != results4[" << i << "][" << j << "]: " << results4[i][j]
          << "!\n";
#endif // _DEBUG
      }
    }
    if (nErrors) std::cerr << nErrors << " errors in results4[" << i << "]!\n";
  }
  // done
  return times;
}

int main()
{
  std::cout << "std::thread::hardware_concurrency(): "
    << std::thread::hardware_concurrency() << '\n';
  // heat up
  std::cout << "Heat up...\n";
  for (unsigned i = 0; i < 3; ++i) makeTest();
  // repeat NTrials times
  const unsigned NTrials = 10;
  std::cout << "Measuring " << NTrials << " runs...\n"
    << "   "
    << " | " << std::setw(10) << "Single"
    << " | " << std::setw(10) << "Multi 1"
    << " | " << std::setw(10) << "Multi 2"
    << " | " << std::setw(10) << "Multi 3"
    << '\n';
  std::vector<double> sumTimes;
  for (unsigned i = 0; i < NTrials; ++i) {
    std::vector<Time> times = makeTest();
    std::cout << std::setw(2) << (i + 1) << ".";
    for (const Time &time : times) {
      std::cout << " | " << std::setw(10) << time.count();
    }
    std::cout << '\n';
    sumTimes.resize(times.size(), 0.0);
    for (size_t j = 0; j < times.size(); ++j) sumTimes[j] += times[j].count();
  }
  std::cout << "Average Values:\n   ";
  for (const double &sumTime : sumTimes) {
    std::cout << " | "
      << std::setw(10) << std::fixed << std::setprecision(1)
      << sumTime / NTrials;
  }
  std::cout << '\n';
  std::cout << "Ratio:\n   ";
  for (const double &sumTime : sumTimes) {
    std::cout << " | "
      << std::setw(10) << std::fixed << std::setprecision(3)
      << sumTime / sumTimes.front();
  }
  std::cout << '\n';
  // done
  return 0;
}

在Windows 10上的cygwin64上编译和测试:

$g++ --version
g++ (GCC) 7.3.0

$g++ -std=c++11 -O2 -o test-multi-threading test-multi-threading.cc

$./test-multi-threading
std::thread::hardware_concurrency(): 8
Heat up...
Measuring 10 runs...
    |     Single |    Multi 1 |    Multi 2 |    Multi 3
 1. |     384008 |    1052937 |     130662 |     138411
 2. |     386500 |    1103281 |     133030 |     132576
 3. |     382968 |    1078988 |     137123 |     137780
 4. |     395158 |    1120752 |     138731 |     138650
 5. |     385870 |    1105885 |     144825 |     129405
 6. |     366724 |    1071788 |     137684 |     130289
 7. |     352204 |    1104191 |     133675 |     130505
 8. |     331679 |    1072299 |     135476 |     138257
 9. |     373416 |    1053881 |     138467 |     137613
10. |     370872 |    1096424 |     136810 |     147960
Average Values:
    |   372939.9 |  1086042.6 |   136648.3 |   136144.6
Ratio:
    |      1.000 |      2.912 |      0.366 |      0.365

我在coliru.com上做了同样的事情. (当我超过原始值的时间限制时,我不得不减少加热周期和样本量.):

g++ (GCC) 8.1.0
Copyright (C) 2018 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

std::thread::hardware_concurrency(): 4
Heat up...
Measuring 10 runs...
    |     Single |    Multi 1 |    Multi 2 |    Multi 3
 1. |     224684 |     297729 |      48334 |      39016
 2. |     146232 |     337222 |      66308 |      59994
 3. |     195750 |     344056 |      61383 |      63172
 4. |     198629 |     317719 |      62695 |      50413
 5. |     149125 |     356471 |      61447 |      57487
 6. |     155355 |     322185 |      50254 |      35214
 7. |     140269 |     316224 |      61482 |      53889
 8. |     154454 |     334814 |      58382 |      53796
 9. |     177426 |     340723 |      62195 |      54352
10. |     151951 |     331772 |      61802 |      46727
Average Values:
    |   169387.5 |   329891.5 |    59428.2 |    51406.0
Ratio:
    |      1.000 |      1.948 |      0.351 |      0.303

Live Demo on coliru

我想知道coliru(只有4个线程)的比率甚至比我的PC(有8个线程)更好.实际上,我不知道如何解释这一点.
但是,在这两种设置中存在许多其他差异,这些差异可能是也可能不是.至少,对于第3和第4次进近,两次测量都显示3的粗略加速,其中第2次消耗唯一的每个潜在加速(可能通过启动和连接所有这些线程).

查看示例代码,您将发现没有互斥锁或任何其他显式锁定.这是故意的.正如我已经了解的(许多年前),每次并行化尝试都可能导致额外的通信开销(对于必须交换数据的并发任务).如果通信开销变得很大,它就会消耗并发的速度优势.因此,可以通过以下方式实现最佳加速:

>最小通信开销,即并发任务对独立数据进行操作
>对并发计算结果进行后合并的最小努力.

在我的示例代码中,我

>在启动线程之前准备好每个数据和存储,
>线程运行时,永远不会更改读取的共享数据,
>以线程本地写入的数据(没有两个线程写入相同的数据地址)
>在连接所有线程后评估计算结果.

关于3.我有点挣扎这是否合法,即它是否被授予在线程中写入的数据,以便在加入后在主线程中正确显示. (事情似乎工作得很好但总的来说是虚幻的,但在多线程方面尤其虚幻.)

cppreference.com提供以下解释

> std::thread::thread()

The completion of the invocation of the constructor synchronizes-with (as defined in 07003) the beginning of the invocation of the copy of f on the new thread of execution.

> std::thread::join()

The completion of the thread identified by *this synchronizes with the corresponding successful return from join().

在Stack Overflow中,我发现了以下相关的Q / A:

> Does relaxed memory order effect can be extended to after performing-thread’s life?
> Are memory fences required here?
> Is there an implicit memory barrier with synchronized-with relationship on thread::join?

这让我信服,没关系.

然而,缺点是

>线程的创建和加入会带来额外的努力(并不是那么便宜).

另一种方法可能是使用线程池来克服这个问题.我google了一下,发现例如Jakob Progsch’s ThreadPool on github.但是,我想,在一个线程池中,锁定问题又回到了“游戏中”.

这是否也适用于特征函数,取决于如何.实现了特征函数.如果访问全局变量(在同时调用同一函数时共享),则会导致数据竞争.

谷歌搜索了一下,我发现了以下文档.

Eigen and multi-threading – Using Eigen in a multi-threaded application

In the case your own application is multithreaded, and multiple threads make calls to 070010, then you have to initialize 070010 by calling the following routine before creating the threads:

06003

Note

With 070010 3.3, and a fully C++11 compliant compiler (i.e., 070013), then calling initParallel() is optional.

Warning

note that all functions generating random matrices are not re-entrant nor thread-safe. Those include 070014, and 070015 despite a call to Eigen::initParallel(). This is because these functions are based on std::rand which is not re-entrant. For thread-safe random generator, we recommend the use of boost::random or c++11 random feature.

标签:c,c11,multithreading,eigen,parallel-processing
来源: https://codeday.me/bug/20191006/1858901.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有