datetime:2025/04/15 16:28
author:nzb
进程间通信
目录
- 一. 管道
- 二. 数据共享
- 数据共享是不安全的
- 三. 进程池
- 进程池的map传参
- 进程池的同步方法
- 进程池的异步方法
- 详解apply和apply_async
- apply_async的其他方法
一. 管道
管道(不推荐使用,了解即可)是进程间通信(IPC)的第二种方式,它会导致数据不安全的情况出现.
Pipe([duplex])
: 创建管道的类,在进程之间创建一条管道, 并返回元组(conn1
,conn2
), 其中conn1
,conn2
表示管道两端的连接对象. 强调一点: 必须在产生Process
对象之前产生管道.- 参数介绍:
dumplex
: 默认管道是全双工的, 如果将duplex
设置成False
,conn1
只能用于接收,conn2
只能用于发送.
- 主要方法:
conn1.recv()
: 接收conn2.send(obj)
发送的对象. 如果没有消息可接收,recv()
方法会一直阻塞. 如果连接的另外一端已经关闭, 那么recv()
方法会抛出EOFError
.conn1.send(obj)
:通过连接发送对象。obj
是与序列化兼容的任意对象
- 其他方法:
conn1.close()
: 关闭连接. 如果conn1
被垃圾回收, 将自动调用此方法conn1.fileno()
: 返回连接使用的整数文件描述符conn1.poll([timeout])
: 如果连接上的数据可用, 返回True
.timeout
指定等待的最长时限. 如果省略此参数, 方法将立即返回结果. 如果将timeout
设置成None
, 操作将无限期地等待数据到达.conn1.recv_bytes([maxlength])
: 接收c.send_bytes()
方法发送的一条完整的字节消息.maxlength
指定要接收的最大字节数. 如果进入的消息, 超过了这个最大值, 将引发IOError
异常, 并且在连接上无法进行进一步读取. 如果连接的另外一端已经关闭, 再也不存在任何数据, 将引发EOFError
异常.conn.send_bytes(buffer[,offset[,size]])
: 通过连接发送字节数据缓冲区,buffer
是支持缓冲区接口的任意对象,offset
是缓冲区中的字节偏移量, 而size
是要发送的字节数. 数据结果以单条消息的形式发出, 然后调用c.recv_bytes()
函数进行接收conn1.recv_bytes_into(buffer[,offset])
: 接收一条完整的字节消息, 并把它保存在buffer
对象中, 该对象支持可写入的缓冲区接口(即bytearray
对象或类似的对象).offset
指定缓冲区中放置消息处的字节位移. 返回值是收到的字节数. 如果消息长度大于可用的缓冲区空间, 将引发BufferTooShort
异常.
- 参数介绍:
示例
- 例1:子进程给主进程发送消息
from multiprocessing import Process, Pipe # 引入Pipe模块
def func(conn):
conn.send("HelloWorld!") # 子进程发送了消息
conn.close() # 子进程关闭通道的一端
if __name__ == '__main__':
parent_conn, child_conn = Pipe() # 建立管道,拿到管道的两端,双工通信方式,两端都可以收发消息
p = Process(target=func, args=(child_conn,)) # 将管道的一端给子进程
p.start() # 开启子进程
print("主进程接收>>>", parent_conn.recv()) # 主进程接收了消息
p.join()
print("主进程执行结束!")
- 例2:主进程给子进程发送消息
from multiprocessing import Process, Pipe # 引入Pipe模块
def func(conn):
msg = conn.recv() # (5)子进程通过管道的另一端接收信息
print("The massage from parent_process is>>>", msg)
if __name__ == '__main__':
parent_conn, child_conn = Pipe() # (1)创建管道,拿到管道的两端
p = Process(target=func, args=(child_conn,)) # (2)创建子进程func, 把child_conn给func
p.start() # (3)启动子进程
parent_conn.send("Hello,child_process!") # (4)主进程通过parent_conn给子进程发送信息
- 例3:主进程和子进程互相收发消息
from multiprocessing import Process, Pipe
def func(parent_conn, child_conn):
msg = child_conn.recv() # (5)子进程使用parent_conn接收主进程的消息
print("子进程使用child_conn接收>>>", msg) # (6)打印接收到的消息
child_conn.send("子进程使用child_conn给主进程发送了一条消息") # (7)子进程发送消息
print("子进程执行完毕")
if __name__ == '__main__':
parent_conn, child_conn = Pipe() # (1)创建管道,拿到管道两端
parent_conn.send("主进程使用parent_conn给子进程发送了一条消息") # (2)主进程发消息
p = Process(target=func, args=(parent_conn, child_conn)) # (3)创建子进程,把管道两端都给子进程
p.start() # (4)开启子进程
p.join() # (8)等待子进程执行完毕
msg = parent_conn.recv() # (9)主进程使用parent_conn接收子进程的消息
print("主进程使用parent_conn接收>>>", msg) # (10)打印接收到的消息
print("主进程执行完毕!")
# 子进程使用child_conn接收>>> 主进程使用parent_conn给子进程发送了一条消息
# 子进程执行完毕
# 主进程使用parent_conn接收>>> 子进程使用child_conn给主进程发送了一条消息
# 主进程执行完毕!
应该特别注意管道端点的正确管理问题. 如果生产者或消费者中都没有使用管道的某个端点, 就应将它关闭,否则就会抛出异常. 例如: 当生产者关闭了管道的输出端时, 消费者也要同时关闭管道的输入端. 如果忘记执行这些步骤, 程序可能在消费者中的recv()
操作上挂起(就是阻塞). 管道是由操作系统进行引用计数的, 在所有进程中关闭管道的相同一端就会生成EOFError
异常. 因此, 在生产者中关闭管道不会有任何效果, 除非消费者也关闭了相同的管道端点.
from multiprocessing import Process, Pipe
def f(parent_conn,child_conn):
#parent_conn.close() #不写close将不会引发EOFError
while True:
try:
print(child_conn.recv())
except EOFError:
child_conn.close()
break
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(parent_conn,child_conn,))
p.start()
child_conn.close()
parent_conn.send('hello')
parent_conn.close()
p.join()
管道可以用于双工通信, 通常利用在客户端/服务端中使用的请求/响应模型, 或者远程过程调用, 就可以使用管道编写与进程交互的程序, 像前面将网络通信的时候, 我们使用了一个叫subprocess的模块, 里面有个参数是pipe管道, 执行系统指令, 并通过管道获取结果.
二. 数据共享
展望未来, 基于消息传递的并发编程是大势所趋. 即便是使用线程, 推荐做法也是将程序设计为大量独立的线程集合, 通过消息队列交换数据. 这样极大地减少了对使用锁定和其他同步手段的需求, 还可以扩展到分布式系统中.
进程间应该尽量避免通信, 即便需要通信, 也应该选择进程安全的工具来避免加锁带来的问题, 应该尽量避免使用本节所讲的共享数据的方式, 以后我们会尝试使用数据库来解决进程之间的数据共享问题.
进程之间数据共享的模块之一Manager模块:
进程间数据是独立的, 可以借助于队列或管道实现通信, 二者都是基于消息传递的. 虽然进程间数据独立, 但可以通过Manager实现数据共享.
- 子进程修改共享列表
from multiprocessing import Process, Manager # 引入Manager模块
def func(m_list):
m_list.pop() # 对manager列表进行改变
if __name__ == '__main__':
m = Manager() # 创建Manager对象
m_list = m.list(["王力宏", "王乃卉", "王少轩"]) # 创建manager列表
print("主进程>>>", m_list)
p = Process(target=func, args=(m_list,)) # 创建子进程
p.start()
p.join()
print("主进程>>>", m_list)
# 执行结果:
# 主进程>>> ['王力宏', '王乃卉', '王少轩']
# 主进程>>> ['王力宏', '王乃卉']
- 子进程修改共享字典
from multiprocessing import Process, Manager # 引入Manager模块
def func(m_dic):
m_dic["name"] = "王力宏" # 修改manager字典
if __name__ == '__main__':
m = Manager() # 创建Manager对象
m_dic = m.dict({"name": "王乃卉"}) # 创建manager字典
print("主进程>>>", m_dic)
p = Process(target=func, args=(m_dic,)) # 创建子进程
p.start()
p.join()
print("主进程>>>", m_dic)
# 执行结果:
# 主进程>>> {'name': '王乃卉'}
# 主进程>>> {'name': '王力宏'}
多进程共同去处理共享数据的时候, 就和我们多进程同时去操作一个文件中的数据是一样的, 不加锁就会出现错误的结果, 进程不安全的, 所以也需要加锁.
- 不加锁对共享数据进行修改,是不安全的
from multiprocessing import Process, Manager
def func(m_dic):
m_dic["count"] -= 1
if __name__ == '__main__':
m = Manager()
m_dic = m.dict({"count": 100})
p_list = []
# 开启20个进程来对共享数据进行修改
for i in range(20):
p = Process(target=func, args=(m_dic, ))
p.start()
p_list.append(p)
[p.join() for p in p_list]
print("主进程>>>", m_dic)
# 执行结果:
# 主进程>>> {'count': 80}
# 但是偶尔会出现 主进程>>> {'count': 81} 的情况, 这是因为共享数据不变, 但是当多个子进程同时访问共享数据并对其进行修改时, 由于修改的过程是要重写对共享数据进行赋值的, 在这个赋值的过程中, 可能一个子进程还没来得及赋值成功, 就有另外的一个子进程拿到原先的值, 这样一来, 就会出现多个子进程修改同一个共享数据, 于是就出现了上面代码结果偶尔会少减了一次的现象. 综上所述,共享数据是不够安全的, 而"加锁"是一个很好的解决办法.
- 加锁后的共享数据是安全的
from multiprocessing import Process, Manager, Lock
def func(m_dic, m_lock):
with m_lock:
m_dic["count"] -= 1
# 等同于:
# m_lock.acquire()
# m_dic["count"] -= 1
# m_lock.release()
if __name__ == '__main__':
m = Manager()
m_lock = Lock()
m_dic = m.dict({"count": 100})
p_list = []
# 开启20个进程来对共享数据进行修改
for i in range(20):
p = Process(target=func, args=(m_dic, m_lock))
p.start()
p_list.append(p)
[p.join() for p in p_list]
print("主进程", m_dic)
# 执行结果:
# 主进程 {'count': 80}
# 加锁后, 多次尝试运行程序, 执行结果也没有发生改变. 不难看出, 加锁后 共享数据是安全的.
三. 进程池
为什么要有进程池?
在程序实际处理问题过程中, 繁忙时会有成千上万的任务需要被执行, 空闲时却可能只有零星任务. 那么在成千上万个任务需要被执行的时候, 我们就需要去创建成千上万个进程么? 首先, 创建进程需要消耗时间, 销毁进程(空间, 变量, 文件信息等等的内容)也需要消耗时间. 第二, 即便开启了成千上万的进程, 操作系统也不能让他们同时执行, 维护一个很大的进程列表的同时, 调度的时候, 还需要进行切换并且记录每个进程的执行节点, 也就是记录上下文(各种变量等等), 这样反而会影响程序的效率. 因此我们不能无限制的根据任务数量频繁开启或者结束进程. 就看我们上面的一些代码例子, 可以发现有些程序执行后需要较长的时间才能得出结果, 这就是问题的原因, 那么我们需要如何做才能避免这种情况呢?
进程池的概念:
在这里, 介绍一个进程池的概念: 定义一个池子, 在里面放上固定数量的进程, 有需求来了, 就拿这个池中的进程来处理任务, 等到处理完毕, 进程并不关闭, 而是将进程再放回进程池中继续等待任务. 如果有很多任务需要执行, 池中的进程数量不够, 任务就要等待之前的进程执行任务完毕归来, 拿到空闲进程才能继续执行. 也就是说, 池中进程的数量是固定的, 那么同一时间最多有固定数量的进程在运行. 这样不仅降低了操作系统的调度难度, 还节省了开闭进程的时间, 也在一定程度上能够实现并发效果.
multiprocess中的Pool模块
创建进程池的类: 如果指定 numprocess
为3, 则进程池会从无到有创建三个进程, 然后自始至终使用这三个进程去执行所有任务(高级一些的进程池可以根据并发量, 设置成动态增加或减少进程池中的进程数量的操作), 这种方式不会开启其他进程, 它提高操作系统效率, 减少了空间的占用.
Pool([numprocess [, initializer [, initargs]]])
:创建进程池参数:
numprocess
: 要创建的进程数, 如果省略, 将默认使用os.cpu_count()
(os
模块中查看电脑CPU数量的一个方法)的值initializer
: 是每个工作进程启动时要执行的可调用对象, 默认为None
initargs
: 是要传给initializer
的参数组
- 主要方法
p.apply(func [, args [, kwargs]])
: 在一个池工作进程中执行func(*args,**kwargs)
, 然后返回结果.- 需要强调的是: 此操作并不会在进程池的工作过程中并发执行
func
函数. 如果要通过不同参数并发地执行func
函数, 必须从不同线程调用p.apply()
函数或者使用p.apply_async()
- 需要强调的是: 此操作并不会在进程池的工作过程中并发执行
p.apply_async(func [, args [, kwargs]])
: 在一个进程池工作过程中执行func(args,*kwargs), 然后返回结果.- 此方法的结果是
AsyncResult
类的实例,callback
是可调用对象, 接收输入参数. 当func
的结果变为可用时, 将结果传递给callback
.callback
禁止执行任何阻塞操作, 否则将接收其他异步操作中的结果.
- 此方法的结果是
p.close()
: 不允许再有其他的任务来使用进程池. 如果所有操作持续挂起, 它们将在工作进程终止前完成.P.join()
: 等待所有工作进程退出. 此方法只能在close()
或teminate()
之后调用.
- 其他方法
- 方法
apply_async()
和map_async()
的返回值是AsyncResul
的实例obj
. 实例具有以下方法:obj.get()
: 返回结果, 如果有必要则等待结果到达.timeout
是可选的. 如果在指定时间内还没有到达, 将引发异常. 如果远程操作中引发了异常, 它将在调用此方法时再次被引发.obj.ready()
: 如果调用完成, 返回True
obj.successful()
: 如果调用完成且没有引发异常, 返回True
, 如果在结果就绪之前调用此方法, 引发异常obj.wait([timeout])
: 等待结果变为可用.obj.terminate()
: 立即终止所有工作进程, 同时不执行任何清理或结束任何挂起工作. 如果p
被垃圾回收, 将自动调用此函数.
- 方法
1. 进程池的map传参
map(func, iterables)
是异步执行的, 并且自带close
和join
.
- 进程池的map传参
import time
from multiprocessing import Pool
def func(n):
time.sleep(0.5)
print(n)
if __name__ == '__main__':
pool = Pool(4) # 创建进程池对象,进程池中放置了4个进程,一般来说,这个数量是电脑的CPU数量
pool.map(func, range(100)) #参数必须是可迭代的
- 进程池与多进程的效率对比
import time
from multiprocessing import Process, Pool
def func(n):
for i in range(5):
n = n + i
if __name__ == '__main__':
pool_start_time = time.time() # 进程池开始执行时间
pool = Pool(4) #创建进程池对象,进程池中设置了4个进程
pool.map(func, range(100)) # map是异步执行的,
pool_end_time = time.time() # 进程池执行完毕时间
pool_different_time = pool_end_time - pool_start_time # 进程池执行时间差
p_start_time = time.time() # 多进程开始执行时间
p_list = []
for i in range(100):
p1 = Process(target=func, args=(i,))
p1.start()
p_list.append(p1)
[p.join() for p in p_list]
p_end_time = time.time() # 多进程执行完毕时间
p_different_time = p_end_time - p_start_time # 多进程执行时间差
print("进程池的执行时间>>>", pool_different_time)
print("多进程的执行时间>>>", p_different_time)
# 执行结果:
# 进程池的执行时间>>> 0.006899356842041016
# 多进程的执行时间>>> 0.026097774505615234
# 可以明显地看出,进程池的执行效率远远高于多进程.
2. 进程池的同步调用
import time
from multiprocessing import Pool
def func(i):
time.sleep(0.5)
return i**2
if __name__ == '__main__':
p = Pool(4)
for i in range(10):
res = p.apply(func, args=(i,))
"""p.apply() --> 同步执行的方法,它会等待子进程的返回结果,所以最后的执行结果是匀速打印出来的"""
print(res)
3. 进程池的异步调用
import os
import time
import random
from multiprocessing import Pool
def work(n):
print('%s run' % os.getpid()) # 进程ID号
time.sleep(random.random())
return n**2
if __name__ == '__main__':
p = Pool(4) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l = []
for i in range(10):
res = p.apply_async(work, args=(i,))
"""异步运行,根据进程池中的进程数,每次最多4个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了.
返回结果之后,将结果放入列表,归还进程,之后再执行新的任务.需要注意的是,进程池中的三个进程不会同时开启或
者同时结束而是执行完一个就释放一个进程,这个进程就去接收新的任务."""
res_l.append(res)
"""异步apply_async用法:如果使用异步提交的任务,主进程需要使用join,等待进程池内任务都处理完,然后可以用get收集结果.
否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了."""
p.close() # 不是关闭进程池,而是结束进程池接收任务,确保没有新任务再提交过来.
p.join() # 感知进程池中的任务已经执行结束,只有当没有新的任务添加进来的时候,才能感知到任务结束了,所以在join之前必须加上close方法.
for res in res_l:
print(res.get()) # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get.
4. 详解apply和apply_async
- 一:使用进程池(异步调用,
apply_async
)
# coding: utf-8
from multiprocessing import Process, Pool
import time
def func(msg):
print("msg:", msg)
time.sleep(1)
return msg
if __name__ == "__main__":
pool = Pool(processes=3)
res_l = []
for i in range(10):
msg = "hello %d" % (i)
res = pool.apply_async(func, (msg, )) # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
res_l.append(res)
# s = res.get() #如果直接用res这个结果对象调用get方法获取结果的话,这个程序就变成了同步,因为get方法直接就在这里等着你创建的进程的结果,第一个进程创建了,并且去执行了,那么get就会等着第一个进程的结果,没有结果就一直等着,那么主进程的for循环是无法继续的,所以你会发现变成了同步的效果
print("==============================>") # 没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了
pool.close() # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print(res_l) # 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
for i in res_l:
print(i.get()) # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
- 二:使用进程池(同步调用,
apply
)
#coding: utf-8
from multiprocessing import Process,Pool
import time
def func(msg):
print( "msg:", msg)
time.sleep(0.1)
return msg
if __name__ == "__main__":
pool = Pool(processes = 3)
res_l=[]
for i in range(10):
msg = "hello %d" %(i)
res=pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个
print("==============================>")
pool.close()
pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print(res_l) #看到的就是最终的结果组成的列表
for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法
print(i)
5. 回调函数
需要回调函数的场景:
进程池中任何一个任务一旦处理完了, 就立即告知主进程自己已处理完毕了. 主进程则调用一个函数去处理该任务的执行结果, 该函数即回调函数, 这是进程池特有的, 普通进程没有这个机制, 但是我们也可以通过进程通信来拿到返回值, 进程池的这个回调也是进程通信的机制完成的.
我们可以把比较消耗时间(阻塞)的任务放到进程池中, 然后指定回调函数(主进程负责执行), 这样主进程在执行回调函数时就省去了I/O
的过程, 直接拿到的是任务的结果.
import os
from multiprocessing import Pool
def func1(n):
print('func1>>', os.getpid())
print('func1')
return n*n
def func2(nn):
print('func2>>', os.getpid())
print('func2')
print(nn)
# import time
# time.sleep(0.5)
if __name__ == '__main__':
print('主进程:', os.getpid())
p = Pool(5)
# args里面的10给了func1,func1的返回值作为回调函数的参数给了callback对应的函数,不能直接给回调函数直接传参数,他只能是你任务函数func1的函数的返回值
# for i in range(10, 20): # 如果是多个进程来执行任务,那么当所有子进程将结果给了回调函数之后,回调函数又是在主进程上执行的,那么就会出现打印结果是同步的效果。我们上面func2里面注销的时间模块打开看看
# p.apply_async(func1, args=(i,), callback=func2)
p.apply_async(func1, args=(10,), callback=func2)
p.close()
p.join()
# 结果
# 主进程: 11852 #发现回调函数是在主进程中完成的,其实如果是在子进程中完成的,那我们直接将代码写在子进程的任务函数func1里面就行了,对不对,这也是为什么称为回调函数的原因。
# func1>> 17332
# func1
# func2>> 11852
# func2
# 100
回调函数在写的时候注意一点, 回调函数的形参只有一个, 如果你的执行函数有多个返回值, 那么也可以被回调函数的这一个形参接收, 接收的是一个元组, 包含着你执行函数的所有返回值.
使用进程池来进行爬虫操作的时候, 最耗时间的是请求地址的网络请求延迟, 那么如果我们在将处理数据的操作加到每个子进程中, 于是所有在进程池后面排队的进程就需要等更长的时间才能获取进程池里面的进程来执行自己, 所以一般我们就将请求作成一个执行函数, 通过进程池去异步执行, 剩下的数据处理的内容放到另外一个进程或者主进程中去执行, 将网络延迟的时间也利用起来, 效率就会更高了.
进程池和信号量的区别:
- 进程池是多个需要被执行的任务在进程池外面排队等待获取进程对象去执行自己, 而信号量是一堆进程等待着去执行一段逻辑代码.
- 信号量不能控制创建多少个进程, 但是可以控制同时多少个进程能够执行.
- 进程池能控制可以创建多少个进程.