ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Python编程学习-基础笔记10

2022-08-02 18:02:26  阅读:140  来源: 互联网

标签:__ 10 name Python 编程 线程 sleep print import


十二、多任务

12.1 进程

'''
并发和并行:
    并发:当有多个线程在操作时,系统只有一个CPU,并不能真正实现多任务同时进行,它是把CPU划分成若干个时间段,任务顺序执行
    并行:当系统有2个及以上CPU,则线程操作有可能并发,当一个CPU 执行一个线程,另一个CPU执行另一个线程,互不抢占资源

实现多任务的方式:
    多进程模式 : 进程是程序的实体,对操作系统来说,一个任务就是一个进程。如打开2个记事本就启动2个进程
        优点:稳定性高,一个进程崩溃了,不会影响其他进程
        缺点:创建进程开销大,操作系统同时运行进程的数量有限
        进程创建: linux下使用os模块fork函数来创建,windows下使用 multiprocessing模块中的process类来创建
        from multiprocessing import Process
        process = Process(target=函数, name= 进程的名字,args=(给函数传递的参数))
        process 对象
        对象调用方法:
            process.start() 启动进程并执行任务
            process.run() 只启动了进程,没有执行任务
            terminate() 终止

    多线程模式
    协程
    进程 --> 线程 --> 协程
'''

#进程创建
import os
from multiprocessing import Process
from time import  sleep
def task1():
    while True:
        sleep(1)
        print('This is task1', os.getpid(),'----->',os.getppid())

def task2():
    while True:
        sleep(1)
        print('This is task2', os.getpid(),'----->',os.getppid())

if __name__ == '__main__':
    print(os.getpid())
    #子进程1
    p = Process(target=task1,name='任务1')
    p.start()
    print(p.name)
    #子进程2
    p1 = Process(target=task2,name='任务2')
    p1.start()
    print(p1.name)

带参数

#进程创建
import os
from multiprocessing import Process
from time import  sleep
def task1(s,name):
    while True:
        sleep(s)
        print('This is task1', os.getpid(),'----->',os.getppid(),name)

def task2(s,name):
    while True:
        sleep(s)
        print('This is task2', os.getpid(),'----->',os.getppid(),name)
number = 1
if __name__ == '__main__':
    print(os.getpid())
    p = Process(target=task1,name='任务1',args=(1,'测试任务1'))
    p.start()
    print(p.name)
    p1 = Process(target=task2,name='任务2',args=(1,'测试任务2'))
    p1.start()
    print(p1.name)
    while True:
        number += 1
        sleep(0.2)
        if number == 100:
            p.terminate()
            p1.terminate()
            break
        else:
            print(number)

    print('************')

12.2 多进程对全局变量的访问

'''
多进程对全局变量的访问:在每一个全局变量里都放一个m变量,保证每个进程访问变量互不干扰
每个进程都独立获得一份全局变量,互不影响各自的修改
'''

#进程创建
import os
from multiprocessing import Process
from time import  sleep

m= 1 #不可变类型
list1= [] #可变类型
def task1(s,name):
    global m
    while True:
        m+=1
        list1.append(str(m)+'task1')
        sleep(s)
        print('This is task1---->',list1) #This is task1----> ['2task1', '3task1', '4task1', '5task1', '6task1', '7task1']

def task2(s,name):
    global m
    while True:
        m+=1
        list1.append(str(m)+'task2') #This is task2----> ['2task2', '3task2', '4task2', '5task2', '6task2', '7task2']
        sleep(s)
        print('This is task2---->',list1)
number = 1
if __name__ == '__main__':
    print(os.getpid())
    p = Process(target=task1,name='任务1',args=(1,'测试任务1'))
    p.start()
    p1 = Process(target=task2,name='任务2',args=(1,'测试任务2'))
    p1.start()
    while True:
        m += 1
        list1.append((str(m)+'main'))
        sleep(0.5)
        print('This is main:---->', list1) #This is main:----> ['2main', '3main', '4main', '5main', '6main', '7main', '8main', '9main', '10main', '11main', '12main', '13main', '14main']

12.3 自定义进程

#进程:自定义
import os
from multiprocessing import Process
from time import  sleep

class MyProcess(Process):
    def __init__(self,name):
        super(MyProcess,self).__init__()
        self.name = name
    #重新run方法
    def run(self):
        n = 1
        while True:
            print(f'{self.name}---> 自定义进程,n:{n}')
            n += 1

if __name__ == '__main__':
    p = MyProcess('测试任务1')
    p.start() # 1, 先开一个新的进程,2,调用run()方法
    p1 = MyProcess('测试任务2')
    p1.start()

12.4 进程池的非阻塞/阻塞模式

非阻塞

'''
使用multiprocessing模块提供的pool方法:
    初始化pool时可以指定一个最大进程数,当有新的请求提交到pool中时,如果池子还没满,就会创建一个新的进程来执行该请求
    如果pool中进程数已达到指定的最大值,那么该请求就会等待,知道pool中有进程结束,才会创建新的进程来执行

非阻塞式: 全部添加到队列,立刻返回,并没有等待其它进程执行完毕才会结束。但是回调函数,等待任务完成后才去调用
阻塞式:
'''
import os
import time
from multiprocessing import Pool
from random import random
from time import  sleep

#非阻塞
def task(task_name):
    print('开始做任务:',task_name)
    start = time.time()
    #使用sleep
    time.sleep(random()*2) #2s以内
    end = time.time()
    # print('完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid())
    return '完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid()
container = []
def callback_func(n):
    container.append(n)

if __name__ == '__main__':
    pool = Pool(5)
    tasks = ['唱歌','跳舞','听音乐','画画','练字','吹唢呐','听戏','游泳','跑步']
    for k in tasks:
        pool.apply_async(task,args=(k,),callback=callback_func)
    pool.close() # 添加任务结束
    pool.join() #任务结束前,阻止回到主进程
    for c in container:
        print(c)

    print('over!!')

阻塞

'''
阻塞式:顺序执行,添加一个任务,执行一个任务,上一个任务不执行完成,下一个任务不会开始

进程池:
    pool = Pool(max) 创建进程池对象
    pool.apply()    阻塞式
    pool.apply_async() 非阻塞式
    pool.close() 添加任务结束
    pool.join() 让主进程让步,插队
'''
import os
import time
from multiprocessing import Pool
from random import random

#非阻塞
def task(task_name):
    print('开始做任务:',task_name)
    start = time.time()
    #使用sleep
    time.sleep(random()*2) #2s以内
    end = time.time()
    print('完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid())
    # return '完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid()
# container = []
# def callback_func(n):
#     container.append(n)

if __name__ == '__main__':
    pool = Pool(5)
    tasks = ['唱歌','跳舞','听音乐','画画','练字','吹唢呐','听戏','游泳','跑步']
    for k in tasks:
        pool.apply(task,args=(k,)) #阻塞式
    pool.close() # 添加任务结束
    pool.join() #任务结束前,阻止回到主进程

    print('over!!!!')

队列

from multiprocessing import Queue
q = Queue(5) #设置队列长度为5

q.put('A')
q.put('B')
q.put('C')
q.put('D',timeout=2) #加timeout,超时不等
q.put('E') #put()如果队列满了,就只能等待,除非有空地,则添加成功
print(q.qsize())
if not q.full(): #q.full()判断队列是否已满,q.empty()判断队列是否是空的
    q.put('F')
else:
    print('Queue is full')
#获取队列的值
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
# print(q.get(timeout=2))
#不等待,这样就不阻塞
q.put_nowait('G')
print(q.get_nowait())

12.5 进程间的通讯

'''
进程间通讯:
    通过使用公共对象q = Queue(5),保证任务在同一个队列里面

'''
from multiprocessing import Process,Queue
from time import sleep

#将队列q作为对象传递
def download(q):
    images = ['girl.jpg','boy.jpg','lady.jpg']
    for image in images:
        print("Downloading:",image)
        sleep(0.5)
        q.put(image)

def getfile(q):
    while True:
        try:
            file = q.get(timeout=2) #超时2秒无内容则不去取
            print(f'{file} save success!!')
        except:
            print('All file downloaded successfully!!')
            break


if __name__ == '__main__':
    q = Queue(5)
    #q最为参数
    p1 = Process(target=download,args=(q,))
    p2 = Process(target=getfile,args=(q,))
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    print('验证任务完成,回到主线程!')

12.6 多线程

线程状态:

'''
多线程:
    进程 :是一个正在执行中的程序,每一个进程执行都有一个执行顺序,该顺序是一个执行路径,或者叫一个控制单元;
    线程:就是进程中的一个独立控制单元,线程在控制着进程的执行。一个进程中至少有一个进程。
    多线程:一个进程中不只有一个线程。
如何创建和使用线程:
    调用threading函数来创建线程对象
    线程的状态:新建 --> 就绪 --> 运行 --> 结束
                               |
                               阻塞
'''

import threading

from time import sleep
#进程 Process
#线程 Thread

def download(n):
    images = ['girl.jpg','boy.jpg','lady.jpg']
    for image in images:
        print("Downloading:",image)
        sleep(n)
        print(f'{image}保存成功!!')

def listenMusic(n):
    musics = ['My heart will go on','My love','吻别','梦回唐朝']
    for music in musics:
        sleep(n)
        print(f'正在听{music}!!')

if __name__ == '__main__':
    #创建线程对象
    t = threading.Thread(target=download,name='aa',args=(1,))
    t.start()
    t1 = threading.Thread(target=listenMusic,name='bb',args=(1,))
    t1.start()
    # n = 1
    # while True:
    #     print(n)
    #     n+=1
    #     sleep(1.5)

12.6.1 多线程共享全局变量

'''
线程可以共享全局变量
GIL 全局解释器锁
    python底层只要用线程就默认加锁 GIL
    线程异步 --> 共享数据会导致数据不安全
    线程同步 --> 导致的是效率低,但数据安全
    运算数据量大时,会自动释放锁GIL
线程:耗时操作时用,爬虫,IO
进程:计算密集型
'''
import threading

from time import sleep
#进程 Process
#线程 Thread
money = 1000
def run1():
    global money
    for i in range(100):
        sleep(0.1)
        money -= 1
# def run2():
#     global money
#     for i in range(100):
#         money -= 1

if __name__ == '__main__':
    #创建线程对象 4个线程
    t = threading.Thread(target=run1,name='th1')
    t1 = threading.Thread(target=run1,name='th2')
    t2 = threading.Thread(target=run1,name='th3')
    t3 = threading.Thread(target=run1,name='th4')
    t.start()
    t1.start()
    t2.start()
    t3.start()
    #阻塞
    t.join()
    t1.join()
    t2.join()
    t3.join()
    print('money:',money)  #600, 共享同一个全局变量

当计算大数据量时,自动释放锁

import threading

from time import sleep
#进程 Process
#线程 Thread
n = 0
def task1():
    global n
    for i in range(1000000):
        n += 1
    print('------->task1:',n)
    print('------------')
    sleep(1)

def task2():
    global n
    for i in range(1000000):
        n += 1
    print('------->task2:',n)
    print('------------')
    sleep(1)
if __name__ == '__main__':
    #创建线程对象 4个线程
    t = threading.Thread(target=task1,name='th1')
    t1 = threading.Thread(target=task2,name='th3')
    t.start()
    t1.start()

    #阻塞
    t.join()
    t1.join()
    print('over')
#运行结果
------->task1: 1000000
------------
------->task2: 1647975 # 释放了锁
------------
over

12.7 多线程同步

'''
共享数据存在不安全性:
如果多个线程同时对数据进行修改,则可能出现不可预料的结果,为了保证数据的正确性,需要的对多个线程进行同步
同步:一个接一个的来做任务,一个做完,另一个才能进来,效率降低
使用Thread对象的lock和Rlock可以实现简单的线程同步,这2个对象都有acquire和release 方法,对应那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release之间。
多线程的优势在于可以同时运行多个任务,但是当线程需要共享数据时,可能存在数据不同步的问题,为了避免这种情况,引入了锁的概念。
lock = threading.Lock()
    lock.acquire() #请求得到锁
    ........
    lock.release() #释放锁
只要不释放锁,其他线程都无法进入运行状态
'''
import threading
from time import sleep
import random
#加锁
lock = threading.Lock()

list1 = [0] *10  #列表里有10个0

def task1():
    #获取线程锁,如果已经上锁,则等待锁的释放
    lock.acquire() #阻塞
    for i in range(len(list1)):
        list1[i] = 1
        sleep(0.5)
    lock.release() #释放

def task2():
    #获取线程锁,如果已经上锁,则等待锁的释放
    lock.acquire() #阻塞
    for i in range(len(list1)):
        print('--->',list1[i])
        sleep(0.5)
    lock.release() #释放

if __name__ == '__main__':
    t1 = threading.Thread(target=task1)
    t2 = threading.Thread(target=task2)
    t2.start()
    t1.start()
    t2.join()
    t1.join()
    print(list1)

12.8 死锁

'''
死锁:
开发过程中使用线程,在线程共享多个资源时,如果2个线程分别占有一部分资源,并且同时等待对方的资源,就会造成死锁
尽管死锁很少发生,但一旦发生就会造成应用的停止响应,程序不做任何事情
避免出现死锁的方法:资源分配不当导致死锁
    1,重构代码
    2,acquire() 加timeout,利用时间差来释放锁
'''
from threading import Thread,Lock
from time import sleep

lockA = Lock()
lockB = Lock()

class MyThread1(Thread):
    def run(self):#start
        if lockA.acquire():#如果可以获取到锁,则返回True
            print(self.name + '获取到了A锁')
            sleep(0.2)
            if lockB.acquire(timeout=4): #阻塞
                print(self.name + '又试图获取到了B锁,原来还有A锁的存在') #因为有sleep,A锁已经被线程1占有
                lockB.release()
            lockA.release()

class MyThread2(Thread):
    def run(self):#start
        if lockB.acquire():#如果可以获取到锁,则返回True
            print(self.name + '获取到了B锁')
            sleep(0.2)
            if lockA.acquire(timeout=2):#阻塞
                print(self.name + '又试图获取到了A锁,原来还有B锁的存在') #因为有sleep,A锁已经被线程1占有
                lockA.release()
            lockB.release()

if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()

    t1.start()
    t2.start()
#运行结果
Thread-1获取到了A锁
Thread-2获取到了B锁
Thread-1又试图获取到了B锁,原来还有A锁的存在

12.9 生产者与消费者

'''
生产者与消费者:两个线程之间的通讯
python的queue模块中提供了同步的,线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后进先出)队列LifoQueue,和
优先级队列PriorityQueue,这些都实现了锁原理(可以理解为原子操作,要么不做,要么做完)。能够在多线程中直接使用。
可以使用队列来实现线程间的同步。
'''

import threading
import queue
import time
import random
def produce(q):
    i=0
    while i<10:
        num = random.randint(1,100)
        q.put('生产者生产数据:%d'%num)
        print('生产者生产数据:%d'%num)
        time.sleep(1)
        i+=1
    q.put(None)
    #完成任务
    q.task_done()

def consume(q):
    while True:
        item = q.get()
        arr.append(item)
        if item is None:
            break
        print(f'消费者获取到:{arr}')
        time.sleep(4)

    #完成任务
    q.task_done()

if __name__ == '__main__':
    q = queue.Queue(10)
    arr = []

    #创建生产者
    th = threading.Thread(target=produce,args=(q,))
    th.start()

    #创建消费者
    tc = threading.Thread(target=consume,args=(q,))
    tc.start()

    th.join()
    tc.join()
    print('over!!')

12.10 协程

使用生成器来完成协程任务

'''
协程:微线程
进程 --》线程 --》 协程
耗时操作--> 用协程
    网络请求
    网络下载(下载)
    IO 操作:文件读写
    阻塞动作
生成器:yield
'''
import time

def task1():
    for i in range(3):
        print('A'+ str(i))
        yield
        time.sleep(0.1)
def task2():
    for i in range(3):
        print('B'+ str(i))
        yield
        time.sleep(0.1)

if __name__ == '__main__':
    g1 = task1()
    g2 = task1()
    while True:
        try:
            next(g1)
            next(g2)
        except:
            break

使用greenlet来完成协程任务

'''
greenlet : 完成协程任务
'''
import time
from greenlet import greenlet

def task1():
    for i in range(3):
        print('A'+ str(i))
        g2.switch()
        time.sleep(0.1)
def task2():
    for i in range(3):
        print('B'+ str(i))
        g3.switch()
        time.sleep(0.1)

def task3():
    for i in range(3):
        print('C'+ str(i))
        g1.switch()
        time.sleep(0.1)
if __name__ == '__main__':
    g1 = greenlet(task1)
    g2 = greenlet(task2)
    g3 = greenlet(task3)

    g1.switch()

使用gevent来完成协程任务

'''
gevent:
    greenlet已经实现了协程任务,但是需要人工切换,不是很智能
    gevent 完美解决greenlet的不足
原理:
    当一个greenlet遇到IO(input output,比如网络,文件操作等)操作时,就自动切换到其他greenlet,等到IO完成,再切回来继续执行
    由于IO 操作非常耗时,经常使程序处于等待状态,有了gevent就可以自动完成切换协程,保证了总有greenlet在运行,而不是等待IO

'''
import time
from greenlet import greenlet
import gevent
from gevent import monkey
#猴子补丁,替换了sleep的动作,不然就是单个函数顺序执行
monkey.patch_all()

def task1():
    for i in range(3):
        print('A'+ str(i))
        time.sleep(0.1)
def task2():
    for i in range(3):
        print('B'+ str(i))
        time.sleep(0.1)

def task3():
    for i in range(3):
        print('C'+ str(i))
        time.sleep(0.1)
if __name__ == '__main__':
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)
    g3 = gevent.spawn(task3)

    g1.join()
    g2.join()
    g2.join()

    print('---------')

案例

import time,requests
import gevent
from gevent import monkey
import urllib.request
#猴子补丁,检测到耗时操作则切换
monkey.patch_all()

def download(url):
    # response = requests.get(url)
    # content = response.text
    response = urllib.request.urlopen(url)
    content = response.read()
    print(f'下载了{url}的数据,长度{len(content)}')


if __name__ == '__main__':
    urls = ['http://www.baidu.com','http://mail.126.com','http://cn.bing.com/?mkt=zh-CN',]
    g1 = gevent.spawn(download,urls[0])
    g2 = gevent.spawn(download,urls[1])
    g3 = gevent.spawn(download,urls[2])

    g1.join()
    g2.join()
    g3.join()

    print('---------')

标签:__,10,name,Python,编程,线程,sleep,print,import
来源: https://www.cnblogs.com/orange2016/p/16544673.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有