ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Python 生产者消费者 demo

2020-11-12 15:31:39  阅读:245  来源: 互联网

标签:name deal python demo 生产者 time print shared minute


# -*- coding:utf-8 -*-
from multiprocessing import JoinableQueue, Process, Value

import time

# Minute values at which the download stage and the deal stage, respectively,
# are meant to raise a simulated failure.
# NOTE(review): the only visible checks that read these constants are
# commented out, so they currently have no effect at runtime.
DOWNLOAD_EXCEPTION_MINUTE = 3
DEAL_EXCEPTION_MINUTE = 2


class SharedObj(object):
    """Plain value holder exposing a single ``id`` attribute."""

    def __init__(self, id):
        # The parameter name shadows the builtin ``id``; it is kept so that
        # keyword callers (``SharedObj(id=...)``) continue to work.
        self.id = id


from collections import namedtuple

# Immutable one-field record type; instances expose their value as ``.id``.
SHARED_OBJ = namedtuple("shared_obj", "id")


def download_process(produce_queue, minute_list, sharded_time_cost, shared_is_exception):
    """Producer stage: download each minute in order and queue it for the consumer.

    Args:
        produce_queue: Joinable queue shared with the consumer; ``put`` and
            ``join`` are called on it.
        minute_list: Iterable of minute identifiers, downloaded in order.
        sharded_time_cost: Shared integer (``.value`` / ``.get_lock()``) that
            receives the whole seconds spent in the download loop.
        shared_is_exception: Shared integer flag. A non-zero value stops the
            loop before the next download; it is set to 1 if this stage raises.

    Exceptions raised while downloading are printed and recorded in
    ``shared_is_exception`` rather than propagated. Whatever happens, a
    ``None`` sentinel is queued last and the function then blocks in
    ``produce_queue.join()`` until every queued item has been acknowledged.
    """
    def process(minute_name):
        print("Begin download process minute:%s." % minute_name)
        # Fault-injection hook (disabled):
        # if minute_name == DOWNLOAD_EXCEPTION_MINUTE:
        #     raise Exception("Exception in minute_name:%s when download process." % minute_name)

        # Simulated work; presumably a placeholder for the real download.
        time.sleep(10)
        print("Success download progress minute:%s." % minute_name)

    start_time = time.time()
    try:
        for minute_name in minute_list:
            if shared_is_exception.value:
                print("Some other error happened when begin download minute:%s." % minute_name)
                break
            process(minute_name)
            produce_queue.put(minute_name)
    except Exception as e:
        print(e)
        if shared_is_exception.value:
            print("Some other error happened.")

        with shared_is_exception.get_lock():
            shared_is_exception.value = 1
    finally:
        # Record the elapsed time on every exit path; previously a failure
        # left the shared value untouched, hiding the time already spent.
        with sharded_time_cost.get_lock():
            sharded_time_cost.value = int(time.time() - start_time)

    # The sentinel tells the consumer to stop; join() waits for it to drain
    # everything that was queued.
    produce_queue.put(None)
    produce_queue.join()


def deal_process(produce_queue, shared_deal_time_cost, shared_done_minute_name, shared_done_count, shared_is_exception,
                 s):
    """Consumer stage: take minutes off the queue and process them until a ``None`` sentinel.

    Args:
        produce_queue: Joinable queue fed by the producer; ``get`` and
            ``task_done`` are called on it.
        shared_deal_time_cost: Shared integer accumulating the whole seconds
            spent processing.
        shared_done_minute_name: Shared integer holding the most recently
            processed minute.
        shared_done_count: Shared integer counting processed minutes.
        shared_is_exception: Shared integer flag. When non-zero, remaining
            items are acknowledged but not processed; it is set to 1 if
            processing raises.
        s: Object printed during each processing step.

    Every item taken from the queue, including the sentinel, is acknowledged
    with exactly one ``task_done()`` so the producer's ``join()`` can return.
    Only minutes that were actually processed update the done statistics.
    """
    def process(minute_name):
        print("Begin deal minute:%s." % minute_name)
        # Fault-injection hook (disabled):
        # if minute_name == DEAL_EXCEPTION_MINUTE:
        #     raise Exception("Exception happened in minute:%s, when deal." % minute_name)
        print(s)
        # Simulated work; presumably a placeholder for the real processing.
        time.sleep(5)
        print("Done for deal minute:%s." % minute_name)

    while True:
        minute_name = produce_queue.get()
        # Diagnostic snapshot of the shared state (read without the locks).
        print("-----deal process----")
        print("shared_deal_time_cost:%s." % shared_deal_time_cost.value)
        print("shared_done_minute_name:%s." % shared_done_minute_name.value)
        print("shared_done_count:%s." % shared_done_count.value)
        print("shared_is_exception:%s." % shared_is_exception.value)
        print("---------------------")

        if minute_name is None:
            print("Get empty minute name, done for deal.")
            produce_queue.task_done()
            break

        try:
            if shared_is_exception.value:
                # A stage has already failed: drain the item without
                # processing it and without counting it as done. The
                # ``finally`` clause below still acknowledges it.
                print("shared_is_exception is true, exception is happened; continue...")
                continue

            begin_time = time.time()
            process(minute_name)
            elapsed = int(time.time() - begin_time)

            with shared_deal_time_cost.get_lock():
                shared_deal_time_cost.value += elapsed
            with shared_done_minute_name.get_lock():
                shared_done_minute_name.value = minute_name
            with shared_done_count.get_lock():
                shared_done_count.value += 1
        except Exception as e:
            print(e)
            with shared_is_exception.get_lock():
                shared_is_exception.value = 1
        finally:
            # Always acknowledge, otherwise the producer's join() never returns.
            produce_queue.task_done()


class TestObj(object):
    """Holds the shared counters and runs one producer/consumer pair."""

    def __init__(self):
        # One shared C int ("i") per statistic, each starting at 0.
        for attr_name in (
            "shared_done_minute",
            "shared_done_count",
            "shared_download_time_cost",
            "shared_deal_time_cost",
            "shared_is_exception",
        ):
            setattr(self, attr_name, Value("i", 0))

    def main_process(self):
        """Start the download and deal processes, wait for both, print the wall time."""
        queue = JoinableQueue()
        producer = Process(
            target=download_process,
            args=(queue, [1, 2, 3, 4, 5], self.shared_download_time_cost, self.shared_is_exception),
        )

        shared_obj = SharedObj(id=1)
        print(shared_obj)
        consumer = Process(
            target=deal_process,
            args=(
                queue,
                self.shared_deal_time_cost,
                self.shared_done_minute,
                self.shared_done_count,
                self.shared_is_exception,
                shared_obj,
            ),
        )

        started_at = time.time()
        workers = (producer, consumer)
        # Daemon flags must be set before start(); the producer starts first.
        for worker in workers:
            worker.daemon = True
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        print("Main time cost:%s." % (time.time() - started_at))


def main():
    """Run the demo once: build a ``TestObj`` and execute its pipeline."""
    TestObj().main_process()


# Entry-point guard: run the demo only when this file is executed as a script.
# With multiprocessing this guard also matters because child processes may
# re-import the main module (e.g. under the "spawn" start method).
if __name__ == '__main__':
    main()

  

标签:name,deal,python,demo,生产者,time,print,shared,minute
来源: https://www.cnblogs.com/dasheng-maritime/p/13964195.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有