Python study notes: multiprocessing examples

2019-11-12 10:02:07



Python processes:

Modules for working with processes: os.fork(), subprocess, multiprocessing.

Inter-process communication (IPC) mechanisms:
  • files
  • pipes
  • sockets
  • signals
  • semaphores
  • shared memory

 

 

 


① Creating a process with fork() on Linux

fork() is a built-in function that exists only on Unix-like systems (such as Linux), and it is unusual: an ordinary function returns once per call, but fork() is called once and returns twice. The operating system copies the current process (the parent) into a new process (the child), and the call then returns in both. In the child the return value is always 0; in the parent it is the child's PID. The reason for this design is that a parent can fork() many children and needs to record each child's ID, while a child can get its parent's ID with os.getppid() and its own with os.getpid().

"""
pid=os.fork()
    1.只用在Unix系统中有效,Windows系统中无效
    2.fork函数调用一次,返回两次:
    在父进程中返回值为子进程id,在子进程中返回值为0
"""
 
import os
 
pid=os.fork()  #生成了一个子进程,出现了2个进程同时开始向下执行:
 
if pid==0:
    print("执行子进程,子进程pid={pid},父进程ppid={ppid}".\
format(pid=os.getpid(),ppid=os.getppid()))
else:
    print("执行父进程,子进程pid={pid},父进程ppid={ppid}".\
format(pid=pid,ppid=os.getpid()))

Analysis:

os.fork() creates a child process, so from that point there are two processes: the original main (parent) process and the child. By definition, the parent receives the child's PID as the return value, while the child receives 0.

In the parent, pid is the child's PID (e.g. pid=19293), so pid==0 is false and the else branch runs:

if pid==0:
    print("in the child process: pid={pid}, parent ppid={ppid}".\
format(pid=os.getpid(),ppid=os.getppid()))
else:
    print("in the parent process: child pid={pid}, own pid={ppid}".\
format(pid=pid,ppid=os.getpid()))

In the child, pid == 0, so the if branch runs.

Taken together: the two processes execute different branches of the same code, so the if block runs once (in the child) and the else block runs once (in the parent).
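Because the parent records each child's PID, it can also reap its children explicitly with os.waitpid(). A minimal, Unix-only sketch (the exit codes are arbitrary illustration values, not from the original notes):

```python
import os

pids = []
for i in range(3):
    pid = os.fork()
    if pid == 0:
        # child: exit immediately, using its index as the exit status
        os._exit(i)
    pids.append(pid)  # parent: remember each child's PID

# parent: wait for every child and collect its exit status
codes = []
for pid in pids:
    _, status = os.waitpid(pid, 0)
    codes.append(os.WEXITSTATUS(status))
print(sorted(codes))
```

Reaping children this way also prevents them from lingering as zombie processes.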

 

 

② Creating processes portably with multiprocessing

import multiprocessing
def do(n) :  # the task function
    # get the name of the current process
    name = multiprocessing.current_process().name
    print(name,'starting')
    print("worker ", n)
    return
 
 
if __name__ == '__main__':
    numList = []
    # this loop runs the 5 child processes one after another (serially)
    for i in range(5) :
        p = multiprocessing.Process(target=do, args=(i,))  # create a process; the loop creates 5 of them; args takes a tuple
        numList.append(p)  # keep the process object in a list
        p.start()     # start the process; after start() it is in the ready state
                      # (it still has to wait for the CPU to schedule it)
        p.join()      # join() blocks the main process until this child finishes,
                      # so the children run serially
        print("Process end.")
 
 
 
'''
# To run the 5 children in parallel instead, move join out of the creation loop:
    for i in numList:
        i.join()  # wait for process 1 to finish, then loop again for process 2,
                  # ... and so on until process 5 finishes and the loop exits
'''
    print(numList)

# ----> with join inside the loop, only one child runs alongside the main process at a time

Execution result:

E:\>py -3 a.py
Process-1 starting
worker  0
Process end.
Process-2 starting
worker  1
Process end.
Process-3 starting
worker  2
Process end.
Process-4 starting
worker  3
Process end.
Process-5 starting
worker  4
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

# Parallel version: join() makes the main process wait until the children have finished
import multiprocessing
def do(n) :  # the task function
    # get the name of the current process
    name = multiprocessing.current_process().name
    print(name,"starting")
    print("worker ", n)
    return
 
 
if __name__ == '__main__':
    numList = []
    for i in range(5) :
        p = multiprocessing.Process(target=do, args=(i,))  # create a process; args takes a tuple
        numList.append(p)  # keep the process object in a list
        p.start()     # start the process (ready state until the CPU schedules it)
        #p.join()     # joining here would make the children run serially again
        print("Process end.")
 
 
    # to run the 5 children in parallel, join them after they have all been started:
    for i in numList:
        i.join()  # wait for process 1 to finish, then loop again for process 2,
                  # ... and so on until process 5 finishes and the loop exits
# join() makes the main process wait for the children, so the final print runs last
 
    print(numList)

Execution result:

E:\>py -3 a.py
Process end.
Process end.
Process end.
Process end.
Process end.
Process-2 starting
worker  1
Process-1 starting
worker  0
Process-3 starting
worker  2
Process-4 starting
worker  3
Process-5 starting
worker  4
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

# With join(), print(numList) runs only after the children finish.
# Without join(), the main process can exit while the children are still running;
# to make the children exit together with the main process, mark them as daemons.
import multiprocessing
def do(n) :  # the task function
    # get the name of the current process
    name = multiprocessing.current_process().name
    print(name,"starting")
    print("worker ", n)
    return
 
 
if __name__ == '__main__':
    numList = []
    for i in range(5) :
        p = multiprocessing.Process(target=do, args=(i,))  # create a process; args takes a tuple
        numList.append(p)  # keep the process object in a list
        #p.start()    # start the process (ready state until the CPU schedules it)
        #p.join()     # join() would block the main process until this child finishes
        print("Process end.")
 
 
    for i in numList:
        i.start()
 
 
    print(numList)

Execution result:

E:\>py -3 a.py
Process end.
Process end.
Process end.
Process end.
Process end.
# the main process prints numList first and then exits
[<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>]
# the child processes are still running
Process-2 starting
worker  1
Process-3 starting
worker  2
Process-1 starting
worker  0
Process-4 starting
worker  3
Process-5 starting
worker  4

# Without join(), the children keep running after the main process has finished.


③ os.fork() and multiprocessing used together (Process is cross-platform)
# Linux version (on Linux the if __name__ == "__main__": guard is not required, because fork does not re-import the module):
from multiprocessing import Process
import os
import time
 
 
def sleeper(name, seconds):
    print("Process ID# %s" % (os.getpid()))
    print("Parent Process ID# %s" % (os.getppid()))
# every process has its own PID and its parent's PID;
# os.getppid() was Unix-only in Python 2, but works on Windows as well since Python 3.2
    print("%s will sleep for %s seconds" % (name, seconds))
    time.sleep(seconds)
 
 
# if __name__ == "__main__":
child_proc = Process(target = sleeper, args = ('bob', 5))
child_proc.start()
print("in parent process after child process start")
print("parent process about to join child process")
child_proc.join()
print("in parent process after child process join" )
print("the parent's parent process: %s" % (os.getppid()))
# Windows version:
from multiprocessing import Process  
import os  
import time  
 
 
def sleeper(name, seconds):  
    print("Process ID# %s" % (os.getpid()))  
    print("Parent Process ID# %s" % (os.getppid()))
# os.getppid() works on Windows as well since Python 3.2
    print("%s will sleep for %s seconds" % (name, seconds))  
    time.sleep(seconds)
 
 
if __name__ == "__main__":  
    child_proc = Process(target = sleeper, args = ('bob', 5))  
    child_proc.start()
    print("in parent process after child process start")  
    print("parent process about to join child process")
    child_proc.join()
    print("in parent process after child process join" )
    print("the parent's parent process: %s" % (os.getppid()))
Execution result:

E:\>py -3 a.py
in parent process after child process start
parent process about to join child process
Process ID# 39956
Parent Process ID# 40176
bob will sleep for 5 seconds
in parent process after child process join
the parent's parent process: 43868


④ A multi-process template program
#coding=utf-8
import multiprocessing
import urllib.request
import time
 
 
def func1(url) :
    response = urllib.request.urlopen(url)
    html = response.read()
    print(html[0:20])
    time.sleep(1)
 
 
def func2(url) :
    response = urllib.request.urlopen(url)
    html = response.read()
    print(html[0:20])
    time.sleep(1)
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=func1,args=("http://www.sogou.com",),name="gloryroad1")
    p2 = multiprocessing.Process(target=func2,args=("http://www.baidu.com",),name="gloryroad2")
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    time.sleep(1)
    print("done!")
Execution result:

E:\>py -3 a.py
b'<!DOCTYPE html>\n<!--'
b'<!DOCTYPE html><html'
done!


⑤ Comparing multi-process and single-process performance with Pool.map
#coding: utf-8
import multiprocessing
import time
def m1(x):
    time.sleep(0.01)
    return x * x
 
 
if __name__ == '__main__':
    # multi-process version
    # a process pool; multiprocessing.cpu_count() returns the number of CPU cores
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    i_list = range(1000)
    time1=time.time()
    result = pool.map(m1, i_list)  # apply m1 to each of the 1000 inputs across the pool
    time2=time.time()
    print('time elapse1:',time2-time1)
    print(len(result))
 
    # single-process version
    time1=time.time()
    result= list(map(m1, i_list))
    time2=time.time()
    print('time elapse2:',time2-time1)
    print(len(result))
Execution result:

E:\>py -3 a.py
time elapse1: 2.8980000019073486
1000
time elapse2: 10.0
1000
Process pools

If you need many processes, creating them one by one with the Process class is itself expensive; that is what process pools are for. The Pool class keeps a fixed number of worker processes available to callers. When a new task is submitted and the pool is not full, a worker is created to run it; when the pool is full, the task is told to wait until a worker finishes, and only then is it run.

Methods of the Pool class:

✓ apply()
  Signature: apply(func[, args=()[, kwds={}]])
  Passes through arbitrary arguments and blocks the calling process until func
  returns (not recommended; prefer apply_async()).

✓ map()
  Signature: map(func, iterable[, chunksize=None])
  Behaves like the built-in map(), but distributes the calls across the pool; it
  blocks until all the results are back. Note that although the second argument is
  an iterable, it is consumed up front before the work is handed to the workers.

✓ close()
  Closes the pool so that it accepts no new tasks.

✓ terminate()
  Stops the worker processes immediately, abandoning unfinished tasks.


# A simple process pool, submitting work asynchronously with apply_async
#encoding=utf-8
from multiprocessing import Pool
 
 
def f(x):
    return x * x
 
 
if __name__ == '__main__':
    pool = Pool(processes = 4)      # start 4 worker processes
    result = pool.apply_async(f, [10])  # evaluate "f(10)" asynchronously
    print(result.get(timeout = 1))
    print(pool.map(f, range(10)))   # prints "[0, 1, 4, ..., 81]"; map distributes the single-argument calls across the pool
Execution result:

E:\>py -3 a.py
100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Passing multiple arguments to the mapped function:
#encoding=utf-8
from multiprocessing import Pool

def f(pair):
    # each element of the iterable is a tuple; unpack the two "arguments" by index
    return pair[0] * pair[1]

if __name__ == '__main__':
    pool = Pool(processes = 4)      # start 4 worker processes
    print(pool.map(f, [(0,0),(1,1),(2,2),(3,3)]))   # prints "[0, 1, 4, 9]"
# packing the parameters into tuples is how you pass "multiple" arguments
# to the function called by map

Execution result:

E:\>py -3 a.py
[0, 1, 4, 9]


Ways to synchronize processes:
  • message queues: Queue and JoinableQueue
  • locking: Lock, Semaphore
  • signaling: Event
  • pipes: Pipe
  • condition variables: Condition
Synchronizing processes with a queue
# the child process puts data and the main process takes it, so the two processes cooperate
#encoding=utf-8
from multiprocessing import Process, Queue   # multiprocessing.Queue works across processes
import time
 
 
def offer(queue):  
  # enqueue
  time.sleep(5)
  queue.put("Hello World")  
 
 
if __name__ == '__main__':  
  # create a queue instance
  q = Queue()
  p = Process(target = offer, args = (q,))  
  p.start()  
  print(q.get()) # dequeue; blocks until the child has put something
  p.join()
  print("Done")
D:\>py -3 a.py
Hello World
Done


A JoinableQueue example

Consumer is a process class: Consumer().start() starts a new child process. Its run()
method is the body of the process: an infinite loop that takes one task at a time from
the task queue. As long as the task taken is not None it is executed; None means "shut
down", so the loop exits and the process ends. Executing a task as nexttask() invokes
the task class's __call__ method, and the result is put on the result queue.

Task: the task class; its job is to format the expression "a * b = product".

task_queue is a joinable queue: after taking out an element you must call task_done()
to mark that task as finished; otherwise queue.join() waits forever (the program hangs).

result_queue: the queue that collects the result of each executed task.
#encoding=utf-8
import multiprocessing
import time
class Consumer(multiprocessing.Process):  # this class defines a process object, inheriting from multiprocessing.Process
    # derived process
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)  # call the parent constructor
        self.task_queue = task_queue      # the task queue: the work to do
        self.result_queue = result_queue  # the result queue
 
 
    # override Process.run
    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:  # the final tasks are None, the signal to exit
                # Poison pill means shutdown
                print(('%s: Exiting' % proc_name))
                self.task_queue.task_done()   # every task taken must be matched by a task_done()
                break
            print(('%s: %s' % (proc_name, next_task)))
            answer = next_task() # __call__()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return
# Consumer().start() automatically runs the body of run().
 
class Task(object):   # the task class: __call__ formats the product expression for two numbers
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)
 
 
if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    # Start consumers
    num_consumers = multiprocessing.cpu_count()
    print(('Creating %d consumers' % num_consumers))
    # create as many consumer processes as there are CPU cores
    consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]
    # start each consumer process in turn
    for w in consumers:
        w.start()
 
 
# Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))
 
 
    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)
    # Wait for all of the tasks to finish
    tasks.join()
 
 
    # Start printing results
    while num_jobs:
        result = results.get()
        print ('Result: %s' %result)
        num_jobs -= 1
D:\>py -3 a.py
Creating 4 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-1: 4 * 4
Consumer-2: 5 * 5
Consumer-3: 6 * 6
Consumer-4: 7 * 7
Consumer-1: 8 * 8
Consumer-2: 9 * 9
Consumer-3: Exiting
Consumer-4: Exiting
Consumer-1: Exiting
Consumer-2: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 7 * 7 = 49
Result: 8 * 8 = 64
Result: 9 * 9 = 81

A quick look at how __call__, __str__ and __repr__ are invoked:

>>> class P:
...    def __call__(self):
...        print("hello")
...    def __str__(self):
...        return "hi"
...    def __repr__(self):
...        return "ooo!"
...
>>>
>>> p=P()
>>> p()        # invokes __call__
hello
>>> print(p)   # invokes __str__
hi
>>> repr(p)    # invokes __repr__
'ooo!'
>>> p          # invokes __repr__
ooo!
>>>


Step-by-step breakdown:

1. Starting the process runs its run() method, so the Consumer class is a process
   object: instantiate it and you can use it as a process.
#encoding=utf-8
import multiprocessing
import time
class Consumer(multiprocessing.Process):
    # derived process
    def __init__(self):
        multiprocessing.Process.__init__(self)  # call the parent constructor
 
 
    # override Process.run
    def run(self):
        print(1)
        return
 
 
if __name__ == "__main__":
    Consumer().start()  # automatically runs the body of run()
Execution result:

D:\>py -3 a.py
1

2. The Task class takes two numbers and prints the expression a * b = product.
#encoding=utf-8
import multiprocessing
import time
class Consumer(multiprocessing.Process):  # a process class
    # derived process
    def __init__(self):
        multiprocessing.Process.__init__(self)  # call the parent constructor
 
 
    # override Process.run
    def run(self):
        print(1)
        return
 
 
class Task(object): # the task object
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)
 
 
 
if __name__ == "__main__":
    #Consumer().start()  # would automatically run the body of run()
    nexttask = Task(3,4)
    print(nexttask())
Execution result:

D:\>py -3 a.py
3 * 4 = 12

3. Add a task queue; at this stage everything still runs in a single consumer process.
#encoding=utf-8
import multiprocessing
import time
class Consumer(multiprocessing.Process):  # a process class
    # derived process
    def __init__(self,task_queue):
        multiprocessing.Process.__init__(self)  # call the parent constructor
        self.task_queue = task_queue
 
 
    # override Process.run
    def run(self):
        while 1:
            nexttask = self.task_queue.get()
            if nexttask is None:
                self.task_queue.task_done()
                return
            print(nexttask())
            self.task_queue.task_done()
        return
 
 
class Task(object): # the task object
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)
 
 
 
 
if __name__ == "__main__":
    tasks = multiprocessing.JoinableQueue()
    for i in range(5):
        tasks.put(Task(i,i+1))
    tasks.put(None)
    Consumer(tasks).start()  # automatically runs the body of run()
    tasks.join()
Execution result:

D:\>py -3 a.py
0 * 1 = 0
1 * 2 = 2
2 * 3 = 6
3 * 4 = 12
4 * 5 = 20

How a task object gets executed:
1. Task(i, i+1) creates a task instance.
2. tasks.put(Task(i, i+1)) puts the instance on the task queue.
3. The process object takes a task off the queue: nexttask = self.task_queue.get()
4. The task is executed: nexttask() invokes Task.__call__
5. Completion is reported: self.task_queue.task_done()
6. If any one of the tasks skips its self.task_queue.task_done() call,
7. tasks.join() hangs the main program, because it waits forever for the missing
   task_done().

4. Run multiple consumer processes (the multiprocessing queues are shared across
   processes, so all the results end up in the same result queue).
#encoding=utf-8
import multiprocessing
import time
class Consumer(multiprocessing.Process):  # a process class
    # derived process
    def __init__(self,task_queue,result_queue):
        multiprocessing.Process.__init__(self)  # call the parent constructor
        self.task_queue = task_queue
        self.result_queue = result_queue
    # override Process.run
    def run(self):
        while 1:
            nexttask = self.task_queue.get()
            if nexttask is None:
                self.task_queue.task_done()
                return
            self.result_queue.put(nexttask())
            self.task_queue.task_done()
        return
 
 
class Task(object): # the task object
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)
 
 
 
 
if __name__ == "__main__":
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    for i in range(10):
        tasks.put(Task(i,i+1))
    # get the number of CPU cores on this machine (4 here)
    num_consumers = multiprocessing.cpu_count()
    consumers = [ Consumer(tasks,results) for i in range(num_consumers) ]  # a list comprehension builds one consumer per core, so several consumers can work on tasks
    # start every consumer process
    for i in consumers:
        i.start()
 
    # put one None per consumer so that each one shuts down
    for i in consumers:
        tasks.put(None)
   
    tasks.join()
    # finally, print everything in the result queue
    num_jobs=10
    while num_jobs:
        result = results.get()  # take one result off the result queue
        print ('Result: %s' %result)
        num_jobs -= 1
Execution result:

D:\>py -3 a.py
Result: 0 * 1 = 0
Result: 1 * 2 = 2
Result: 2 * 3 = 6
Result: 3 * 4 = 12
Result: 4 * 5 = 20
Result: 5 * 6 = 30
Result: 6 * 7 = 42
Result: 7 * 8 = 56
Result: 8 * 9 = 72
Result: 9 * 10 = 90


Locks

When state is shared between threads or processes, lock to stay safe; when nothing is
shared (a single thread or process, or coroutines), skip the lock and gain performance.

How a lock serializes access:
Process 1: acquires the lock, reads a=1, computes, writes a=2, releases the lock.
Process 2: sees the lock is held and waits; once it is released, it acquires the lock,
reads a=2, adds 1 to get a=3, and releases the lock.

# Synchronizing processes with a lock turns concurrent execution into serial execution
from multiprocessing import Process, Lock  
 
 
def l(lock, num):  
  lock.acquire() # acquire the lock
  print("Hello Num: %s" % (num))  
  lock.release() # release the lock
 
 
if __name__ == '__main__':  
    lock = Lock()  # create one shared lock instance
    for num in range(20):  
      Process(target = l, args = (lock, num)).start()
 
# Without the lock: the processes run concurrently
from multiprocessing import Process, Lock  
import time
def l(lock, num):  
  #lock.acquire() # acquire the lock
  time.sleep(1)
  print("Hello Num: %s" % (num))  
  #lock.release() # release the lock
 
 
if __name__ == '__main__':  
    lock = Lock()  # create one shared lock instance
    for num in range(20):  
      Process(target = l, args = (lock, num)).start()

 

    # A lock with several slots: Semaphore
#encoding=utf-8
import multiprocessing
import time
def worker(s, i):
  s.acquire()
  print(multiprocessing.current_process().name + " acquire")
  time.sleep(i)
  print(multiprocessing.current_process().name + " release")
  s.release()
 
 
if __name__ == "__main__":
  # allow at most 3 processes to access the shared resource at once
  s = multiprocessing.Semaphore(3)
  for i in range(5):
    p = multiprocessing.Process(target = worker, args = (s, i * 2))
    p.start()
Execution result:

D:\>py -3 a.py
Process-1 acquire
Process-1 release
Process-2 acquire
Process-4 acquire
Process-5 acquire
Process-2 release
Process-3 acquire
Process-4 release
Process-3 release
Process-5 release


Signaling between processes with Event
#encoding=utf-8
import multiprocessing
import time
 
 
def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait() # block until the event is set; if it never is, this waits forever
    print('wait_for_event: e.is_set()->', e.is_set())
 
 
def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)  # wait at most t seconds; on timeout, continue even though the event is still unset
    print('wait_for_event_timeout: e.is_set()->', e.is_set())
    e.set()  # set the internal flag to true
 
 
if __name__ == '__main__':
    e = multiprocessing.Event()
    print("begin,e.is_set()", e.is_set())
    w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
    w1.start()
    
    # try changing the 2 to 5 and compare the output
    w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2))
    w2.start()
 
 
    print('main: waiting before calling Event.set()')
    time.sleep(3)
    # e.set()   # comment this line in or out and compare the behaviour
    print('main: event is set')
Execution result:

D:\>py -3 a.py
begin,e.is_set() False
main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
wait_for_event: e.is_set()-> True
main: event is set

 

 

Process synchronization with a pipe (Pipe)
# in the pipe pattern the two processes take turns: one line from me, one line from you
#encoding=utf-8
import multiprocessing as mp
 
def proc_1(pipe):
    pipe.send('hello')
    print('proc_1 received: %s' %pipe.recv())
    pipe.send("what is your name?")
    print('proc_1 received: %s' %pipe.recv())
 
def proc_2(pipe):
    print('proc_2 received: %s' %pipe.recv())
    pipe.send('hello, too')
    print('proc_2 received: %s' %pipe.recv())
    pipe.send("I don't tell you!")
 
if __name__ == '__main__':
  # create a pipe; mp.Pipe() returns a pair of connected Connection objects
  pipe = mp.Pipe()
  print(len(pipe))
  print(type(pipe))
  # give one end of the pipe to process 1
  p1 = mp.Process(target = proc_1, args = (pipe[0], ))
  # give the other end of the pipe to process 2
  p2 = mp.Process(target = proc_2, args = (pipe[1], ))
  p2.start()
  p1.start()
  p2.join()
  p1.join()
Execution result:

C:\Users\dell>py -3 C:\Users\dell\Desktop\练习\5\0518.py
2
<class 'tuple'>
proc_2 received: hello
proc_1 received: hello, too
proc_2 received: what is your name?
proc_1 received: I don't tell you!


Exercise: write the received messages, in order, into a char_history.txt file.
# Approach: write each message out as soon as it is received
#encoding=utf-8
import multiprocessing as mp
 
def write_message(file_path,message):
    with open(file_path,"a") as fp:
        fp.write(message+"\n")
 
 
def proc_1(pipe):
    pipe.send('hello')
    message =pipe.recv()
    print('proc_1 received: %s' %message)
    write_message("d:\\a.txt",message)
    pipe.send("what is your name?")
    message =pipe.recv()
    print('proc_1 received: %s' %message)
    write_message("d:\\a.txt",message)
 
def proc_2(pipe):
    message =pipe.recv()
    print('proc_2 received: %s' %message)
    write_message("d:\\a.txt",message)
    pipe.send('hello, too')
    message =pipe.recv()
    print('proc_2 received: %s' %message)
    write_message("d:\\a.txt",message)
    pipe.send("I don't tell you!")
 
if __name__ == '__main__':
  # create a pipe; mp.Pipe() returns a pair of connected Connection objects
  pipe = mp.Pipe()
  print(len(pipe))
  print(type(pipe))
  # give one end of the pipe to process 1
  p1 = mp.Process(target = proc_1, args = (pipe[0], ))
  # give the other end of the pipe to process 2
  p2 = mp.Process(target = proc_2, args = (pipe[1], ))
  p2.start()
  p1.start()
  p2.join()
  p1.join()

 

Process synchronization with Condition
# the producer-consumer pattern
#encoding=utf-8
import multiprocessing as mp
import threading
import time
def consumer(cond):
  with cond:
    print("consumer before wait")
    cond.wait() # wait until there is something to consume
    print("consumer after wait")
 
 
def producer(cond):
  with cond:
    print("producer before notifyAll")
    cond.notify_all() # tell the consumers they may consume
    print("producer after notifyAll")
 
 
if __name__ == '__main__':
    condition = mp.Condition()
 
    # p1 and p2 are consumers, p3 is the producer
    p1 = mp.Process(name = "p1", target = consumer, args=(condition,))
    p2 = mp.Process(name = "p2", target = consumer, args=(condition,))
    p3 = mp.Process(name = "p3", target = producer, args=(condition,))
 
 
    p1.start()
    time.sleep(2)
    p2.start()
    time.sleep(2)
    p3.start()
Execution result:

E:\>py -3 a.py
consumer before wait
consumer before wait
producer before notifyAll
producer after notifyAll
consumer after wait
consumer after wait


Exercise: add a queue so that the producer and consumers complete a real task; for
example, the producer prepares a dish and puts it on the queue, and a consumer takes
the food off the queue and eats it.
#encoding=utf-8
import multiprocessing as mp
from multiprocessing import Queue
import threading
import time
import random
def consumer(cond,q):
  for i in range(3):
    with cond:
    
      print("consumer before wait")
      cond.wait() # wait until there is something to consume
      food = q.get()
      print("consumer after wait,eat %s" %food)
 
def producer(cond,q):
  
  food_material = ["tomato","egg","lettuce","potato"]
  for i in range(6):
    with cond:
      print("producer before notifyAll")
      cond.notify_all() # tell the consumers they may consume
      q.put(random.choice(food_material)+" "+random.choice(food_material))
      q.put(random.choice(food_material)+" "+random.choice(food_material))
      print("producer after notifyAll")
 
if __name__ == '__main__':    
  condition = mp.Condition()
  q = Queue()
  p1 = mp.Process(name = "p1", target = consumer, args=(condition,q))
  p2 = mp.Process(name = "p2", target = consumer, args=(condition,q))
  p3 = mp.Process(name = "p3", target = producer, args=(condition,q))
 
  p1.start()
  time.sleep(2)
  p2.start()
  time.sleep(2)
  p3.start()
E:\>py -3 a.py
consumer before wait
consumer before wait
producer before notifyAll
producer after notifyAll
consumer after wait,eat egg egg
consumer after wait,eat egg potato
producer before notifyAll
producer after notifyAll
consumer before wait
consumer before wait
producer before notifyAll
producer after notifyAll
consumer after wait,eat egg lettuce
consumer after wait,eat potato potato
producer before notifyAll
producer after notifyAll
consumer before wait
consumer before wait
producer before notifyAll
producer after notifyAll
consumer after wait,eat tomato tomato
consumer after wait,eat potato egg
producer before notifyAll
producer after notifyAll


Sharing global variables between processes
#encoding=utf-8
from multiprocessing import Process
def f(n, a):
    n = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]
        print(a[i])
 
if __name__ == '__main__':
    num = 0 #
    arr = list(range(10))
    p = Process(target = f, args = (num, arr))
    p.start()
    p.join()
    print(num)
    print(arr[:])
 
# The fix: share num and arr through multiprocessing's Value and Array
#encoding=utf-8
from multiprocessing import Process, Value, Array
 
def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]
 
if __name__ == '__main__':
    num = Value('d', 0.0) # create a shared double, initial value 0.0
    arr = Array('i', range(10)) # create a shared integer array initialized from range(10)
    p = Process(target = f, args = (num, arr))
    p.start()
    p.join()
 
    print(num.value) # 获取共享变量num的值
    print(arr[:])
Execution result:

C:\Users\dell>py -3 C:\Users\dell\Desktop\练习\5\0518.py
0
-1
-2
-3
-4
-5
-6
-7
-8
-9
0
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# This is the first program's output: each process has its own copy of num and arr,
# so the child's changes are invisible and the parent still prints 0 and the original
# list. In the second program, Value and Array live in shared memory, so the parent
# sees the child's changes.

"""
Exercise: fetch 6 web pages from multiple processes and count the total number of
characters, using 2 child processes.
"""
#encoding=utf-8
from multiprocessing import Process,Queue,Value, Array
 
import requests
 
 
url= ["http://www.sina.com.cn","http://www.sohu.com","http://www.163.com","http://cn.bing.com","http://www.baidu.com","http://www.iciba.com"]
 
q=Queue() # a queue that works across processes
for i in url:  # enqueue the urls
    q.put(i)
 
def f(count,q):
    while not q.empty(): # while the queue is not empty
        url = q.get()
        r=requests.get(url)
        count.value +=len(r.text)  # note: += on a shared Value is not atomic; a Lock would make the total reliable
        print(url,len(r.text))
        #print(r.text)
 
 
if __name__ =="__main__":
     num = Value('i', 0) # create a shared integer, initial value 0
     p1 = Process(target = f, args = (num, q))
     p2 = Process(target = f, args = (num, q))
     p1.start()
     p2.start()
     p1.join()
     p2.join()
 
     print(num.value)

 

# A shared variable protected by a lock
#encoding=utf-8
import time
from multiprocessing import Process, Value, Lock
 
class Counter(object): # a class that wraps the shared variable
    def __init__(self, initval = 0):
        self.val = Value('i', initval)  # an instance variable holding a shared integer
        self.lock = Lock()  # a cross-process lock

    def increment(self):  # add 1 to the shared variable while holding the lock
        with self.lock:
            self.val.value += 1 # shared variable += 1
            #print("increment one time!", self.value())
            # calling self.value() here would deadlock: the lock is not reentrant

    def value(self): # read the current value of the shared variable
        with self.lock:
            # "with" invokes the lock's __enter__ and __exit__ methods
            return self.val.value
            # return the current value while holding the lock
 
def func(counter): # the task run by each process; counter is a Counter instance
    for i in range(50): # increment 50 times
        time.sleep(0.01)
        counter.increment()
 
if __name__ == '__main__':
    counter = Counter(0)  # create one Counter instance
    procs = [Process(target = func,
args = (counter,)) for i in range(10)]
    # 10 processes run the task function func; all the counting happens
    # through the same counter instance.
    # Equivalent to:
    # for i in range(10):
      # Process(target = func, args = (counter,))
    for p in procs: p.start()  # start the 10 processes
    for p in procs: p.join()   # wait for all 10 to finish
    print(counter.value())     # print the accumulated value

 

# Sharing a string variable between processes
#encoding=utf-8
from multiprocessing import Process, Manager, Value
from ctypes import c_char_p
 
def greet(shareStr):
    shareStr.value = shareStr.value + ", World!"
 
if __name__ == '__main__':
    manager = Manager()
    shareStr = manager.Value(c_char_p, "Hello")
    process = Process(target = greet, args = (shareStr,))
    process.start()
    process.join()    
    print(shareStr.value)

 

# Sharing different kinds of data structures between processes
#encoding=utf-8
from multiprocessing import Process, Manager
 
def f( shareDict, shareList ):
    shareDict[1] = '1'
    shareDict['2'] = 2
    shareDict[0.25] = None
    shareList.reverse() # reverse the list in place
 
if __name__ == '__main__':
    manager = Manager()
    shareDict = manager.dict() # create a shared dict
    shareList = manager.list( range( 10 ) ) # create a shared list
    p = Process( target = f, args = ( shareDict, shareList ) )
    p.start()
    p.join()
    print(shareDict)
    print(shareList)
A shared queue for a process pool
# using a queue across pool workers
#encoding=utf-8
from multiprocessing import Pool,Manager
def func(q):
    print("*"*10)
    q.put("12346")
 
if __name__ == "__main__":
   manager = Manager()
   q = manager.Queue() # a queue usable by pool workers (a plain multiprocessing.Queue cannot be passed into a Pool)
   pool = Pool(processes=4)
   for i in range(5):
       pool.apply_async(func,(q,))
   
   pool.close()
   pool.join()
   print(q.qsize())

 

Sharing a class instance between processes
#encoding=utf-8
import time, os
import random
from multiprocessing import Pool, Value, Lock, Manager
from multiprocessing.managers import BaseManager
 
class MyManager(BaseManager): # a manager for sharing objects across processes
  pass
 
def Manager():
    m = MyManager()  # instantiate the cross-process manager
    m.start()        # start its server process
    return m         # return the manager object
 
class Counter(object): # the shared counter class
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = Lock()
 
    def increment(self): # accumulate the shared number under the lock
        with self.lock:
            self.val.value += 1
 
    def value(self):
        with self.lock:  # read the shared number under the lock
            return self.val.value
 
# register the Counter class with the manager so its instances can be shared
MyManager.register('Counter', Counter)
 
 
def long_time_task(name,counter): # the task run by each process
    time.sleep(0.2)
    print('Run task %s (%s)...\n' % (name, os.getpid()))
    start = time.time()
    #time.sleep(random.random() * 3)
    for i in range(50):
        time.sleep(0.01)
        counter.increment() # the task: increment 50 times through the shared instance
    end = time.time()
    # report how long this task took
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))
 
if __name__ == '__main__':
    manager = Manager()
    # create a shared Counter instance, initial value 0
    counter = manager.Counter(0)
    print('Parent process %s.' % os.getpid())
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args = (str(i), counter))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
    print(counter.value())
The logic of sharing an object:
1. Define a manager class:
   class MyManager(BaseManager):  # a manager for sharing objects across processes
       pass
2. Register the class to be shared with the manager:
   MyManager.register('Counter', Counter)
3. Start the shared manager and return it:
   def Manager():
       m = MyManager()  # instantiate the cross-process manager
       m.start()        # start its server process
       return m         # return the manager object
4. Create a shared instance: counter = manager.Counter(0)
5. Use the cross-process instance from multiple processes:
   p.apply_async(long_time_task, args = (str(i), counter))


Summary:

1. The essence of a process: a running instance of a program.
   Each process has its own address space, so ordinary variables are not shared:
   a variable of one process can only be read and written inside that process.

2. Ways to create processes:
  • Process: in essence three parameters: name, target (a task function), and args
    (the function's arguments). Usage: start(), then join().
  • Pool: its parameter is the number of worker processes, typically the CPU core
    count. Ways to run work on a pool:
        pool.map(task, iterable)
        pool.apply_async(task, args)
        pool.close()
        pool.join()

3. Shared variables (5 kinds): integers (Value('i')), doubles (Value('d')),
   strings (Manager().Value(c_char_p, ...)), lists and dicts (Manager().list() /
   Manager().dict()), and class instances (registered via BaseManager).

4. Synchronization:
   Queues outside pools: multiprocessing's Queue and JoinableQueue (with task_done()).
   Queues inside pools:
       manager = Manager()
       q = manager.Queue()
   Event: wait() / set() signaling.
   Condition: wait() / notify_all() for producers and consumers.
   Pipe: two-way message channels.

5. Locks: prevent races over shared resources.


# Process logging
  •   process-->本质:3个参数:name,task(一个函数),args(函数需要的参数)
            进程使用的方式:                  start                  join  
  •   pool--》参数:进程启动的数量:cpu核数
             进程池执行的几种方式:                     pool.map:task, iterable object                      pool.apply_asyn(name,task,args)                      pool.close()                      pool.join()   3 共享变量(5种)   i   d   str   list、dict   实例   4 同步:   非进程池使用的队列:Multiprocess 的Queue,joianblequeue(task_done())   进程池使用的队列:    manager = Manager()    q = manager.Queue()    wait--e.set(信号)    wait--notify_all(生产者和消费者)condition    pipe(管道)   5 锁:   防止资源竞争。       #进程日志
#encoding=utf-8
import multiprocessing
import logging
import sys
 
def worker():
    print('I am working....')
    sys.stdout.flush()
 
if __name__ == '__main__':
    # send multiprocessing's log output to the console (stderr)
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    # set the log level
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target = worker)
    p.start()
    p.join()
C:\Users\dell>py -3 C:\Users\dell\Desktop\练习\5\0518.py
[INFO/Process-1] child process calling self.run()
I am working....
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

 

# Daemon processes: making children exit together with the main process
#encoding=utf-8
import multiprocessing
import time, logging
import sys
 
def daemon():  # the daemon process's task
  p = multiprocessing.current_process()
  print('Starting:', p.name, p.pid)
  sys.stdout.flush() # flush buffered output to the terminal
  time.sleep(2)
  print('Exiting :', p.name, p.pid)
  sys.stdout.flush()
 
def non_daemon():  # the non-daemon process's task
  p = multiprocessing.current_process()
  print('Starting:', p.name, p.pid)
  sys.stdout.flush()
  time.sleep(2)
  print('Exiting :', p.name, p.pid)
  sys.stdout.flush()
 
if __name__ == '__main__':
  # send multiprocessing's log output to the console
  multiprocessing.log_to_stderr()
  logger = multiprocessing.get_logger()
  # set the log level
  logger.setLevel(logging.DEBUG)
 
  d = multiprocessing.Process(name='daemon', target=daemon)
  d.daemon = True
  n = multiprocessing.Process(name='non-daemon', target=non_daemon)
  n.daemon = False
  d.start()
  time.sleep(1)
  n.start()
  #d.join()  # joining both processes would let them finish before the main process exits
  #n.join()
  print('d.is_alive()', d.is_alive())
  print("n.is_alive()", n.is_alive())
  print("main Process end!")

 

# The subprocess module
# capturing a child's standard output
import subprocess
# with the default shell=False, the command and its arguments must be given as a list: [command, arg1, arg2]
obj = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
obj.stdin.write(b"print(1);\n")
obj.stdin.write(b"print(2);\n")
obj.stdin.write(b"print(3);\n")
obj.stdin.write(b"print(4);\n")
obj.stdin.close()
 
cmd_out = obj.stdout.read()
obj.stdout.close()
 
cmd_error = obj.stderr.read()
obj.stderr.close()
 
print(cmd_out)
print(cmd_error)
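On Python 3.7+, a one-shot round trip like this can be written more compactly with subprocess.run, which wraps Popen and communicate() in a single call; a sketch:

```python
import subprocess
import sys

# run a child interpreter and capture its output as text
# (capture_output and text were added in Python 3.7)
proc = subprocess.run(
    [sys.executable, "-c", "print(1); print(2)"],
    capture_output=True, text=True,
)
print(proc.stdout)
```

subprocess.run is the recommended interface for simple cases; Popen remains the tool for interactive stdin/stdout conversations like the example above.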

 

# IPC example: running system commands in bulk
Open a text file that contains only IP addresses or domain names, ping each entry,
and write the results into ping.txt. The ip.txt file looks like:
www.baidu.com
www.taobao.com
123.45.5.34
127.0.0.1
import subprocess
import os
class Shell(object) :
  def runCmd(self, cmd) :
    res = subprocess.Popen(cmd, shell=True,
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # collect the child's standard output (with stderr=STDOUT the error stream is
    # merged into sout, so serr here is None)
    sout, serr = res.communicate()
    # sout: the command's output; serr: its error output; res.pid: the child's process id
    return res.returncode, sout, serr, res.pid
 
shell = Shell()
fp = open('c:\\test\\ip.txt', 'r')
ipList = fp.readlines()
fp.close()
fp = open('c:\\test\\ping.txt', 'a')
print(ipList)
for i in ipList :
  i = i.strip()
  result = shell.runCmd('ping ' + i)
  if result[0] == 0 :
    w = i + ' : 0'
    fp.write(w + '\n')
  else :
    w = i + ' : 1'
    fp.write(w + '\n')
  print(result[1].decode("gbk"))  # ping output uses the console code page (GBK on Chinese Windows)
fp.close()

 

       

Source: https://www.cnblogs.com/wenm1128/p/11835963.html
