ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

13、网络编程之并发编程

2022-08-02 07:32:26  阅读:131  来源: 互联网

标签:__ 13 name 编程 并发 time print import 进程


13.1操作系统发展史

手工操作—穿孔卡片

1946年第一台计算机诞生

1946年第一台计算机诞生--20世纪50年代中期,计算机工作还在采用手工操作方式。此时还没有操作系统的概念。

程序员使用穿孔的纸带装入输入机, 然后启动输入机把程序和数据输入计算机内存,接着通过控制台开关启动程序针对数据运行;计算完毕,打印机输出计算结果;用户取走结果并卸下纸带(或卡片)后,才让下一个用户上机。

同一个机房里面只能有一个人在操作,资源利用率低(人和计算机)。

批处理—磁带存储

批处理系统:加载在计算机上的一个系统软件,在它的控制下,计算机能够自动地、成批地处理一个或多个用户的作业(这作业包括程序、数据和命令)。

1、联机批处理系统

主机与输入机之间增加一个存储设备——磁带,在运行于主机上的监督程序的自动控制下,计算机可自动完成 。用户的程序与数据读入磁带, 依次把磁带上的用户程序与数据读入主机内存并执行并把计算结果向输出机输出。完成了上一批操作后,监督程序又从输入机上输入另一批程序与数据,保存在磁带上,并按上述步骤重复处理。

但是,在程序与数据输入与输出时,主机的高速CPU 仍处于空闲状态

2、脱机批处理系统

需要使用高速磁带是因为CPU的运行速度是非常快的,而数据从硬盘读取到内存速度是相对来说比较慢的。

高速磁带就像今天计算机的内存,输入机就像硬盘

13.2多道技术

多道技术:单核实现并发的效果

并发:看起来像同时运行的就可以称为并发

并行:真正意义上的同时执行

1、并行肯定是并发

2、单核的计算机肯定不能实现并行,但是可以实现并发

多道技术图解

多道技术的重点知识

空间上的复用和时间上的复用

空间:多个程序公用一套计算机硬件

时间:做一件事情的同时可以做其他事情,节约时间,保存状态

切换+保存状态

"""
切换(CPU)分为两种情况
	1.当一个程序遇到IO操作的时候,操作系统会剥夺该程序的CPU执行权限。
		作用:提高了CPU的利用率 并且也不影响程序的执行效率
	
	2.当一个程序长时间占用CPU的时候,操作吸引也会剥夺该程序的CPU执行权限
		弊端:降低了程序的执行效率(原本时间+切换时间)
"""

13.3进程理论

程序与进程的区别:

"""
程序就是一堆躺在硬盘上的代码,是“死”的 比如微信程序
进程则表示程序正在执行的过程,是“活”的 比如微信进程表示微信正在运行中 进程肯定在内存中
"""

进程调度

多个程序交替运行过程中,应该怎样对程序进行调度,先运行哪一个程序后运行哪一个程序

  • 先来先服务调度算法

    """对长作业有利,对短作业无利"""
    
  • 短作业优先调度算法

    """对短作业有利,对长作业无益"""
    
  • 时间片轮转法+多级反馈队列

    """
    时间片:将固定的时间切割成n多份,每一份就表示一个时间片
    如果在划分的时间内无法完成,就会把这个任务往下放置,越往下说明该任务需要的时间越长,越往下任务的执行优先级越低
    
    当第一个队列中出现了新的任务,那么cpu会立刻停止当前执行的任务,去执行新添加进来的第一层队列中的任务
    """
    

进程三状态图

三状态:就绪 运行\执行 阻塞

所有的程序要想被执行必须先经历就绪状态,准备被运行

阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。 需要重新回到就绪状态准备被执行

异步非阻塞

同步和异步:

描述的是任务的提交方式

同步:任务提交之后,原地等待任务的返回结果,等待的过程中不做任何事情

程序层面上表现出来的感觉就是卡住了

异步:任务提交后,不原地等待任务的返回结果,直接去做其他事情,提交的任何结果如何?任务的返回结果会有一个异步回调机制自动处理。

阻塞和非阻塞:

描述的是程序的运行状态

阻塞:程序三状态里面的阻塞态

非阻塞:就绪态,运行态

理想状态:应该让写的程序永远处于就绪态与运行态之间切换

上述概念的组合:最高效的一种组合就是异步非阻塞

创建进程的两种方式

代码开启进程和线程的方式,代码书写基本上是一致的,

第一种方式:掌握

from multiprocessing import Process
import time


def task(name):
    print('%s is running' % name)
    time.sleep(3)
    print('%s is over' % name)

if __name__  ==  '__main__':
    # 1.创建一个对象
    p = Process(target=task, args=('jason',))
    # 容器类型哪怕里面只有1个元素 建议要用逗号隔开
    # 2.开启一个进程
    p.start()  # 告诉操作系统创建一个进程  异步
    print('主')

"""
Windows操作系统 创建进程一定要在main内创建因为在Windows下创建进程类似于模块导入的方式
会从上往下依次执行

Linux直接将代码复制一份
"""

第二种方式:

from multiprocessing import Process
import time


class MyProcess(Process):
    def run(self):
        print('hello bf girl')
        time.sleep(1)
        print('get out!')


if __name__ == '__main__':
    p = MyProcess()
    p.start()
    print('主')

创建进程就是在内存中申请一块内存空间,将需要运行的代码丢进去,一个进程对应在内存中就是一块独立的内存空间,多个进程对应在内存中就是多块独立的内存空间。进程与进程之间数据默认情况下是无法直接交互的,如果想交互可以借助第三方工具,模块。

join方法

join是让主进程等待子进程代码运行结束之后,再继续运行。不影响其他子进程的执行

from multiprocessing import Process
import time


def task(name, n):
    print('%s is running'%name)
    time.sleep(n)
    print('%s is over'%name)


if __name__ == '__main__':
    # p1 = Process(target=task, args=('j', 1))
    # p2 = Process(target=task, args=('e', 2))
    # p3 = Process(target=task, args=('t', 3))
    # start_time = time.time()
    # p1.start()
    # p2.start()
    # p3.start()  # 仅仅是告诉操作系统要创建进程,p1,p2,p3没有顺序
    # # time.sleep(50000000000000000000)
    # # p.join()  # 主进程等待子进程p运行结束之后再继续往后执行
    # p1.join()
    # p2.join()
    # p3.join()
    start_time = time.time()
    p_list = []
    for i in range(1, 4):
        p = Process(target=task, args=('子进程%s'%i, i))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    print('主', time.time() - start_time)

进程间数据相互隔离

from multiprocessing import Process


money = 100


def task():
    global money  # 局部修改全局
    money = 666
    print('子',money)


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join()
    print(money)

进程对象及其他方法

"""
一台计算机上面运行着很多进程,那么计算机是如何区分并管理这些进程服务端的呢?
计算机会给每一个运行的进程分配一个PID号 
如何查看
	windows电脑 
		进入cmd输入tasklist即可查看
		tasklist |findstr PID查看具体的进程
	mac电脑 
		进入终端之后输入ps aux
		ps aux|grep PID查看具体的进程 
"""
from multiprocessing import Process, current_process
current_process().pid  # 查看当前进程的进程号

import os
os.getpid()  # 查看当前进程进程号
os.getppid()  # 查看当前进程的父进程进程号


p.terminate()  # 杀死当前进程
# 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快
time.sleep(0.1)
print(p.is_alive())  # 判断当前进程是否存活
from multiprocessing import Process,current_process
import time
import os


def task():
    # print('%s is running ' % current_process().pid)
    print('%s is running ' % os.getpid())
    print('子进程的主进程号%s ' % os.getppid())
    time.sleep(3)

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join()
    # print('主',current_process().pid)
    print('主',os.getpid())
    print('主主',os.getppid()) # 获取父进程的pid

僵尸进程与孤儿进程(了解)

# 僵尸进程
"""
死了但是没有死透
当开设了子进程之后 该进程死后不会立刻释放占用的进程号
因为要让父进程能够查看到它开设的子进程的一些基本信息 占用的pid号 运行时间。。。
所有的进程都会步入僵尸进程
	父进程不死并且在无限制的创建子进程并且子进程也不结束
	回收子进程占用的pid号
		父进程等待子进程运行结束
		父进程调用join方法
"""

# 孤儿进程
"""
子进程存活,父进程意外死亡
操作系统会开设一个“儿童福利院”专门管理孤儿进程回收相关资源
"""

守护进程

守护进程与操作系统同生共死

from multiprocessing import Process
import time

def task(name):
    print('%s 正在活着' % name)
    time.sleep(3)
    print('%s 正在死亡' % name)

if __name__ == '__main__':
    p = Process(target=task,args=('egon'),)
    p.daemon = True  # 将进程P设置成为守护进程必须要写在p.start()上面
    p.start()
    print('die')

互斥锁

多个进程操作同一份数据的时候,会出现数据错乱的问题

针对上述问题,解决方式就是加锁处理:将并发变成串行,牺牲效率但是保证了数据的安全

from multiprocessing import Process, Lock
import json
import time
import random


# 查票
def search(i):
    # 文件操作读取票数
    with open('data','r',encoding='utf8') as f:
        dic = json.load(f)
    print('用户%s查询余票:%s'%(i, dic.get('ticket_num')))
    # 字典取值不要用[]的形式 推荐使用get  你写的代码打死都不能报错!!!


# 买票  1.先查 2.再买
def buy(i):
    # 先查票
    with open('data','r',encoding='utf8') as f:
        dic = json.load(f)
    # 模拟网络延迟
    time.sleep(random.randint(1,3))
    # 判断当前是否有票
    if dic.get('ticket_num') > 0:
        # 修改数据库 买票
        dic['ticket_num'] -= 1
        # 写入数据库
        with open('data','w',encoding='utf8') as f:
            json.dump(dic,f)
        print('用户%s买票成功'%i)
    else:
        print('用户%s买票失败'%i)


# 整合上面两个函数
def run(i, mutex):
    search(i)
    # 给买票环节加锁处理
    # 抢锁
    mutex.acquire()

    buy(i)
    # 释放锁
    mutex.release()


if __name__ == '__main__':
    # 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票
    mutex = Lock()
    for i in range(1,11):
        p = Process(target=run, args=(i, mutex))
        p.start()
"""
扩展 行锁 表锁

注意:
	1.锁不要轻易的使用,容易造成死锁现象(我们写代码一般不会用到,都是内部封装好的)
	2.锁只在处理数据的部分加来保证数据安全(只在争抢数据的环节加锁处理即可) 
"""

进程间通信

队列Queue模块

队列:先进先出

堆栈:先进后出

"""
管道:subprocess 
	stdin stdout stderr
队列:管道+锁
"""
from multiprocessing import Queue

# 创建一个队列
q = Queue(5)  # 括号内可以传数字 标示生成的队列最大可以同时存放的数据量

# 往队列中存数据
q.put(111)
q.put(222)
q.put(333)
# print(q.full())  # 判断当前队列是否满了
# print(q.empty())  # 判断当前队列是否空了
q.put(444)
q.put(555)
# print(q.full())  # 判断当前队列是否满了

# q.put(666)  # 当队列数据放满了之后 如果还有数据要放程序会阻塞 直到有位置让出来 不会报错

"""
存取数据 存是为了更好的取
千方百计的存、简单快捷的取
"""

# 去队列中取数据
v1 = q.get()
v2 = q.get()
v3 = q.get()
v4 = q.get()
v5 = q.get()
# print(q.empty())
# V6 = q.get_nowait()  # 没有数据直接报错queue.Empty
# v6 = q.get(timeout=3)  # 没有数据之后原地等待三秒之后再报错  queue.Empty
# 异常捕获
try:
    v6 = q.get(timeout=3)
    print(v6)
except Exception as e:
    print('一滴都没有了!')

# # v6 = q.get()  # 队列中如果已经没有数据的话 get方法会原地阻塞
# print(v1, v2, v3, v4, v5, v6)

q.full()
q.empty()
q.get_nowait()
在多进程的情况下是不精确

IPC机制

1.主进程跟子进程借助于队列通信

from multiprocessing import Queue, Process
def producer(q):
    q.put('happy')



if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,))
    p.start()

2.子进程跟子进程借助于队列通信

from multiprocessing import Queue, Process

def producer(q):
    q.put('happy')


def consumer(q):
    print(q.get())


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,))
    p1 = Process(target=consumer,args=(q,))
    p.start()
    p1.start()

生产者消费者模型

生产者:生产,制造东西

消费者:消费,处理东西

该模型还需要一个媒介,比如包子放在蒸笼里面

生产者与消费者之间不是直接交互的,而是借助于一个媒介的

生产者+消息队列+消费者

from multiprocessing import Process, Queue, JoinableQueue
import time
import random


def producer(name,food,q):
    for i in range(5):
        data = '%s生产了%s%s'%(name,food,i)
        # 模拟延迟
        time.sleep(random.randint(1,3))
        print(data)
        # 将数据放入 队列中
        q.put(data)


def consumer(name,q):
    # 消费者胃口很大 光盘行动
    while True:
        food = q.get()  # 没有数据就会卡住
        # 判断当前是否有结束的标识
        # if food is None:break
        time.sleep(random.randint(1,3))
        print('%s吃了%s'%(name,food))
        q.task_done()  # 告诉队列你已经从里面取出了一个数据并且处理完毕了


if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()
    p1 = Process(target=producer,args=('a','包子',q))
    p2 = Process(target=producer,args=('b','泔水',q))
    c1 = Process(target=consumer,args=('c',q))
    c2 = Process(target=consumer,args=('d',q))
    p1.start()
    p2.start()
    # 将消费者设置成守护进程
    # 只要q.join执行完毕 说明消费者已经处理完数据了  消费者就没有存在的必要了
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    # 等待生产者生产完毕之后 往队列中添加特定的结束符号
    # q.put(None)  # 肯定在所有生产者生产的数据的末尾
    # q.put(None)  # 肯定在所有生产者生产的数据的末尾
    q.join()  # 等待队列中所有的数据被取完再执行往下执行代码
    """
    JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1
    没当你调用task_done的时候 计数器-1
    q.join() 当计数器为0的时候 才往后运行
    """
    # 只要q.join执行完毕 说明消费者已经处理完数据了  消费者就没有存在的必要了

13.4线程理论

1、线程是什么

进程:资源单位,起一个进程仅仅只是在内存空间中开辟一块独立的空间

线程:执行单位,真正干活的人。真正被cpu执行的其实是进程里面的线程,线程指的就是代码的执行过程,执行代码中所需要使用到的资源都找所在的进程索要

将操作系统比喻成一个大的工厂,那么进程就相当于工厂里面的车间
而线程就是车间里面的流水线

每一个进程都带有一个线程

2、为什么要有线程

开设进程:申请内存空间,拷贝代码 都耗资源

开设线程:一个进程内可以开设多个线程,在用一个进程内开设多个线程无需再次申请内存空间及拷贝代码的操作

开设线程的开销要远远要小于进程的开销

同一个进程下的多个线程数据是共享的

开启线程的两种方式

第一种方式:

from multiprocessing import Process
from threading import Thread
import time


def task(name):
    print('%s is running' % name)
    time.sleep(1)
    print('%s is over' % name)

# 开启线程不需要在main下面执行代码,直接书写就可以了
# 但是还是要习惯性的将启动命名写在main下面

t = Thread(target=task,args=('hello',))
t.start() # 代码一执行就开线程了
print('主')

第二种方式:

from threading import Thread
import time


class MyThead(Thread):
    def __init__(self, name):
        """针对双下划线开头双下滑线结尾(__init__)的方法 统一读成 双下init"""
        # 重写了别人的方法 又不知道别人的方法里有什么 就调用父类的方法
        super().__init__()
        self.name = name

    def run(self):
        print('%s is running'%self.name)
        time.sleep(1)
        print('egon DSB')


if __name__ == '__main__':
    t = MyThead('hello')
    t.start()
    print('主')

tcp服务端实现并发

多个客户端访问一个服务端

服务端

import socket
from threading import Thread
from multiprocessing import Process
"""
服务端:
    1、要有固定的IP和PORT
    2、24小时不间断提供服务
    3、能够支持并发
"""
server = socket.socket() # 括号内不加参数默认就是tcp协议
server.bind(('127.0.0.1',8080))
server.listen(5)  # 半连接池

# 将服务的代码单独封装成一个函数
def talk(conn):
    # 通信循环
    while True:
        try:
            data = conn.recv(1024)
            # 针对mac linux 客户端断开链接后
            if len(data) == 0: break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()

# 链接循环
while True:
    conn, addr = server.accept()  # 接客
    # 叫其他人来服务客户
    # t = Thread(target=talk,args=(conn,))
    t = Process(target=talk,args=(conn,))
    t.start()

客户端

import socket


client = socket.socket()
client.connect(('127.0.0.1',8080))

while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data.decode('utf-8'))

join方法

from threading import Thread
import time


def task(name):
    print('%s is running'%name)
    time.sleep(3)
    print('%s is over'%name)


if __name__ == '__main__':
    t = Thread(target=task,args=('egon',))
    t.start()
    t.join()  # 主线程等待子线程运行结束再执行
    print('主')

线程数据共享

同一个进程下的多个线程数据是共享的

from threading import Thread
import time


money = 100


def task():
    global money
    money = 666
    print(money)


if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    t.join()
    print(money)

线程对象及其他方法

from threading import Thread, active_count, current_thread
import os,time


def task(n):
    # print('hello world',os.getpid())
    print('hello world',current_thread().name)
    time.sleep(n)


if __name__ == '__main__':
    t = Thread(target=task,args=(1,))
    t1 = Thread(target=task,args=(2,))
    t.start()
    t1.start()
    t.join()
    print('主',active_count())  # 统计当前正在活跃的线程数
    # print('主',os.getpid())
    # print('主',current_thread().name)  # 获取线程名字

线程互斥锁

from threading import Thread,Lock
import time


money = 100
mutex = Lock()


def task():
    global money
    mutex.acquire()
    tmp = money
    time.sleep(0.1)
    money = tmp - 1
    mutex.release()


if __name__ == '__main__':

    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(money)

GIL全局解释器锁

"""
关于GIL全局解释器官方解释
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
"""

"""
python解释器其实有多个版本
	Cpython
	Jpython
	Pypypython
但是普遍使用的都是CPython解释器


在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行
	同一个进程下的多个线程无法利用多核优势!!!
	疑问:python的多线程是不是一点用都没有???无法利用多核优势
	
因为cpython中的内存管理不是线程安全的
内存管理(垃圾回收机制)
	1.应用计数
		变量名绑定值与销毁值
	2.标记清楚
		空间不足时,停止运行,自动扫描程序的引用计数,引用计数为0的打上标记,最后一次性的把引用计数为0的清除掉
	3.分代回收
		有一些引用计数的存活时间长,减少垃圾回收消耗的资源
	


重点:
	1.GIL不是python的特点而是CPython解释器的特点
	2.GIL是保证解释器级别的数据的安全
	3.GIL会导致同一个进程下的多个线程的无法同时执行即无法利用多核优势(******)
	4.针对不同的数据还是需要加不同的锁处理 
	5.解释型语言的通病:同一个进程下多个线程无法利用多核优势
"""

GIL与普通互斥锁的区别

from threading import Thread,Lock
import time


mutex = Lock()
money = 100


def task():
    global money
    # with mutex:
    #     tmp = money
    #     time.sleep(0.1)
    #     money = tmp -1
    mutex.acquire()
    tmp = money
    time.sleep(0.1)  # 只要你进入IO了 GIL会自动释放
    money = tmp - 1
    mutex.release()


if __name__ == '__main__':
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(money)



"""
100个线程起起来之后  要先去抢GIL
我进入io GIL自动释放 但是我手上还有一个自己的互斥锁
其他线程虽然抢到了GIL但是抢不到互斥锁 
最终GIL还是回到你的手上 你去操作数据
"""

同一个进程下的多线程无法利用多核优势,还有用吗?

单核:四个任务(IO密集型\计算密集型)

多核:四个任务(IO密集型\计算密集型)

计算密集型:计算机一直工作

单核(不用考虑了)
多进程:额外的消耗资源
多线程:节省开销
多核
多进程:总耗时 10+
多线程:总耗时 40+

IO密集型:

多核
多进程:相对浪费资源
多线程:节省资源

计算密集型

# 计算密集型
 from multiprocessing import Process
 from threading import Thread
 import os,time


 def work():
     res = 0
     for i in range(10000000):
         res *= i

 if __name__ == '__main__':
     l = []
     print(os.cpu_count())  # 获取当前计算机CPU个数
     start_time = time.time()
     for i in range(12):
         p = Process(target=work)  # 1.4679949283599854
         t = Thread(target=work)  # 5.698534250259399
         t.start()
         # p.start()
         # l.append(p)
         l.append(t)
     for p in l:
         p.join()
     print(time.time()-start_time)

IO密集型

# IO密集型
from multiprocessing import Process
from threading import Thread
import os,time


def work():
    time.sleep(2)

if __name__ == '__main__':
    l = []
    print(os.cpu_count())  # 获取当前计算机CPU个数
    start_time = time.time()
    for i in range(4000):
        # p = Process(target=work)  # 21.149890184402466
        t = Thread(target=work)  # 3.007986068725586
        t.start()
        # p.start()
        # l.append(p)
        l.append(t)
    for p in l:
        p.join()
    print(time.time()-start_time)

总结:

多进程与多线程都有各自的优势,填充多进程下面开设多线程。

死锁(了解)

抢锁必须要释放锁,在操作锁的时候也极其容易产生死锁现象(整个程序卡死 阻塞)

from threading import Thread, Lock
import time


mutexA = Lock()
mutexB = Lock()
# 类只要加括号多次 产生的肯定是不同的对象
# 如果你想要实现多次加括号等到的是相同的对象 单例模式


class MyThead(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire() # A锁
        print('%s 抢到A锁'% self.name)  # 获取当前线程名
        mutexB.acquire() # B锁
        print('%s 抢到B锁'% self.name)
        mutexB.release()
        mutexA.release()
        
    def func2(self):
        mutexB.acquire()
        print('%s 抢到B锁'% self.name)
        time.sleep(2)
        mutexA.acquire()
        print('%s 抢到A锁'% self.name)  # 获取当前线程名
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThead()
        t.start()

递归锁(了解)

可以被连续的acquire和release,但是只能被第一个抢到这把锁执行上述操作,它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一,只要计数不为0 那么其他人都无法抢到该锁

from threading import Thread, Lock
import time

# 一把锁
mutexA = mutexB = RLock()


class MyThead(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire() # A锁
        print('%s 抢到A锁'% self.name)  # 获取当前线程名
        mutexB.acquire() # B锁
        print('%s 抢到B锁'% self.name)
        mutexB.release()
        mutexA.release()
        
    def func2(self):
        mutexB.acquire()
        print('%s 抢到B锁'% self.name)
        time.sleep(2)
        mutexA.acquire()
        print('%s 抢到A锁'% self.name)  # 获取当前线程名
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThead()
        t.start()

信号量(了解)

信号量在不同的阶段可能对应不同的技术点

在并发编程中信号量指的是锁!!!

"""
如果我们将互斥锁比喻成一个厕所的话
那么信号量就相当于多个厕所
"""
from threading import Thread, Semaphore
import time
import random



sm = Semaphore(5)  # 括号内写数字 写几就表示开设几个坑位


def task(name):
    sm.acquire()
    print('%s 我是'% name)
    time.sleep(random.randint(1, 5))
    sm.release()


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task, args=('伞兵%s号'%i, ))
        t.start()

Event事件(了解)

一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似于发射信号一样

from threading import Thread, Event
import time


event = Event()  # 造了一个红绿灯


def light():
    print('红灯亮着的')
    time.sleep(3)
    print('绿灯亮了')
    # 告诉等待红灯的人可以走了
    event.set()


def car(name):
    print('%s 车正在灯红灯'%name)
    event.wait()  # 等待别人给你发信号
    print('%s 车加油门飙车走了'%name)


if __name__ == '__main__':
    t = Thread(target=light)
    t.start()

    for i in range(20):
        t = Thread(target=car, args=('%s'%i, ))
        t.start()

线程q(了解)

同一个进程下多个线程数据是共享的
为什么在同一个进程下还会去使用队列呢
因为队列是
管道 + 锁
所以用队列还是为了保证数据的安全

import queue

# 我们现在使用的队列都是只能在本地测试使用

# 1 队列q  先进先出
# q = queue.Queue(3)
# q.put(1)
# q.get()
# q.get_nowait()
# q.get(timeout=3)
# q.full()
# q.empty()


# 后进先出q
# q = queue.LifoQueue(3)  # last in first out
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())  # 3

# 优先级q   可以给放入队列中的数据设置进出的优先级
q = queue.PriorityQueue(4)
q.put((10, '111'))
q.put((100, '222'))
q.put((0, '333'))
q.put((-5, '444'))
print(q.get())  # (-5, '444')
# put括号内放一个元祖  第一个放数字表示优先级
# 需要注意的是 数字越小优先级越高!!!

13.5池

无论是开设进程也好还是开设线程也好 都需要消耗资源,只不过开设线程的消耗比开设进程的稍微小。

是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上,硬件的开发速度远远赶不上软件

我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它

池是用来保证计算机硬件安全的情况下最大限度的利用计算机
降低了程序的运行效率但是保证了计算机硬件的安全 从而让你写的程序能够正常运行

进程池与线程池

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os


# pool = ThreadPoolExecutor(5)  # 池子里面固定只有五个线程
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
# 进程池
pool = ProcessPoolExecutor(5)
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
"""
池子造出来之后 里面会固定存在五个线程
这个五个线程不会出现重复创建和销毁的过程
池子造出来之后 里面会固定的几个进程
这个几个进程不会出现重复创建和销毁的过程

池子的使用非常的简单
你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
"""


def task(n):
    # print(n)
    print(n,os.getpid())
    time.sleep(2)
    return n**n

def call_back(n):
    print('call_back>>>:',n.result())
"""
任务的提交方式
    同步:提交任务之后原地等待任务的返回结果 期间不做任何事
    异步:提交任务之后不等待任务的返回结果 执行继续往下执行
        返回结果如何获取???
        异步提交任务的返回结果 应该通过回调机制来获取
        回调机制
            就相当于给每个异步任务绑定了一个定时炸弹
            一旦该任务有结果立刻触发爆炸
"""
if __name__ == '__main__':
    # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
    # print('主')
    t_list = []
    for i in range(20):  # 朝池子中提交20个任务
        # res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
        res = pool.submit(task, i).add_done_callback(call_back)
        # print(res.result())  # result方法   同步提交
        # t_list.append(res)
    # 等待线程池中所有的任务执行完毕之后再继续往下执行
    # pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
    # for t in t_list:
    #     print('>>>:',t.result())  # 肯定是有序的
"""
程序由并发变成了串行
任务的为什么打印的是None
res.result() 拿到的就是异步提交的任务的返回结果
"""

掌握的代码

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
pool.submit(task, i).add_done_callback(call_back)

协程

进程:资源单位
线程:执行单位,真正干活的人
协程:这个概念完全是程序员自己想象出来的 根本不存在
单线程下实现并发
程序员自己在代码层面上检测我们所有的IO操作
一旦遇到IO了 我们在代码级别自动完成切换
这样给CPU的感觉是你这个程序一直在运行 没有IO
从而提升程序的运行效率

多道技术:切换+保存状态
CPU两种切换时的情况:
1.程序遇到IO
2.程序长时间占用

代码如何做到:切换+保存状态

切换
切换不一定是提升效率 也有可能是降低效率
IO切 提升效率
没有IO切 降低效率
保存状态
保存上一次我执行的状态 下一次来接着上一次的操作继续往后执行
yield

genvet模块(了解)

安装

pip3 install gevent
from gevent import monkey;monkey.patch_all()
import time
from gevent import spawn

"""
gevent模块本身无法检测常见的一些io操作
在使用的时候需要你额外的导入一句话
from gevent import monkey
monkey.patch_all()
又由于上面的两句话在使用gevent模块的时候是肯定要导入的
所以还支持简写
from gevent import monkey;monkey.patch_all()
"""


def heng():
    print('哼')
    time.sleep(2)
    print('哼')


def ha():
    print('哈')
    time.sleep(3)
    print('哈')

def heiheihei():
    print('heiheihei')
    time.sleep(5)
    print('heiheihei')


start_time = time.time()
g1 = spawn(heng)
g2 = spawn(ha)
g3 = spawn(heiheihei)
g1.join()
g2.join()  # 等待被检测的任务执行完毕 再往后继续执行
g3.join()
# heng()
# ha()
# print(time.time() - start_time)  # 5.005702018737793
print(time.time() - start_time)  # 3.004199981689453   5.005439043045044

协程实现tcp服务端高并发

# 服务端
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn


def communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        spawn(communication, conn)


if __name__ == '__main__':
    g1 = spawn(server, '127.0.0.1', 8080)
    g1.join()

    
# 客户端
from threading import Thread, current_thread
import socket


def x_client():
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    n = 0
    while True:
        msg = '%s say hello %s'%(current_thread().name,n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))


if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=x_client)
        t.start()

13.6IO模型

这里研究的IO模型都是针对网络IO的
五种IO Model:
blocking IO 阻塞IO
nonblocking IO 非阻塞IO
IO multiplexing IO多路复用
signal driven IO 信号驱动IO
asynchronous IO 异步IO

signal driven IO(信号驱动IO)在实际中并不常用

等待数据准备 (Waiting for the data to be ready)
将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

常见的网络阻塞状态:
accept
recv
recvfrom
send虽然它也有io行为 但是不在我们的考虑范围,因为send的速度非常快

阻塞IO

udp的recvfrom和tcp的recv发起系统调用向操作系统要数据,1、操作系统等待对方发数据 2、对方将数据发过来了然后将数据拷贝给应用程序,然后程序往下执行。

需要等待的时间是第一阶段和第二阶段

之前写过的服务端和客户端就是阻塞IO模型

import socket


server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)


while True:
    conn, addr = server.accept()
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:break
            print(data)
            conn.send(data.upper())
        except ConnectionResetError as e:
            break
    conn.close()

非阻塞IO

向对方要数据的时候先询问对方,提交之后无论是否有数据都会立刻得到一个结果。等待数据就不会影响程序的运行

"""
要自己实现一个非阻塞IO模型
"""
import socket
import time


server = socket.socket()
server.bind(('127.0.0.1', 8081))
server.listen(5)
server.setblocking(False)
# 将所有的网络阻塞变为非阻塞
r_list = []
del_list = []
while True:
    try:
        conn, addr = server.accept()
        r_list.append(conn)
    except BlockingIOError:
        # time.sleep(0.1)
        # print('列表的长度:',len(r_list))
        # print('做其他事')
        for conn in r_list:
            try:
                data = conn.recv(1024)  # 没有消息 报错
                if len(data) == 0:  # 客户端断开链接
                    conn.close()  # 关闭conn
                    # 将无用的conn从r_list删除
                    del_list.append(conn)
                    continue
                conn.send(data.upper())
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_list.append(conn)
        # 挥手无用的链接
        for conn in del_list:
            r_list.remove(conn)
        del_list.clear()

# 客户端
import socket


client = socket.socket()
client.connect(('127.0.0.1',8081))


while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data)

虽然非阻塞IO非常厉害,但是该模型会 长时间占用着CPU并且不干活 让CPU不停的空转,实际应用中也不会考虑使用非阻塞IO模型。但是任何的技术点都有它存在的意义 ,实际应用或者是思想借鉴。

IO多路复用

当监管的对象只有一个的时候 其实IO多路复用连阻塞IO都比比不上,但是IO多路复用可以一次性监管很多个对象

server = socket.socket()
conn,addr = server.accept()

监管机制是操作系统本身就有的 如果你想要用该监管机制(select)
需要你导入对应的select模块

import socket
import select


server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)
read_list = [server]


while True:
    r_list, w_list, x_list = select.select(read_list, [], [])
    """
    帮你监管
    一旦有人来了 立刻给你返回对应的监管对象
    """
    # print(res)  # ([<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>], [], [])
    # print(server)
    # print(r_list)
    for i in r_list:  #
        """针对不同的对象做不同的处理"""
        if i is server:
            conn, addr = i.accept()
            # 也应该添加到监管的队列中
            read_list.append(conn)
        else:
            res = i.recv(1024)
            if len(res) == 0:
                i.close()
                # 将无效的监管对象 移除
                read_list.remove(i)
                continue
            print(res)
            i.send(b'heiheiheiheihei')

 # 客户端
import socket


client = socket.socket()
client.connect(('127.0.0.1',8080))


while True:

    client.send(b'hello world')
    data = client.recv(1024)
    print(data)

监管机制其实有很多
select机制 windows linux都有

poll机制 只在linux有 poll和select都可以监管多个对象 但是poll监管的数量更多

上述select和poll机制其实都不是很完美 当监管的对象特别多的时候
可能会出现 极其大的延时响应

epoll机制 只在linux有
它给每一个监管对象都绑定一个回调机制
一旦有响应 回调机制立刻发起提醒

针对不同的操作系统还需要考虑不同检测机制 书写代码太多繁琐
有一个人能够根据你跑的平台的不同自动帮你选择对应的监管机制
selectors模块

异步IO

不会得到数据,直接去做其他事情,有一个回调机制,

相关的模块和框架
模块:asyncio模块
异步框架:sanic tronado twisted
速度快

import threading
import asyncio


@asyncio.coroutine
def hello():
    print('hello world %s'%threading.current_thread())
    yield from asyncio.sleep(1)  # 换成真正的IO操作
    print('hello world %s' % threading.current_thread())


loop = asyncio.get_event_loop()
tasks = [hello(),hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

标签:__,13,name,编程,并发,time,print,import,进程
来源: https://www.cnblogs.com/xionghuan01/p/16542437.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有