ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

Python基础6(进程 线程 协程)

2021-02-25 10:05:22  阅读:112  来源: 互联网

标签:__ 协程 name Python 进程 线程 time print import


进程的基础

进程的基础

程序

​ 一堆静态的代码文件

进程

​ 一个正在运行的程序进程, 抽象的概念

​ 由操作系统操控调用交于CPU运行 被CPU运行

操作系统

操作系统的基础

​ 管理控制协调计算机中硬件与软件的关系

操作系统的作用

​ 将一些对硬件操作的复杂丑陋的接口, 变成简单美丽的接口

​ 多个进程抢占一个(CPU)资源时, 从操作系统会将你的执行变得合理有序

tip:

​ 阻塞: input read write sleep recv accept sendto recvfrom

操作系统的发展史

​ 最早出现的计算机: 算盘

电子类的计算机发展史:

第一代计算机(1940~1955)

​ 先连接调配各个硬件, 1.5小时, 真空管, 然后在插上程序调试. 效率低.

优点

​ 个人独享整个计算机资源

缺点

​ 硬件调试插线, 耗时

​ 所有人都是串行执行

第二代计算机(1955~1965)

优点

​ 程序员不用亲自对硬件进行插线操控, 效率提高

​ 可以进行批量处理代码

缺点

​ 程序员不能独自使用计算机

​ 你的所有程序还是串行

第三代计算机(1965~1980)

​ 集成电路, 多到程序系统

大背景

​ 集成电路: 把所用的硬件变小, 线路板.

​ 将两套不同的生产线合并成一条生产线

技术上的更新(多道技术)
空间上的复用***

​ 将内存分领域, 一个内存可以同时加载多个进程

时间上的复用***

​ 实现将CPU在多个进程之间来回切换, 并且保留状态

IO阻塞

​ 几乎所有的程序都有IO阻塞

​ 同时加载到内存的三个任务, 三个进程, 每个进程都有阻塞情况, 只要 CPU 运行一个进程时, 遇到IO阻塞立马会切换, 长时间占用 CPU 也会切换

​ 提升效率, 最大限度的使用 CPU. 如果是一个IO密集型进程, 来回切换提升效率, 如果是一个计算密集型, 来回切换降低效率.

特点

​ 第三代计算机广泛采用了必须的保护硬件(程序之间的内存彼此隔离)之后, 第三代计算机应用而生

​ 每个人占用计算机的时间有限, 多人(少于10个) 共同使用一个计算机主机

第四代计算机(1980~至今)

进程的理论

​ 串行: 所有的任务一个一个的完成.

​ 并发: 一个CPU完成多个任务. 看起来像是同时完成.

​ 并行: 多个CPU执行多个任务, 真正的同时完成.

​ 阻塞: CPU遇到IO就是阻塞

​ 非阻塞: 没有IO, 就叫非阻塞

进程的有关问题

程序:

​ 一堆静态文件

​ 一个正在执行的程序任务, 一个进程

​ 一个程序能否开启多个进程? 可以

进程的创建

​ 一个子进程必须依赖于一个主进程才可以开启, 一个主进程可以开启多个子进程

unix: fork       创建子进程.
unix(linux,mac)  创建一个子进程会完完全全复制一
个主进程所有的资源,初始资源不变.
windows操作系统调用CreateProcess 处理进程的创建.
windows创建一个子进程,会copy主进程所有的资源,但是会改变一些资源.

进程的状态

 

 

 

 

进程创建的两种方式

第一种方法:

from multiprocessing import Process
import time


def task(name):
    print(f"{name} is running")
    time.sleep(2)
    print(f"{name} is gone")


if __name__ == '__main__':
    # 在windows环境下, 开启进程必须在__name__ == '__main__'下面
    p = Process(target=task, args=("常鑫",)) # 创建一个进程对象
    p.start() # 只是向操作系统发出一个开辟子进程的信号,然后就执行下一行了
    # 这个信号操作系统接收到之后, 会从内存中开辟一个子进程空间, 然后再将主进程所有数据copy加载到子进程, 然后再调用CPU去执行
    # 开辟子进程开销是很大的
    print("==主开始")
    
"""
==主开始
常鑫 is running
常鑫 is gone
"""

 

第二种方法:

from multiprocessing import Process
import time

class MyProcess(Process):

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"{self.name} is running")
        time.sleep(2)
        print(f"{self.name} is gone")

if __name__ == '__main__':
    p = MyProcess('常鑫')
    p.start()
    print('===主程序')
    
"""
===主程序
常鑫 is running
常鑫 is gone
"""

 

简单应用

from multiprocessing import Process
import time


def task(name):
    print(f"{name} is running")
    time.sleep(1)
    print(f"{name} is gone")


def task1(name):
    print(f"{name} is running")
    time.sleep(2)
    print(f"{name} is gone")


def task2(name):
    print(f"{name} is running")
    time.sleep(3)
    print(f"{name} is gone")

    
    
# 一个进程串行的执行三个任务
if __name__ == '__main__':
    start_time = time.time()
    task("常鑫")
    task1("李远点业")
    task2("海狗")
    print(f"结束时间{time.time() - start_time}")
    
"""
常鑫 is running
常鑫 is gone
李远点业 is running
李远点业 is gone
海狗 is running
海狗 is gone
结束时间6.001969337463379
"""

# 三个进程 并发或者并行的执行三个任务
if __name__ == '__main__':
    p1 = Process(target=task, args=("常鑫",))
    p2 = Process(target=task1, args=("李远点业",))
    p3 = Process(target=task2, args=("海狗",))
    start_time = time.time()
    p1.start()
    p2.start()
    p3.start()
    time.sleep(5)
    print(f"结束时间{time.time() - start_time}")
    
"""
常鑫 is running
李远点业 is running
海狗 is running
常鑫 is gone
李远点业 is gone
海狗 is gone
结束时间5.026661396026611
"""

 

进程pid

获取进程pid

import os
import time
print(f'子进程:{os.getpid()}')
print(f'主进程:{os.getppid()}')
time.sleep(50)

# 第一次执行
"""
子进程:139000
主进程:109036
"""

# 第二次执行
"""
子进程:136820
主进程:109036
"""

# 第三次执行
"""
子进程:139916
主进程:109036
"""
from multiprocessing import Process
import os

def task(name):
    print(f"子进程{os.getpid()}")
    print(f"主进程{os.getppid()}")

if __name__ == '__main__':
    p = Process(target=task, args=("常鑫",))
    p.start()
    print(f"===={os.getpid()}")
    
"""
====142800
子进程143244
主进程142800
"""
# 创建的进程的主进程就是 本模块此进程

 

验证进程之间的空间隔离

from multiprocessing import Process
import time
name = '太白'

def task():
    global name
    name = "刚子sb"
    print(f"子进程:{name}")

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    time.sleep(3)
    print(f"主:{name}")
"""
子进程:刚子sb
主:太白
"""
# 子进程的name变量即使有global也没有重新赋值, 因为进程之间的空间隔离

lst = ["丽丽", ]

from multiprocessing import Process
import time

def task():
    lst.append('怼姐')
    print(f"子进程{lst}")

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    time.sleep(3)
    print(f"主:{lst}")
"""
子进程['丽丽', '怼姐']
主:['丽丽']
"""
# 即使是列表也有空间隔离

 

进程对象join方法

单个子进程使用join

join让主进程等待子进程结束之后, 在执行主进程

from multiprocessing import Process
import time

def task(name):
    print(f"{name} is running")
    time.sleep(2)
    print(f"{name} is gone")

if __name__ == '__main__':
    p = Process(target=task, args=("常鑫", ))
    p.start()
    p.join()
    print("==主进程")
    
"""
常鑫 is running
常鑫 is gone
==主进程
"""

 

多个子进程使用join

验证一:

from multiprocessing import Process
import time

def task(name,sec):
    print(f"{name} is running")
    time.sleep(sec)
    print(f"{name} is gone")

if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=task, args=("常鑫", 1))
    p2 = Process(target=task, args=("李业", 2))
    p3 = Process(target=task, args=("海狗", 3))
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
    print(f"==主进程{time.time() - start_time}")
    
"""
常鑫 is running
李业 is running
海狗 is running
常鑫 is gone
李业 is gone
海狗 is gone
==主进程:4.285647392272949
"""

 

验证2:

from multiprocessing import Process
import time

def task(name,sec):
    print(f"{name} is running")
    time.sleep(sec)
    print(f"{name} is gone")

if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=task, args=("常鑫", 1))
    p2 = Process(target=task, args=("李业", 2))
    p3 = Process(target=task, args=("海狗", 3))
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    print(f"==主进程1:{time.time() - start_time}")
    p2.join()
    print(f"==主进程2:{time.time() - start_time}")
    p3.join()
    print(f"==主进程3:{time.time() - start_time}")
    
"""
常鑫 is running
李业 is running
海狗 is running
常鑫 is gone
==主进程1:2.2231969833374023
李业 is gone
==主进程2:3.232954502105713
海狗 is gone
==主进程3:4.242786645889282
"""

# start同时开启三个进程, 同时运行
# join就是阻塞

 

优化验证2:

from multiprocessing import Process
import time


def task(sec):
    print(f"子进程 is running")
    time.sleep(sec)
    print(f"子进程 is gone")


if __name__ == '__main__':
    start_time = time.time()
    l1 = []
    for i in range(1, 4):
        p = Process(target=task, args=(i,))
        l1.append(p)
        p.start()
    for i in l1:
        i.join()
    print(f"==主进程{time.time() - start_time})")
    
"""
子进程 is running
子进程 is running
子进程 is running
子进程 is gone
子进程 is gone
子进程 is gone
==主进程4.1986918449401855)
"""
# join就是阻塞, 主进程有join, 主进程下面的代码一律不执行, 直到子进程执行完毕之后再执行

 

进程的其他参数

terminate(杀死)

from multiprocessing import Process
import time

def task(name):
    print(f"{name} is running")
    time.sleep(2)
    print(f"{name} is gone")

if __name__ == '__main__':
    p = Process(target=task, args=("常鑫", ), name="alex")
    p.start()
    p.terminate()
    print("==主进程")

"""
==主进程
"""
# 子进程刚发出信号就杀死

from multiprocessing import Process
import time

def task(name):
    print(f"{name} is running")
    time.sleep(2)
    print(f"{name} is gone")

if __name__ == '__main__':
    p = Process(target=task, args=("常鑫", ), name="alex")
    p.start()
    time.sleep(1)
    p.terminate()
    print("==主进程")

"""
常鑫 is running
==主进程
"""
# 睡了一秒所以子进程start了

 

is_alive(判断子进程是否存活)

from multiprocessing import Process
import time

def task(name):
    print(f"{name} is running")
    time.sleep(2)
    print(f"{name} is gone")

if __name__ == '__main__':
    p = Process(target=task, args=("常鑫", ), name="alex")
    p.start()
    # p.terminate()
    # p.join()
    print(p.is_alive())
    print("==主进程")
    
"""
True
==主进程
常鑫 is running
常鑫 is gone
"""

# terminate 和 join 都会使进程结束, 所以有terminate 和 join 就会返回 False

 

守护进程

子进程守护着主进程, 只要主进程结束, 子进程跟着就结束

from multiprocessing import Process
import time

def task(name):
    print(f"{name} is running")
    time.sleep(2)
    print(f"{name} is gone")

if __name__ == '__main__':
    p = Process(target=task, args=("常鑫", ))
    p.daemon = True
    p.start()
    print("==主")
"""
常鑫 is running
常鑫 is gone
==主
"""

 



僵尸进程与孤儿进程

基于unix环境(linux, macOS)

​ 主进程需要等待子进程结束之后, 主进程才结束

​ 主进程时刻监测子进程的运行状态, 当子进程结束之后, 一段时间之内, 将子进程进行回收

为什么主进程不在子进程结束后马上对其回收呢?

​ 主进程与子进程是异步关系, 主进程无法马上捕获子进程什么时候结束

​ 如果子进程结束之后, 马上在内存中释放资源, 主进程就没有办法监测子进程的状态了

unix针对于上面的问题, 提供了一个机制

​ 所有的子进程结束之后, 立马会释放掉文件的操作链接, 内存的大部分数据, 但是会保留一些内容: 进程号, 结束时间, 运行状态. 等待主进程监测, 回收.

僵尸进程

​ 所有的子进程结束之后, 在被主进程回收之前, 都会进入僵尸进程状态

僵尸进程有无危害

​ 如果父进程不对僵尸进程进行回收(wait/waitpid), 产生大量的僵尸进程, 这样就会占用内存, 占用进程pid号

僵尸进程如何解决

​ 父进程产生了大量子进程, 但是不回收, 这样就会形成大量的僵尸进程, 解决方式就是直接杀死父进程, 将所有的僵尸进程变成孤儿进程, 由init进行回收

孤儿进程

​ 父进程由于某种原因结束了, 但是你的子进程还在运行中, 这样你的这些子进程就成了孤儿进程, 你的父进程如果结束了, 你的所有的孤儿进程就会被init进程的回收, init就变成了你的父进程, 对你进行回收

互斥锁

​ 三个同事, 同时用一个打印机打印内容.

​ 三个进程模拟三个同事, 输出平台模拟打印机.

​ 互斥锁:

​ 可以公平性的保证顺序以及数据的安全

# 版本一
from multiprocessing import Process
import time
import random
import os


def task1():
    print(f"{os.getpid()}开始打印了")
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()}打印结束了")


def task2():
    print(f"{os.getpid()}开始打印了")
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()}打印结束了")


def task3():
    print(f"{os.getpid()}开始打印了")
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()}打印结束了")


if __name__ == '__main__':
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)

    p1.start()
    p2.start()
    p3.start()

"""
188412开始打印了
188408开始打印了
188388开始打印了
188412打印结束了
188388打印结束了
188408打印结束了
"""

# 现在是所有的进程都并发的抢占打印机,
# 并发是以效率优先的, 但是目前我们的需求: 顺序优先
# 多个进程共抢一个资源时, 要保证顺序优先: 串行, 一个一个来

 

# 版本二
from multiprocessing import Process
import time
import random
import os


def task1():
    print(f"{os.getpid()}开始打印了")
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()}打印结束了")


def task2():
    print(f"{os.getpid()}开始打印了")
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()}打印结束了")


def task3():
    print(f"{os.getpid()}开始打印了")
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()}打印结束了")


if __name__ == '__main__':
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)

    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()

"""
160876开始打印了
160876打印结束了
188264开始打印了
188264打印结束了
188328开始打印了
188328打印结束了
"""

# 我们利用join, 解决串行的问题, 保证了顺序优先, 但是这个谁先谁后是固定的.
# 这样不合理, 你在争抢同一个资源的时候, 应该是先到先得, 保证公平

 

# 版本三
from multiprocessing import Process
from multiprocessing import Lock
import time
import random


def task1(p, lock):
    lock.acquire()
    print(f"{p}开始打印了")
    time.sleep(random.randint(1, 3))
    print(f"{p}打印结束了")
    lock.release()

def task2(p, lock):
    lock.acquire()
    print(f"{p}开始打印了")
    time.sleep(random.randint(1, 3))
    print(f"{p}打印结束了")
    lock.release()


def task3(p, lock):
    lock.acquire()
    print(f"{p}开始打印了")
    time.sleep(random.randint(1, 3))
    print(f"{p}打印结束了")
    lock.release()


if __name__ == '__main__':

    mutex = Lock()
    p1 = Process(target=task1, args=("p1", mutex))
    p2 = Process(target=task2, args=("p2", mutex))
    p3 = Process(target=task3, args=("p3", mutex))

    p1.start()
    p2.start()
    p3.start()

"""
p1开始打印了
p1打印结束了
p2开始打印了
p2打印结束了
p3开始打印了
p3打印结束了
"""

 

 

lock 与 join 的区别

共同点

​ 都可以把并发变成串行, 保证了顺序

不同点

​ join 设定顺序, lock 让其争抢顺序, 保证了公平性

进程之间的通信

​ 进程在内存级别是隔离的, 但是文件在磁盘上

1.基于文件通信

​ 抢票系统:

# 抢票系统
# 1. 先可以查票, 查询余票数.    并发
# 2. 进行购买, 向服务端发送请求, 服务端接收请求, 在后端将票数-1, 返回到前端           串行
from multiprocessing import Process
import json
import time
import os
import random


def search():
    time.sleep(random.randint(1, 3))   # 模拟网络延迟(查询环节)
    with open("ticket.json", "r", encoding="utf-8")as f:
        dic = json.load(f)
        print(f"{os.getpid()}查看了票数, 剩余{dic['count']}")


def paid():
    with open("ticket.json", "r", encoding="utf-8")as f:
        dic = json.load(f)
        if dic["count"] > 0:
            dic["count"] -= 1
            time.sleep(random.randint(1, 3))
            with open("ticket.json", "w",  encoding="utf-8")as f1:
                json.dump(dic, f1)
            print(f"{os.getpid()} 购买成功")


def task():
    search()
    paid()
    
if __name__ == '__main__':

    for i in range(6):
        p = Process(target=task)
        p.start()

# 当多个进程共抢一个数据时, 如果要保证数据的安全, 必须要串行.
# 要想让购买环节进行串行, 我们必须要加锁处理
from multiprocessing import Process
from multiprocessing import Lock
import json
import time
import os
import random


def search():
    time.sleep(random.randint(1, 3))
    with open("ticket.json", encoding="utf-8")as f:
        dic = json.load(f)
        print(f"{os.getpid()} 查看了票数, 剩余{dic['count']}")


def paid():
    with open("ticket.json", encoding="utf-8")as f:
        dic = json.load(f)
    if dic["count"] > 0:
        dic["count"] -= 1
        time.sleep(random.randint(1, 3))
        with open("ticket.json", "w", encoding="utf-8")as f1:
            json.dump(dic, f1)
        print(f"{os.getpid()}购买成功")


def task(lock):
    search()
    lock.acquire()
    paid()
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(6):
        p = Process(target=task, args=(mutex,))
        p.start()
"""
203496 查看了票数, 剩余3
203512 查看了票数, 剩余3
203496购买成功
203504 查看了票数, 剩余2
203480 查看了票数, 剩余2
203488 查看了票数, 剩余2
203520 查看了票数, 剩余2
203512购买成功
203504购买成功
"""

# 当很多进程共抢一个资源(数据)时, 你要保证顺序(数据的安全), 一定要串行. 
# 互斥锁: 可以公平性的保证顺序以及数据的安全
# 基于文件的进程之间的通信:
        # 效率低
        # 自己加锁麻烦而且很容易出现死锁

 

2.基于队列通信

 

3.基于管道通信

生产者消费者模型

生产者消费者模型

​ 编程思想, 模型, 设计模式, 理论等等, 都是交给你一种编程的方法, 以后遇到类似的情况, 套用即可

生产者消费者模型三要素

​ 生产者: 产生数据的

​ 消费者: 接收数据做进一步处理的

​ 容器: 盆(队列)

队列容器的作用

​ 起到缓冲的作用, 平衡生产力与消费力, 解耦

实例

import time
import random
from multiprocessing import Process, Queue


def producer(q, name):
    for i in range(1, 6):
        time.sleep(random.randint(1, 2))
        res = f"{i}号包子"
        q.put(res)
        print(f"生产者{name} 生产了{res}")


def consumer(q, name):
    while 1:
        try:
            food = q.get(timeout=3)
            time.sleep(random.randint(1, 3))
            print(f"\033[31;0m 消费者{name} 吃了{food} \033[0m")
        except Exception:
            return


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=(q, "孙宇"))
    p2 = Process(target=consumer, args=(q, "海狗"))

    p1.start()
    p2.start()

"""
生产者孙宇 生产了1号包子
生产者孙宇 生产了2号包子
 消费者海狗 吃了1号包子 
生产者孙宇 生产了3号包子
 消费者海狗 吃了2号包子 
生产者孙宇 生产了4号包子
生产者孙宇 生产了5号包子
 消费者海狗 吃了3号包子 
 消费者海狗 吃了4号包子 
 消费者海狗 吃了5号包子 
"""

 




开启线程的两种方式

进程是资源单位, 线程是执行单位

什么是线程

​ 一条流水线的工作流程.

进程:

​ 在内存中开启一个进程空间, 然后将主进程的所有的资源数据复制一份, 然后调用CPU去执行这些代码

具体描述进程:

​ 在内存中开启一个进程空间, 然后将主进程的所有的资源数据复制一份, 然后调用线程去执行代码

执行流程

​ 主线程子线程没有地位之分, 但是一个主线程在执行, 执行完, 要等待其他非守护子线程执行完之后, 才能结束主线程, 结束本进程

# 第一种方式
from threading import Thread
import time


def task(name):
    print(f"{name} is running")
    time.sleep(1)
    print(f"{name} is gone")


if __name__ == '__main__':

    t1 = Thread(target=task, args=("海狗",))
    t1.start()
    print("===主线程")
    # 线程没有主次之分
"""
李业 is running
===主线程
李业 is gone
"""

 

# 第二种方式
from threading import Thread
import time

class MyThread(Thread):

    def __init__(self, name, l1, s1):
        super(MyThread, self).__init__()
        self.name = name
        self.l1 = l1
        self.s1 = s1

    def run(self):
        print(f"{self.name} is running")
        time.sleep(1)
        print(f"{self.name} is gone")


if __name__ == '__main__':
    t1 = MyThread("李业", [1,2,3], "180")
    t1.start()
    print("===主线程")
"""
李业 is running
===主线程
李业 is gone
"""

 

 

多线程与多进程开启速度区别

from multiprocessing import Process
import time


def work():
    print("hello")


if __name__ == '__main__':
    start_time = time.time()
    lst = []
    for i in range(10):
        t = Process(target=work)
        t.start()
        lst.append(t)
    for i in lst:
        i.join()
    print(time.time() - start_time)
"""
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
1.004307746887207
"""

 

# 多线程
from threading import Thread
import time


def task():
    print("hello")


if __name__ == '__main__':
    start_time = time.time()
    lst = []
    for i in range(10):
        t = Thread(target=task)
        t.start()
        lst.append(t)
    for i in lst:
        i.join()
    print(time.time() - start_time)
"""
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
0.0019998550415039062
"""

 

 

​ 开启进程的开销非常大, 比开启线程的开销大很多.

​ 开启线程的速度非常快, 要快几十倍到上百倍.

线程进程pid

# 进程pid
from multiprocessing import Process
import os

def task():
    print(f"子进程:{os.getpid()}")
    print(f"主进程:{os.getppid()}")

if __name__ == '__main__':
    p1 = Process(target=task)
    p2 = Process(target=task)
    p1.start()
    p2.start()
    print(f"==主:{os.getpid()}")
"""
==主:51060
子进程:51128
主进程:51060
子进程:42856
主进程:51060
"""

# 线程pid
from threading import Thread
import os

def task():
    print(os.getpid())

if __name__ == '__main__':
    t = Thread(target=task)
    t1 = Thread(target=task)
    t.start()
    t1.start()
    print(f"===主线程:{os.getpid()}")
"""
51768
51768
===主线程:51768
"""
# 多线程在同一个进程内, 所以pid都一样

 

同一进程内线程是共享数据的

​ 线程与线程之间可以共享数据, 进程与进程之间需要借助队列等方法实现通信

​ 同一进程内的资源数据对于这个进程内的多个线程来说是共享的

线程的应用

​ 并发: 一个CPU看起来像是同时执行多个任务

​ 单个进程开启三个线程, 并发的执行任务

​ 开启三个进程并发的执行任务

​ 文本编辑器:

​ 1.输入文字

​ 2.在屏幕上显示

​ 3.保存在磁盘中

​ 开启多线程就非常简单了:

​ 数据共享, 开销小, 速度快

from threading import Thread


x = 3

def task():
    global x
    x = 100


if __name__ == '__main__':
    t1 = Thread(target=task)
    t1.start()
    t1.join()
    print(f"===主线程{x}")
    
"""
===主线程100
"""

 

线程的其他方法

from threading import Thread, current_thread, enumerate, activeCount
import os
import time

x = 3
def task():
    time.sleep(1)
    print(666)


if __name__ == '__main__':
    t1 = Thread(target=task)
    t2 = Thread(target=task)
    t1.start()
    t2.start()
    print(t1.isAlive())    # 判断子线程是否存活
    print(t1.getName())       # 获取子线程名字
    t1.setName("子进程-1")  # 修改子线程名字
    print(t1.name)          # 获取子线程的名字 ***
    print(current_thread())  # 获取当前线程的内容
    print(enumerate())      # 获取当前进程下的所有线程,以列表形式展示
    print(activeCount())   # 获取当前进程下的所有存活线程的数量
    print(os.getpid())  
    
"""
True
Thread-1
子进程-1
<_MainThread(MainThread, started 60668)>
[<_MainThread(MainThread, started 60668)>, <Thread(子进程-1, started 55172)>, <Thread(Thread-2, started 62068)>]
3
61048
666
666
"""

 

join 与 守护线程

守护线程, 等待非守护子线程以及主线程结束之后, 结束

from threading import Thread
import time

def say_hi(name):
    print("你滚")
    time.sleep(2)
    print(f"{name} say hello")

if __name__ == '__main__':
    t = Thread(target=say_hi, args=("egon",))
    # t.setDaemon(True)      # 必须在t.start()前设置, 两种方法都可以
    t.daemon = True
    t.start()
    print("主线程")        # 注意 线程的开启速度比进程要快的多
    
"""
你滚
主线程
"""

 

from threading import Thread
import time

def foo():
    print(123)  
    time.sleep(1)
    print("end123")  

def bar():
    print(456)  
    time.sleep(3)
    print("end456")  


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")  

"""
123
456
main-------
end123
end456
"""

 

互斥锁

from threading import Thread
import time
import random
x = 100

def task():
    global x
    temp = x
    time.sleep(random.randint(1, 3))
    temp = temp - 1
    x = temp


if __name__ == '__main__':
    l1 = []
    for i in range(100):
        t = Thread(target=task)
        l1.append(t)
        t.start()

    for i in l1:
        i.join()
    print(f'主线程{x}')
"""
主线程99
"""
# 多个任务共抢一个数据, 为了保证数据的安全性, 要让其串行


from threading import Thread
from threading import Lock
import time
x = 100

def task(lock):

    lock.acquire()
    global x
    temp = x
    temp = temp - 1
    x = temp
    lock.release()


if __name__ == '__main__':
    mutex = Lock()
    l1 = []
    for i in range(100):
        t = Thread(target=task,args=(mutex,))
        l1.append(t)
        t.start()

    time.sleep(3)
    print(f'主线程{x}')
"""
主线程0
"""

 



死锁现象与递归锁

死锁现象

from threading import Thread, Lock
import time

Lock_A = Lock()
Lock_B = Lock()


class MyThread(Thread):

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        Lock_A.acquire()
        print(f"{self.name}拿到了A锁")

        Lock_B.acquire()
        print(f"{self.name}拿到了B锁")

        Lock_B.release()
        print(f"{self.name}释放了B锁")

        Lock_A.release()
        print(f"{self.name}释放了A锁")

    def f2(self):
        Lock_B.acquire()
        print(f"{self.name}拿到了B锁")

        time.sleep(0.1)
        Lock_A.acquire()
        print(f"{self.name}拿到了A锁")

        Lock_A.release()
        print(f"{self.name}释放了A锁")

        Lock_B.release()
        print(f"{self.name}释放了B锁")


if __name__ == '__main__':
    for i in range(3):
        t = MyThread()
        t.start()
        
"""
Thread-1拿到了A锁
Thread-1拿到了B锁
Thread-1释放了B锁
Thread-1释放了A锁
Thread-1拿到了B锁
Thread-2拿到了A锁
...
"""
# 阻塞了, 因为t1运行到f2, 拿到了B锁,要拿A锁. 此时t2开启, 运行f1拿到了A锁要拿B锁, 两者冲突了, 所以阻塞了, 形成了死锁现象

 

递归锁

​ 递归锁有一个计数的功能, 原数字为0, 上一次锁, 计数+1, 释放一次锁, 计数-1

​ 只要递归锁上面的数字不为零, 其他线程就不能抢锁

from threading import Thread, RLock
import time

Lock_A = Lock_B = RLock()
# 切记 固定格式 不要改动

class MyThread(Thread):

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        Lock_A.acquire()
        print(f"{self.name}拿到了A锁")

        Lock_B.acquire()
        print(f"{self.name}拿到了B锁")

        Lock_B.release()
        print(f"{self.name}释放了B锁")

        Lock_A.release()
        print(f"{self.name}释放了A锁")

    def f2(self):
        Lock_B.acquire()
        print(f"{self.name}拿到了B锁")

        time.sleep(0.1)
        Lock_A.acquire()
        print(f"{self.name}拿到了A锁")

        Lock_A.release()
        print(f"{self.name}释放了A锁")

        Lock_B.release()
        print(f"{self.name}释放了B锁")


if __name__ == '__main__':
    for i in range(3):
        t = MyThread()
        t.start()
"""
Thread-1拿到了A锁
Thread-1拿到了B锁
Thread-1释放了B锁
Thread-1释放了A锁
Thread-1拿到了B锁
Thread-1拿到了A锁
Thread-1释放了A锁
Thread-1释放了B锁
Thread-2拿到了A锁
Thread-2拿到了B锁
Thread-2释放了B锁
Thread-2释放了A锁
Thread-3拿到了A锁
Thread-3拿到了B锁
Thread-3释放了B锁
Thread-3释放了A锁
Thread-2拿到了B锁
Thread-2拿到了A锁
Thread-2释放了A锁
Thread-2释放了B锁
Thread-3拿到了B锁
Thread-3拿到了A锁
Thread-3释放了A锁
Thread-3释放了B锁
"""

 

信号量

​ 也是一种锁, 控制并发数量

from threading import Thread, Semaphore, current_thread
import time
import random

sem = Semaphore(5)


def task():
    sem.acquire()

    print(f"{current_thread().name} 厕所ing")
    time.sleep(random.randint(1, 3))
    sem.release()


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task)
        t.start()
"""
Thread-1 厕所ing
Thread-2 厕所ing
Thread-3 厕所ing
Thread-4 厕所ing
Thread-5 厕所ing
Thread-6 厕所ing
Thread-7 厕所ing
Thread-8 厕所ing
Thread-9 厕所ing
Thread-10 厕所ing
Thread-11 厕所ing
Thread-12 厕所ing
Thread-13 厕所ing
Thread-14 厕所ing
Thread-15 厕所ing
Thread-16 厕所ing
Thread-17 厕所ing
Thread-18 厕所ing
Thread-19 厕所ing
Thread-20 厕所ing
"""
# Semaphore 管理一个内置的计数器
# 每当调用acquire()时内置计数器-1
# 调用release() 时内置计数器+1
# 计数器不能小于0; 当计数器为0时, acquire()将阻塞线程直到其他线程调用release()

 

GIL全局解释器锁

​ 理论上来说: 单个进程的多线程可以利用多核

​ 但是, 开发 Cpython 解释器的程序员, 给进入解释器的线程加了锁

 

1. GIL概念: GIL,全局解释器锁(global interpreter lock),它不是python语言的特性,而是python默认的解析器cpython的特性cpython要求每个线程必须先获取GIL锁,才能执行线程中的代码 目的:解决多线程同时竞争解析器程序的全局变量而出现的线程安全问题 不足:在多线程中不能充分利用多核cpu 原因是一个进程只存在一把gil锁,当在执行多个线程时,内部会争抢gil锁,这会造成当某一个线程没有抢到锁的时候会让cpu等待,进而不能合理利用多核cpu资源     2. 单线程、多线程、多进程执行分析 - 单线程: 双核cpu使用率50% - 多线程:双核cpu使用率50% - 多进程:双核cpu使用率100%     3.如何解决GIL问题: - 换语言, 在处理多线程代码时,用其他语言代码,比如用 c或者java - 换解析器,比如换jpython - 业界常用的方案: 多进程+多协程方法   4. 面试题: 描述Python GIL的概念, 以及它对python多线程的影响?一个单线程抓取网页的程序,与一个多线程抓取网页的程序哪个性能更高,并解释原因 1. GIL,全局解释器锁(global interpreter lock),它是cpython解析器的特性,不是python的特性 ,它要求线程在执行前,需要获取GIL锁, 2. 由于GIL的存在,会影响多线程不能利用多核CPU资源(原因是一个进程只存在一把gil锁,当在执行多个线程时,内部会争抢gil锁,这会造成当某一个线程没有抢到锁的时候会让cpu等待,进而不能合理利用多核cpu资源),通过多进程方式可利用多个CPU资源 3. 线程释放GIL锁的情况: 1.在IO操作等可能会引起阻塞的system call之前,可以暂时释放GIL,但在执行完毕后,必须重新获取GIL 2.Python 3x使用计时器(执行时间达到阈值后,当前线程释放GIL) 4. 多线程爬取比单线程性能有提升,因为遇到IO阻塞会自动释放GIL锁,这样在线程阻塞情况下,可以执行其他线程中的代码 多线程爬取比单线程性能有提升,因为遇到IO阻塞会自动释放GIL锁

现在多核时代, 去掉GIL锁可以么

​ Jpython 没有GIL锁

​ PYPY 也没有GIL锁

​ 因为 Cpython 解释器所有的业务逻辑都是围绕着单个线程实现的, 去掉这个GIL锁, 几乎不可能

单个进程的多线程可以并发么

​ 可以, 单个进程的多线程可以并发, 但是不能利用多核,不能并行

​ 多个进程可以并行, 并发

GIL与lock锁的区别

相同点:

​ 都是同种锁

不同点:

​ GIL锁全局解释器锁, 保护解释器内部的资源数据的安全.

​ GIL锁 上锁, 释放无需手动操作

​ 自己代码中定义的互斥锁保护进程中的资源数据的安全

​ 自己定义的互斥锁必须自己手动上锁, 释放锁

 

验证IO密集型与计算密集型的效率

IO密集型

单个进程的多线程并发 vs 多个进程的并发并行

# 单进程的多线程并发
from threading import Thread
import time
import random


def task():
    count = 0
    time.sleep(random.randint(1, 3))
    count +=1


if __name__ == '__main__':
    start_time = time.time()
    lst = []
    for i in range(50):
        p = Thread(target=task)
        lst.append(p)
        p.start()
    for p in lst:
        p.join()

    print(f'执行效率:{time.time() - start_time}')
"""
执行效率:3.0078725814819336
"""

 

# 多进程的并发并行
from multiprocessing import Process
import time
import random


def task():
    count = 0
    time.sleep(random.randint(1, 3))
    count +=1


if __name__ == '__main__':
    start_time = time.time()
    lst = []
    for i in range(50):
        p = Process(target=task)
        lst.append(p)
        p.start()
    for p in lst:
        p.join()

    print(f'执行效率:{time.time() - start_time}')
"""
执行效率:13.165884733200073
"""

 

​ 对于IO密集型, 单进程多线程并发效率高

计算密集型

单个进程的多线程并发 vs 多个进程的并发并行

# 单进程所线程并发
from threading import Thread
import time


def task():
    count = 0
    for i in range(10000000):
        count += 1


if __name__ == '__main__':
    start_time = time.time()
    lst = []
    for i in range(4):
        p = Thread(target=task)
        lst.append(p)
        p.start()

    for p in lst:
        p.join()

    print(f'执行效率:{time.time() - start_time}')
"""
执行效率:2.066039800643921
"""

 

# 多进程的并发并行
from multiprocessing import Process
import time


def task():
    count = 0
    for i in range(10000000):
        count += 1


if __name__ == '__main__':
    start_time = time.time()
    lst = []
    for i in range(4):
        p = Process(target=task)
        lst.append(p)
        p.start()

    for p in lst:
        p.join()

    print(f'执行效率:{time.time() - start_time}')

"""
执行效率:1.614973545074463
"""

 

​ 对于计算密集型, 多进程的并发并行效率高

多线程实现socket通信

# server服务端
import socket
from threading import Thread


def commuincata(conn, addr):
    while 1:
        try:
            from_client_data = conn.recv(1024).decode("utf-8")
            print(f"来自客户端{addr[1]}的信息{from_client_data}")
            to_client_data = input(">>>").strip().encode("utf-8")
            conn.send(to_client_data)
        except Exception:
            break
    conn.close()


def accept():
    server = socket.socket()
    server.bind(("127.0.0.1", 8848))
    server.listen(5)
    while 1:
        conn, addr = server.accept()
        t = Thread(target=commuincata, args=(conn, addr))
        t.start()


if __name__ == '__main__':
    accept()

 

# client客户端
import socket

client = socket.socket()
client.connect(("127.0.0.1", 8848))
while 1:
    try:
        to_server_data = input(">>>").strip().encode("utf-8")
        client.send(to_server_data)
        from_server_data = client.recv(1024).decode("utf-8")
        print(from_server_data)
    except Exception:
        break

client.close()

 

​ 你的计算机允许范围内, 开启的线程进程数量越多越好

进程池, 线程池

​ 一个容器, 这个容器限制住你开启进程/线程的数量, 比如4个, 第一次肯定只能并发的处理4个任务, 只要有任务完成, 进程/线程马上就会接下一个任务

​ 以时间换空间

os.cpu_count()   # CPU数量

进程池

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random


def task():
    print(f'{os.getpid()} 接客')
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    # 开启进程池
    p = ProcessPoolExecutor()
    # 默认不写, 进程池里面的进程数默认为CPU个数
    for i in range(20):
        p.submit(task)

"""
106368 接客
104388 接客
106912 接客
107368 接客
107072 接客
105324 接客
95336 接客
106572 接客
104388 接客
95336 接客
106912 接客
107368 接客
105324 接客
107072 接客
106368 接客
106912 接客
107072 接客
95336 接客
106572 接客
104388 接客
"""

 

线程池

from concurrent.futures import ThreadPoolExecutor
import os
import time
import random


def task():
    print(f'{os.getpid()} 接客')
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    # 开启线程池
    p = ThreadPoolExecutor()
    # 默认不写, 进程池里面的进程数默认为 CPU个数*5
    for i in range(20):
        p.submit(task)

"""
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
103480 接客
"""
# 因为是同一进程下的多线程, 所以pid相同

 



阻塞 非阻塞 同步 异步

阻塞

​ 调用结果返回之前, 当前线程会被挂起(如遇到IO操作). 函数只有在得到结果之后才会将阻塞的线程激活.

非阻塞

​ 和阻塞的概念相对应, 指在不能立即得到结果之前也会立刻返回, 同时该函数不会阻塞当前线程

同步

​ 在一个功能调用时, 在没有得到结果之前, 该调用就不会返回

异步

​ 和同步相对, 当一个异步功能调用发出后, 调用者不能立刻得到结果.

总结

​ 同步与异步针对的是函数/任务的调用方式: 同步就是当一个进程发起一个函数(任务)调用的时候, 一直等到函数(任务)完成, 而进程继续处于激活状态. 而异步情况下是当一个进程发起一个函数(任务)调用的时候, 不会等函数返回, 而是继续往下执行, 函数返回的时候通过状态, 通知, 事件等方式通知进程任务完成

​ 阻塞与非阻塞针对的是进程或线程: 阻塞是当请求不能满足的时候就将进程挂起, 而非阻塞则不会阻塞当前进程

同步调用 异步调用

 

 

同步调用

​ apply 一个累计1亿次的任务, 该调用会一直等待, 知道任务返回结果为止, 但并未阻塞住(即便是被抢走CPU的执行权限, 那也是处于就绪状态)

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import os
import random

def task(i):
    print(f"{os.getpid()}开始任务")
    time.sleep(random.randint(1,3))
    print(f"{os.getpid()}任务结束")
    return i

if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    for i in range(10):
        obj = pool.submit(task, i)
        print(f"任务结果:{obj.result()}")
        # obj 是一个动态对象, 返回的当前的对象的状态, 有可能运行中, 可能就绪阻塞, 还可能是结束了
        # obj.result() 必须等到这个任务完成后, 返回了结果之后, 再执行下一个任务
    pool.shutdown()
    # shutdown 让主进程等待进程池中所有的子进程都结束任务之后再执行, 类似join
    # 在上一个进程池没有完成所有的任务之前, 不允许添加新的任务
    # 一个任务是通过一个函数实现的, 任务完成了他的返回值就是函数的返回值
    print("===主")

 

异步调用

​ 发起异步调用后, 并不会等待任务结束才返回, 相反, 会立即获取一个临时结果(并不是最终的结果, 可能是封装好的一个对象)

from concurrent.futures import ProcessPoolExecutor
import time
import os
import random

def task(i):
    print(f"{os.getpid()}开始任务")
    time.sleep(random.randint(1,3))
    print(f"{os.getpid()}任务结束")
    return i

if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    for i in range(10):
        obj = pool.submit(task, i)
    pool.shutdown()
    print("===主")

 

异步调用如何取结果

统一回收结果

​ 不能马上收到任何一个已经完成的任务的返回值, 只能等到所有的任务全部结束统一回收

from concurrent.futures import ProcessPoolExecutor
import time
import os
import random

def task(i):
    print(f"{os.getpid()}开始任务")
    time.sleep(random.randint(1,3))
    print(f"{os.getpid()}任务结束")
    return i

if __name__ == '__main__':
    lst = []
    pool = ProcessPoolExecutor()
    for i in range(10):
        obj = pool.submit(task, i)
        lst.append(obj)
    pool.shutdown()
    print(lst)
    for i in lst:
        print(i.result())
    print("===主")

 

完成任务后立刻返回结果

​ 异步调用 + 回调函数

异步调用 + 回调函数

浏览器

​ 工作原理:

​ 向服务端发送一个请求, 服务端验证你的请求, 如果正确, 给你的浏览器返回一个文件, 浏览器接收到文件, 将文件里面的代码渲染成你看到的漂亮美丽的样子.

爬虫

​ 利用代码模拟一个浏览器, 进行浏览器的工作流程得到一堆源代码.

​ 对源代码进行数据清洗得到我想要的数据

import requests
ret = requests.get("http://www.baidu.com")
if ret.status_code == 200:
    print(ret.text)

 

回调函数

版本一

import requests

def task(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text


def parse(content):
    return len(content)


if __name__ == '__main__':
        url_list = ['http://www.baidu.com',
                'http://www.JD.com',
                'http://www.JD.com',
                'http://www.JD.com',
                'http://www.taobao.com',
                'https://www.cnblogs.com/jin-xin/articles/7459977.html',
                'https://www.luffycity.com/',
                'https://www.cnblogs.com/jin-xin/articles/9811379.html',
                'https://www.cnblogs.com/jin-xin/articles/11245654.html',
                'https://www.sina.com.cn/']
    pool = ThreadPoolExecutor(4)
    obj_list = []
    for url in url_list:
        obj = pool.submit(task, url)
        obj_list.append(obj)

    pool.shutdown()
    for res in obj_list:
        print(parse(res.result()))
    print("==主")

 

​ 线程池设置4 个线程, 异步发起10 个任务, 每个任务是通过网页获取源码, 并发执行, 最后统一用列表回收10 个任务, 串行着分析源码

​ 缺点:

​ 1. 异步发出10个任务, 并发的执行, 但是统一的接收所有的任务的返回值(效率低, 不能实时的获取结果)

​ 2. 分析结果流程是串行, 影响效率

​ 针对版本一的缺点 2 , 改进, 让串行编程并发或者并行

版本二

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os
import requests

def task(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text


def parse(content):
    return len(content)


if __name__ == '__main__':
    url_list = ['http://www.baidu.com',
                'http://www.JD.com',
                'http://www.JD.com',
                'http://www.JD.com',
                'http://www.taobao.com',
                'https://www.cnblogs.com/jin-xin/articles/7459977.html',
                'https://www.luffycity.com/',
                'https://www.cnblogs.com/jin-xin/articles/9811379.html',
                'https://www.cnblogs.com/jin-xin/articles/11245654.html',
                'https://www.sina.com.cn/']
    pool = ThreadPoolExecutor(4)
    obj_list = []
    for url in url_list:
        obj = pool.submit(task, url)
        obj_list.append(obj)

    pool.shutdown()
    for res in obj_list:
        print(parse(res.result()))
    print("==主")

 

​ 线程池设置4 个线程, 异步发起10个任务, 每个任务是通过网页获取源码 + 数据分析, 并发执行, 最后将所有的结果展示出来.

​ 耦合性增强了

​ 并发执行任务, 次任务最好是IO阻塞, 才能发挥最大的效果

版本三

​ 基于异步调用回收所有任务的结果, 做到实时回收, 并发执行任务每个任务只是处理 IO 阻塞的, 不能增加新的功能

from concurrent.futures import ThreadPoolExecutor
import requests

def task(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text

def parse(obj):
    print(len(obj.result()))

if __name__ == '__main__':
    url_lst = ["http://www.baidu.com",
               "http://www.JD.com",
               "http://www.JD.com",
               "http://www.JD.com",
               "http://www.taobao.com",
               "https://www.cnblogs.com/jin-xin/articles/7459977.html",
               "https://www.luffycity.com/",
               "https://www.cnblogs.com/jin-xin/articles/9811379.html",
               "https://www.cnblogs.com/jin-xin/articles/11245654.html",
               "https://www.sina.com.cn/"]
    pool = ThreadPoolExecutor(4)

    for url in url_lst:
        obj = pool.submit(task, url)
        obj.add_done_callback(parse)

 

​ 当线程池设置4个线程, 异步发起10个任务, 每个任务是通过网页获取源码, 并发执行, 当一个任务完成之后, 将parse这个分析代码的任务交由剩余的空闲的线程去执行, 你这个线程继续去处理其他任务.

进程池 + 回调函数

​ 回调函数由主进程去执行

线程池 + 回调函数

​ 回到函数由空闲的线程去执行

小结:

​ 异步站在发布任务的角度

​ 回调站在接收结果的角度, 按顺序接收每个任务的结果, 进行下一步处理

​ 异步处理的IO类型

​ 回调处理非IO

 

线程queue

队列

import queue
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
"""
1
2
3
"""

 

t = queue.LifoQueue(4)
t.put(1)
t.put(2)
t.put(3)
t.put(4)
print(t.get())
print(t.get())
print(t.get())
print(t.get())
"""
4
3
2
1
"""

 

优先级队列

p = queue.PriorityQueue(4)
p.put((5,"oifjanior"))
p.put((-2,"hshts"))
p.put((0,"shtr"))
p.put((3,"hstrhs"))
print(p.get())
print(p.get())
print(p.get())
print(p.get())
"""
(-2,"hshts")
(0,"shtr")
(3,"hstrhs")
(5,"oifjanior")

 

事件event

​ 如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作

版本一

from threading import current_thread, Thread
import time

flag = False
def check():
    print(f"{current_thread().name}监测服务器是否开启")
    time.sleep(3)
    global flag
    flag = True
    print("服务器已经开启")

def connect():
    while 1:
        print(f"{current_thread().name}等待连接...")
        time.sleep(0.5)
        if flag:
            print(f"{current_thread().name}连接成功...")
            break
t1 = Thread(target=check)
t2 = Thread(target=connect)
t1.start()
t2.start()
"""
Thread-1监测服务器是否开启
Thread-2等待连接...
Thread-2等待连接...
Thread-2等待连接...
Thread-2等待连接...
Thread-2等待连接...
Thread-2等待连接...
服务器已经开启
Thread-2连接成功...
"""

 

版本二

from threading import Thread, current_thread, Event
import time

event = Event()
def check():
    print(f"{current_thread().name} 监测服务器是否开启...")
    time.sleep(3)
    print(event.is_set()) 
    # 判断事件是否开启
    event.set()
    print(event.is_set())
    print("服务器已经开启...")

def connect():
    print(f"{current_thread().name} 等待连接...")
    event.wait() 
    # 阻塞, 直到 event.set() 方法之后
    # event.wait(1) 只阻塞1秒, 1秒之后如果还没有进行set 直接进行下一步操作
    print(f"{current_thread().name}连接成功...")

t1 = Thread(target=check)
t2 = Thread(target=connect)
t1.start()
t2.start()

 

​ 开启两个线程, 一个线程运行到中间的某个阶段, 触发另一个线程执行. 两个线程增加了耦合性

练习

​ 一个线程监测服务器是否开始, 另一个线程判断如果开始了, 则显示连接成功, 此线程只尝试连接3次, 1s一次, 如果超过3次, 还没有连接成功, 则显示连接失败.

from threading import current_thread,Thread,Event
import time

event = Event()
def check():
    print(f"{current_thread().name}监测服务器是否开启...")
    time.sleep(4)
    event.set()
    print("服务器已开启")

def connect():
    count = 1
    while not event.is_set():
        if count == 4:
            print("连接次数过多, 已断开")
            break
        event.wait(1)
        print(f"{current_thread().name}尝试连接{count}次")
        count += 1
    else:
        print(f"{current_thread().name}连接成功..")

t1 = Thread(target=check)
t2 = Thread(target=connect)
t1.start()
t2.start()

 

协程

​ 一个线程并发的处理任务

​ 串行:

​ 一个线程执行一个任务, 执行完毕之后, 执行下一个任务.

​ 并行:

​ 多个CPU执行多个任务, 4个CPU执行4个任务

​ 并发:

​ 一个CPU执行多个任务, 看起来像是同时运行

​ 并发的本质:

​ 切换 + 保持状态

多线程并发图

​ 3个线程处理10个任务, 如果线程1处理的这个任务, 遇到阻塞, CPU被操作系统切换到另一个线程

 

单线程并发图

​ 一个线程处理三个任务

 

并发方式:

​ 单个CPU, 并发执行10个任务

​ 1.开启多进程并发执行, 操作系统切换 + 保持状态

​ 2.开启多线程并发执行, 操作系统切换 + 保持状态

​ 3.开启协程并发的执行, 自己的程序把控着CPU, 在3个任务之间来回切换 + 保持状态

详解第三种方式

​ 协程切换速度非常快, 蒙蔽操作系统的眼睛, 让操作系统认为CPU一直在运行这一个线程(协程)

最好的方式

​ 协程, 因为开销小, 运行速度快, 协程会长期霸占CPU只执行我程序里面的所有任务

​ 利用协程处理IO密集型效率高, 计算密集型, 还是使用串行

协程

​ 单个线程并发的处理多个任务, 程序控制协程的切换 + 保持状态

切换

def func1():
    print("in func1")

def func2():
    print("in func2")
    func1()
    print("end")
func2()

 

切换 + 保留状态

def gen():
    while 1:
        yield 1

def func():
    obj = gen()
    for i in range(10):
        next(obj)
func()

 

协程

import gevent
from gevent import monkey

monkey.patch_all()
def eat(name):
    print(f"{name} eat 1")
    gevent.sleep(2)
    print(f"{name} eat 2")

def play(name):
    print(f"{name} play 1")
    gevent.sleep(1)
    print(f"{name} play 2")

g1 = gevent.spawn(eat, "egon")
g2 = gevent.spawn(play, "egon")

gevent.joinall([g1,g2])

 

协程的特点

​ 1.必须在只有一个单线程里实现并发

​ 2.修改共享数据不需加锁

​ 3.用户程序里自己保存多个控制流的上下文栈(保持状态)

​ 4.附加: 一个协程遇到IO操作自动切换到其他协程

工作中

​ 一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块

 

p = queue.PriorityQueue(4)
p.put((5,"oifjanior"))
p.put((-2,"hshts"))
p.put((0,"shtr"))
p.put((3,"hstrhs"))
print(p.get())
print(p.get())
print(p.get())
print(p.get())
"""
(-2,"hshts")
(0,"shtr")
(3,"hstrhs")
(5,"oifjanior")

标签:__,协程,name,Python,进程,线程,time,print,import
来源: https://www.cnblogs.com/liurui12138/p/14445283.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有