ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

C# 流水线 生产者/消费者链 Producer/Consumer

2022-09-10 14:02:15  阅读:250  来源: 互联网

标签:Producer C# 流程 System 流水线 缓冲区 using Consumer pipleline


<body>
  1. manager.cs

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using bntu.pcm.plworker;
    using bntu.pcm.works;
    
    /*
     * bntu 是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm  是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     */
    namespace bntu.pcm
    {
        /// <summary>
        /// 管理流水线工程(定义流水线,添加流程)
        /// </summary>
        class Manager
        {
            public static void Main(string[] args)
            {
                // 定义流水线工程,并为其添加流程
                Pipleline pipleline = new Pipleline(new BlockingCollection<string>());
                pipleline.AddItem<string, int>(ReadGrayImage.Read);
                pipleline.AddItem<int, double>(GenerateDeepthImage.Generate);
                pipleline.AddItem<double, double>(CalculateDeepthImage.Calculate);
    
                // 将整个流水线作为后台线程(这是因为前台线程是整个流水线的输入)
                Thread thread = new Thread(() => pipleline.PiplelineWork());
                thread.IsBackground = true;
                thread.Start();
    
                // 整个流水线的输入作为前台线程(这个while循环模拟相机不断输出图像)
                string image_path = GetImagePath();
                while (image_path != null)
                {
                    // 为整个流水线的输入缓冲区添加元素
                    pipleline.HIB.Add(image_path);
                    image_path = GetImagePath();
                }
            }
    
            // 模拟图片的编号
            private static int i = 0;
    
            /// <summary>
            /// 用于模拟相机输出图像
            /// </summary>
            /// <returns>模拟图像</returns>
            public static string GetImagePath()
            {
                return i++.ToString();
            }
        }
    }
    
  2. PiplelineItem.cs

    using System;
    using System.Collections.Concurrent;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * plworker 是pipleline worker的缩写,是“生/消”模型的特殊形式——即流水线模型
     */
    namespace bntu.pcm.plworker
    {
        /// <summary>
        /// 流水线上的流程定义
        /// </summary>
        /// <typeparam name="INPUT">该流程的输入缓冲区的类型</typeparam>
        /// <typeparam name="OUTPUT">该流程的输出缓冲区的类型</typeparam>
        public class PiplelineItem<INPUT, OUTPUT>
        {
            // 该流程的输出缓冲区
            public BlockingCollection<OUTPUT> output;
    
            public BlockingCollection<OUTPUT> Output { get => output; }
    
            /// <summary>
            /// 流程的构造函数
            /// </summary>
            public PiplelineItem()
            {
                // 流程的输出缓冲区由其自己定义,然后为后一流程提供一个作为输入缓冲区的接口
                this.output = new BlockingCollection<OUTPUT>();
            }
    
            /// <summary>
            /// 该流程的操作过程(从输入缓冲区取走,然后处理后放入输出缓冲区)
            /// </summary>
            /// <param name="input_buffer">该流程的输入缓冲区(上一流程的输出缓冲区)</param>
            /// <param name="handle">该流程中的具体操作</param>
            private void Action(BlockingCollection<INPUT> input_buffer, Func<INPUT, OUTPUT> handle)
            {
                try
                {
                    // 从输入缓冲区中取走元素
                    foreach (var item in input_buffer.GetConsumingEnumerable())
                    {
                        // 经过指定操作后放入输出缓冲区
                        this.output.Add(handle(item));
                    }
                }
                finally
                {
                    // 当输入缓冲区取空了就需要告知输出缓冲区没有新的元素进入,否则线程无法结束
                    this.output.CompleteAdding();
                }
            }
    
            /// <summary>
            /// 获取该流程的动作(也就是操作或者称为任务)
            /// </summary>
            /// <param name="input_buffer">该流程的输入缓冲区(上一流程的输出缓冲区)</param>
            /// <param name="handle">该流程中的具体操作</param>
            /// <returns>该流程的任务</returns>
            public Action GetPiplelineItemAction(object input_buffer, object handle)
            {
                // 在该pipleline_item中包含了input的信息,所以在PiplelineItem可以进行类型转换,而在Pipleline中就不行
                BlockingCollection<INPUT> _input_buffer = input_buffer as BlockingCollection<INPUT>;
                Func<INPUT, OUTPUT> _handle = handle as Func<INPUT, OUTPUT>;
                // this指针包含了模板信息,所以action就固定成Task了,而不再是Pipleline中的Task<dynamic>
                return () => this.Action(_input_buffer, _handle);
            }
        }
    }
    
  3. Pipleline.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * plworker 是pipleline worker的缩写,是“生/消”模型的特殊形式——即流水线模型
     */
    namespace bntu.pcm.plworker
    {
        /// <summary>
        /// 流水线的结构定义
        /// </summary>
        public class Pipleline
        {
            // 整个流水线的输入缓冲区
            private dynamic head_input_buffer;
            // 流水线上的所有流程组成的列表,各个流程中执行的操作组成的刘表
            private List<object> pipleline_item_list, handle_list;
    
            public dynamic HIB { get => head_input_buffer; }
    
            /// <summary>
            /// 流水线构造函数
            /// </summary>
            /// <param name="head_input_buffer">第一个输入缓冲区,也是整个流水线的输入缓冲区</param>
            public Pipleline(dynamic head_input_buffer)
            {
                this.head_input_buffer = head_input_buffer;
                this.pipleline_item_list = new List<object>();
                this.handle_list = new List<object>();
            }
    
            /// <summary>
            /// 为流水线添加一个流程
            /// </summary>
            /// <typeparam name="INPUT">输入缓冲区的类型</typeparam>
            /// <typeparam name="OUTPUT">输出缓冲区的类型</typeparam>
            /// <param name="handle">该流程需要执行的操作</param>
            public void AddItem<INPUT, OUTPUT>(Func<INPUT, OUTPUT> handle)
            {
                this.pipleline_item_list.Add(new PiplelineItem<INPUT, OUTPUT>());
                this.handle_list.Add(handle);
            }
    
            /// <summary>
            /// 构建流水线工程(也就是把流水线的各个结点都联系起来)
            /// </summary>
            public void PiplelineWork()
            {
                // 缓冲区列表,任务列表,任务工厂
                var buffer_list = new List<object>();
                var task_list = new List<Task>();
                var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
    
                // 先将整个流水线的输入缓冲区加入流水线再说
                buffer_list.Add(this.head_input_buffer);
                // 浅浅用一波zip函数(没啥实质性用处,主要是方便)
                foreach ((dynamic pipleline_item, dynamic handle) in
                    this.pipleline_item_list.Zip(this.handle_list,
                    (pipleline_item, handle) => new KeyValuePair<dynamic, dynamic>(pipleline_item, handle)))
                {
                    // 获取该流程的输入缓冲区(其实就是上一个流程的输出缓冲区)
                    var input_buffer = buffer_list.Last();
                    // 将流程需要执行的操作作为一个新的任务,并将该线程(任务)加入任务列表
                    task_list.Add(taskFactory.StartNew(pipleline_item.GetPiplelineItemAction(input_buffer, handle)));
                    // 将该流程的输出缓冲区加入缓冲区列表,作为后一个流程的输入缓冲区
                    buffer_list.Add(pipleline_item.Output);
                }
    
                // 等待所有所需的结果(就是已经结束咧)
                Task.WaitAll(task_list.ToArray());
            }
        }
    }
    
  4. CalculateDeepthImage.cs

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * works    是流水线的各个流程
     */
    namespace bntu.pcm.works
    {
        /// <summary>
        /// 模拟进行深度图的计算
        /// </summary>
        class CalculateDeepthImage
        {
            public static double Calculate(double d)
            {
                Console.WriteLine("计算深度图像:" + d);
                Task.Delay(new Random().Next(300));
                return d;
            }
        }
    }
    
  5. GenerateDeepthImage.cs

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * works    是流水线的各个流程
     */
    namespace bntu.pcm.works
    {
        /// <summary>
        /// 模拟通过灰度图像产生深度图
        /// </summary>
        class GenerateDeepthImage
        {
            public static double Generate(int i)
            {
                Console.WriteLine("产生深度图像:" + i * 1.0);
                Task.Delay(new Random().Next(200));
                return i * 1.0;
            }
        }
    }
    
  6. ReadGrayImage.cs

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * works    是流水线的各个流程
     */
    namespace bntu.pcm.works
    {
        /// <summary>
        /// 模拟读取灰度图像
        /// </summary>
        class ReadGrayImage
        {
            public static int Read(string s)
            {
                Console.WriteLine("读取灰度图像:" + s);
                Task.Delay(new Random().Next(100));
                return Convert.ToInt32(s);
            }
        }
    }
    
</body>

标签:Producer,C#,流程,System,流水线,缓冲区,using,Consumer,pipleline
来源: https://www.cnblogs.com/SimbaWang/p/16676365.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有