ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

C#并行编程:Parallel类

2022-09-05 17:32:16  阅读:225  来源: 互联网

标签:Invoke C# 编程 并行 Parallel ForEach new public


PFX在Parallel类中提供了三个静态方法作为结构化并行的基本形式:

  • Parallel.Invoke方法:并行执行一组委托。
  • Parallel.For方法:执行与C# for循环等价的并行方法。
  • Parallel.ForEach方法:执行与C#foreach循环等价的并行方法。

这三个方法都会阻塞线程直到所有工作完成为止。和PLINQ一样,在出现未处理异常之后,其他的工作线程将会在其当前迭代完成之后停止,并将异常包装为AggregateException抛出给调用者。

Parallel.Invoke方法

Parallel.Invoke方法并行执行一组Action委托,然后等待它们完成。该方法最简单的定义方式如下:

public static void Invoke (params Action[] actions);

和PLINQ一样,Parallel.*方法是针对计算密集型任务而不是I/O密集型任务进行优化的。但是,我们可以使用一次下载两个网页的方式来简单展示Parallel.Invoke的用法:

Parallel.Invoke (
    () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
    () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));

从表面看来,Parallel.Invoke就像是创建了两个绑定到线程的Task对象,然后等待它们执行结束的快捷操作。但是它们存在一个重要区别:如果将一百万个委托传递给Parallel.Invoke方法,它仍然能够有效工作。这是因为该方法会将大量的元素划分为若干批次,并将其分派给底层的Task,而不会单纯为每一个委托创建一个独立的Task。
所有的Parallel方法都需要自行整理结果,这意味着要时刻注意线程安全性。例如,以下代码就不是线程安全的:

var data = new List<string>();
Parallel.Invoke (
    () => data.Add(new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html")),
    () => data.Add(new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html")));

Parallel.For方法和Parallel.ForEach方法

Parallel.For和Parallel.ForEach分别等价于C#中的for和foreach循环,但是每一次迭代都是并行而非顺序执行的。以下给出了这两个方法最简单的声明:

public static ParallelLoopResult For(
    int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body);
public static ParallelLoopResult ForEach<TSource>(
    OrderablePartitioner<TSource> source, Action<TSource, ParallelLoopState, long> body);

对于for循环:

// 顺序
for(int i=0; i<100; i++)
    Foo(i);

// 并行话代码
Parallel.For(0, 100, i=> Foo(i));
// 简洁写法
Parallel.For(0, 100,Foo);

而对于foreach循环:

Parallel.ForEach("hello,world",Foo);

外层循环与内层循环

Parallel.For和Parallel.ForEach通常在外层循环比内层循环效果更好,因为前者通常为并行化提供了更大的工作块,而这可以降低管理开销。通常,没必要同时将内层和外层循环并行化。

包含索引的Parallel.ForEach方法

有些情况下,循环迭代中的索引用处很大。对于顺序执行的foreach循环,我们很容易获得索引:

int i = 0;
foreach(char c in "hello,world")
    Console.WriteLine(c.ToString() + i++);

但是在并行上下文中,递增一个共享变量的值不是线程安全的。因此必须使用以下版本的ForEach语句:

Parallel.ForEach ("Hello, world", (c, state, i) =>
{
    Console.WriteLine (c.ToString() + i);
});

为了在实际环境中使用它,我们仍以前面使用PLINQ编写的拼写检查器为例。以下代码将加载一个字典和一个包含一百万个测试单词的数组:

string wordLookupFile = Path.Combine(Path.GetTempPath(), "WordLookup.txt");

if (!File.Exists(wordLookupFile))
    new WebClient().DownloadFile(
        "http://www.albahari.com/ispell/allwords.txt", wordLookupFile);

var wordLookup = new HashSet<string>(
    File.ReadAllLines(wordLookupFile),
    StringComparer.InvariantCultureIgnoreCase);

var random = new Random();
string[] wordList = wordLookup.ToArray();

string[] wordsToTest = Enumerable.Range(0, 1000000)
    .Select(i => wordList[random.Next(0, wordList.Length)])
    .ToArray();

wordsToTest[12345] = "woozsh";     
wordsToTest[23456] = "wubsie";     

var misspellings = new ConcurrentBag<Tuple<int, string>>();

Parallel.ForEach(wordsToTest, (word, state, i) =>
{
    if (!wordLookup.Contains(word))
        misspellings.Add(Tuple.Create((int)i, word));
});

需要注意的是,必须将结果整理到一个线程安全的集合中,这是一个缺点(与PLINQ相比)。而这种方式胜过PLINQ的地方是索引化ForEach比索引化Select查询运算符的执行效率高。

ParallelLoopState:提前跳出循环

由于并行的For或ForEach的循环体是一个委托,因此无法使用break语句提前结束循环。但是可以调用ParallelLoopState对象的Break方法或Stop方法来跳出或者结束循环:

public class ParallelLoopState
{
    public void Break();
    public void Stop();

    public bool IsExceptional { get; }
    public bool IsStopped { get; }
    public long? LowestBreakIteration { get; }
    public bool ShouldExitCurrentIteration { get; }
}

获得ParallelLoopState很容易:所有的For和ForEach都有以Action<TSource, ParallelLoopState>为循环体的重载。因此,如果需要并行化如下代码:

foreach (char c in "hello,world")
{
    if (c == ',')
        break;
    else
        Console.Write(c);
}

// 并行化
Parallel.ForEach("hello,world", (c, loopState) =>
{
    if (c == ',')
        loopState.Break();
    else
        Console.Write(c);
});

使用本地值进行优化

Parallel.For和Parallel.ForEach方法均提供了一组接受TLocal泛型类型参数的重载。这些重载方法可帮助优化密集迭代循环过程中的结果整理工作。其中最简单的形式如下:

public static ParallelLoopResult For<TLocal>(
    int fromInclusive,
    int toExclusive,
    Func <TLocal> localInit,
    Func <int, ParallelLoopState, TLocal, TLocal> body,
    Action <TLocal> localFinally);

这些重载都非常复杂,在实际中也很少用到,幸运的是其大部分应用场景都可以通过PLINQ解决。

大部分问题和以下例子类似:假设要对1~10 000 000之间的数字的平方根求和。计算一千万个平方根这种工作很容易并行化,但是对它们的值求和却有点麻烦。因为我们需要锁定并更新最终结果:

object locker = new object();
double total = 0;
Parallel.For (1, 10000000, i => { lock (locker) total += Math.Sqrt (i); });

并行化的效果大部分都被一千万个锁操作和因此带来的阻塞抵消了。
实际上,并不需要一千万次的锁操作。
考虑这样一种情况,假设有一组志愿者需要收集大量的垃圾。如果所有志愿者都共用一个垃圾桶,那么移动和竞争将显著降低收集效率。显然,更加有效的方式是让每一个志愿者都有仅属于自己的(本地)垃圾桶,只是偶尔将自己的垃圾倒入主垃圾桶。
拥有TLocal的For和ForEach重载就是按照上述方式工作的。志愿者就是内部工作线程,本地值就是本地垃圾桶。为了保证Parallel类型的工作,还需要提供两个额外的委托,分别负责:

  • 初始化一个新的本地值。
  • 将本地聚合值和主要结果值进行合并。

此外,循环体委托的返回值也不是void,而是本地值的聚合结果。以下是重构之后的代码:

object locker = new object();
double grandTotal = 0;
Parallel.For(1, 10000000, 
    () => 0.0,                      //初始化本地值
    (i, state, localTotal) =>       // 主体委托
        localTotal + Math.Sqrt(i),  //返回新的本地总数
    localTotal =>
    { 
        lock (locker) 
            grandTotal += localTotal; // 将本地数添加到主总数中
    }
);

虽然我们仍需要锁定,但是这个锁定过程只会在本地结果和最终结果合并时发生,因此整个过程会更加高效。

如前所述,这种场景使用PLINQ往往会更加有效。上述例子可以直接用PLINQ进行并行化:

var total = ParallelEnumerable.Range(1, 10000000)
    .Sum(i => Math.Sqrt(i));

标签:Invoke,C#,编程,并行,Parallel,ForEach,new,public
来源: https://www.cnblogs.com/nullcodeworld/p/16658921.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有