datetime:2025/04/15 16:28
author:nzb

Python多进程同步-进程间通信

目录

  • 一. 锁
  • 二. 信号量
  • 三. 事件   - 通过event来完成红绿灯模型
  • 四. 队列(重点)   - 队列实现进程间的通信
  • 五. 生产者消费者模型
    • 初始版本(程序会阻塞住)
    • 升级版本一(通过抛出异常信号的方式结束进程)
    • 升级版本二(通过发送结束信号的方式结束进程)
      • 第一种: 生产者发结束信号
      • 第二种: 主进程发结束信号
    • 升级版本三(有多个消费者和生产者的时候需要发送多次结束信号)
  • 六. JoinableQuene实现生产者消费者模型

一. 进程同步(锁)

当我们使用 muitiprocessing.Process 创建子进程时就已经实现了进程的异步了.我们可以让多个任务同时在几个进程中并发处理,它们之间的运行没有顺序,一旦开启也不受人们控制. 尽管并发编程让我们能更加充分的利用IO资源,但与此同时它也带来了新的问题: 进程之间数据不共享但却共享同一套文件系统, 所以几个进程同时访问同一个文件或同一个打印终端,是没有问题的. 可是, 共享带来的是竞争, 竞争的结果就是错乱. 如何进行控制?我们想到了"加锁处理".

  • 多进程抢占输出资源,导致打印混乱
import random, os, time
from multiprocessing import Process

def work(n):
    print("{} >>> {}号进程正在执行".format(n, os.getpid()))
    time.sleep(random.random())
    print("{} >>> {}号进程执行完毕".format(n, os.getpid()))

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=work, args=(i,))
        p.start()

# 执行结果:
# 0 >>> 9748号进程正在执行
# 1 >>> 10904号进程正在执行
# 2 >>> 8976号进程正在执行
# 3 >>> 5784号进程正在执行
# 4 >>> 12132号进程正在执行
# 4 >>> 12132号进程执行完毕
# 2 >>> 8976号进程执行完毕
# 3 >>> 5784号进程执行完毕
# 1 >>> 10904号进程执行完毕
# 0 >>> 9748号进程执行完毕
  • 加锁:由并发改成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process, Lock
def work(n, lock):
    with lock:  # 使用with语句,自动加锁和解锁
        # lock.acquire()  # 加锁,保证每次只有一个进程在执行锁内的程序.此时对于所有加锁的进程来说,都变成了串行.
        print("{} >>> {}号进程正在执行".format(n, os.getpid()))
        time.sleep(random.random())
        print("{} >>> {}号进程执行完毕".format(n, os.getpid()))
        # lock.release()  # 解锁,解锁之后其他进程才能执行自己的程序


if __name__ == '__main__':
    lock = Lock()   # 创建Lock对象
    for i in range(5):
        p = Process(target=work, args=(i, lock))
        p.start()
# 0 >>> 54526号进程正在执行
# 0 >>> 54526号进程执行完毕
# 1 >>> 54527号进程正在执行
# 1 >>> 54527号进程执行完毕
# 2 >>> 54528号进程正在执行
# 2 >>> 54528号进程执行完毕
# 3 >>> 54529号进程正在执行
# 3 >>> 54529号进程执行完毕
# 4 >>> 54530号进程正在执行
# 4 >>> 54530号进程执行完毕

上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了. 这种做法浪费了时间却保证了数据的安全.

接下来,我们以模拟抢票为例,来看看数据安全的重要性:

  • 并发运行,效率高,但是竞争同一个文件,导致数据混乱
# 注意:首先在当前文件目录下创建一个名为db的文件
# 文件db的内容为:{"count":1},只有这一行数据
# 注意字典中一定要用双引号,不然json无法识别

from multiprocessing import Process
import time
import json


with open('db', "w") as fw:
    json.dump({"count": 1}, fw)

# 查看剩余票数


def search():
    dic = json.load(open("db"))  # 打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
    print("剩余票数{}".format(dic["count"]))

# 抢票


def get(i):
    dic = json.load(open("db"))
    time.sleep(0.5)         # 模拟读取数据的网络延迟
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)       # 模拟写入数据的网络延迟
        json.dump(dic, open("db", "w"))
        print("{}号用户购票成功".format(i))


def task(i):
    search()
    get(i)


if __name__ == '__main__':
    for i in range(3):  # 模拟并发3个客户端抢票
        p = Process(target=task, args=(i,))
        p.start()

# 执行结果:
# 剩余票数1
# 剩余票数1
# 剩余票数1
# 0号用户购票成功
# 1号用户购票成功
# 2号用户购票成功

# 分析结果: 由于网络延迟等原因使得进程切换,导致每个人都抢到了这最后一张票
# 得出结论: 并发运行,效率高,但竞争写同一文件,数据写入错乱
  • 加锁:购票行为由并发变成了串行,牺牲了效率,但是保证了数据安全
import time
import json
from multiprocessing import Process, Lock   # 引入Lock模块


with open('db', "w") as fw:
    json.dump({"count": 1}, fw)

# 查看剩余票数


def search():
    dic = json.load(open("db"))  # 打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
    print("剩余票数{}".format(dic["count"]))

# 抢票


def get(i):
    dic = json.load(open("db"))
    time.sleep(0.5)         # 模拟读取数据的网络延迟
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(1)       # 模拟写入数据的网络延迟
        json.dump(dic, open("db", "w"))
        print("{}号用户购票成功".format(i))


def task(i, lock):
    search()
    lock.acquire()  # 加锁
    get(i)
    lock.release()  # 解锁


if __name__ == '__main__':
    lock = Lock()   # 创建一个锁
    for i in range(3):  # 模拟并发3个客户端抢票
        p = Process(target=task, args=(i, lock))    # "锁"也要作为参数传递给需要加锁的函数
        p.start()
# 执行结果:
# 剩余票数1
# 剩余票数1
# 剩余票数1
# 0号用户购票成功

# 分析结果: 只有一个人抢到了票
# 得出结论: 加锁保证数据安全,不出现混乱

进程锁分析总结:

加锁可以保证:多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改.这种方式虽然牺牲了速度(效率)却保证了数据安全.

虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制: 队列和管道.

队列和管道都是将数据存放于内存中

队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性.

二. 信号量(Semaphore)

信号量可以规定同时进入锁内的进程数量

互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。

假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。

  • 实现:信号量同步基于内部计数器,每调用一次 acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()V()Python实现。信号量同步机制适用于访问像服务器这样的有限资源。

信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

  • 示例
import random
import time
from multiprocessing import Process, Semaphore

# 假设10个人去游戏厅玩: 提前设定好,一个房间只有4台游戏机(计数器现在为4),那么同时只能四个人进来,谁先来的谁先占一个游戏机(acquire,计数器减1),4台机器满了之后(计数器为0),第五个人就要等着,等其中一个人出来(release,计数器加1),他就可以占用那台游戏机了.


def play(i, s):
    with s:
        # s.acquire()
        print("{}号顾客来玩游戏了".format(i))
        time.sleep(random.randrange(2, 5))  # 每位顾客游戏时间不同
        # s.release()


if __name__ == '__main__':
    s = Semaphore(4)    # 设定好一次只能4个人进来
    for i in range(10):  # 创建10位顾客
        p = Process(target=play, args=(i, s))
        p.start()

三. 事件(Event)

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 setwaitclear.

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 wait 方法时就会阻塞,如果“Flag”值为True,那么执行 wait 方法时便不再阻塞.

  • is_set(): 查看一个事件的状态,默认为 False
  • clear(): 将“Flag”设置为 False
  • set(): 将“Flag”设置为 True
from multiprocessing import Event

e = Event()         # 创建一个事件对象
print(e.is_set())   # 执行结果: False

e.set()             # 将is_set()的状态改为True
print(e.is_set())   # 执行结果: True

e.clear()           # 将is_set()的状态改为True
print(e.is_set())   # 执行结果: False
  • wait
from multiprocessing import Event

e = Event() # 创建一个事件对象
e.set()     # 将is_set()的状态改为True
print(e.is_set())

print("我在wait之前!")
e.wait()    # 依据事件的状态来决定是否阻塞: False-->阻塞  True-->不阻塞
print("我在wait之后!")

# 执行结果:
# True
# 我在wait之前!
# 我在wait之后!

# 从上面的结果可以看出,set()方法将is_set()的状态改为True,则此时事件的状态为True,于是程序执行到wait()方法处不会阻塞住,继续向下执行

from multiprocessing import Event

e = Event() # 创建一个事件对象
e.set()     # 将is_set()的状态改为True
print(e.is_set())
e.clear()   # 将is_set()的状态改为False
print(e.is_set())

print("我在wait之前!")
e.wait()    # 依据事件的状态来决定是否阻塞: False-->阻塞  True-->不阻塞
print("我在wait之后!")

# 执行结果:
# True
# False
# 我在wait之前!

# 从上面的结果可以看出,set()方法将is_set()的状态改为True,clear()方法又重新将is_set()的状态改为False,则此时事件的状态为False,于是程序执行到wait()方法处就阻塞住了
  • 红绿灯示例
import time
from multiprocessing import Process, Event

# 创建一个"模拟红绿灯执行状态"的函数
def traffic_lights(e):
    while 1:
        print("!!!红灯亮!!!")
        time.sleep(6)
        e.set()     # 把e改为True
        print("~~~绿灯亮~~~")
        time.sleep(3)
        e.clear()   # 把e改为False

def car(i, e):
    if not e.is_set():  # 新来的车看到的是红灯,执行这里,车在等待
        print("车{}在等待......".format(i))
        e.wait()
        print("车{}走你........".format(i))
    else:               # 此时已经是绿灯,执行这里,车可以走了
        print("车{}可以走了....".format(i))

if __name__ == '__main__':
    e = Event()
    # 创建一个红绿灯
    tra_lig = Process(target=traffic_lights,args=(e,))
    tra_lig.start()
    while 1:
        time.sleep(1)
        # 创建3辆车
        for i in range(3):
            c = Process(target=car, args=(i, e))
            c.start()

四. 队列(重点)

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出。

  • 语法: Queue([maxsize]) --> 创建共享的进程队列
  • 参数: maxsize --> 是队列中允许的最大项数.如果省略此参数,则无大小限制
  • 注意: 队列的底层使用管道和锁实现

方法

  • q = Queue([maxsize])

创建共享的进程队列. maxsize是队列中允许的最大项数. 如果省略此参数, 则无大小限制. 底层队列使用管道和锁定实现. 另外, 还需要运行支持线程以便队列中的数据传输到底层管道中.

  • q.get([block[,timeout]]):返回q中的一个项目. 如果q为空, 此方法将阻塞, 直到队列中有项目可用为止. block用于控制阻塞行为, 默认为True, 如果设置为False, 将引发Queue.Empty异常(定义在Queue模块中). timeout是可选超时时间, 用在阻塞模式中, 如果在制定的时间间隔内没有项目变为可用, 将引发Queue.Empty异常.

  • q.get_nowait():同q.get(False)方法.

  • q.put(item[,block[,timeout]]):将item放入队列. 如果队列已满, 此方法将阻塞至有空间可用为止. block控制阻塞行为, 默认为True. 如果设置为False, 将引发Queue.Full异常(定义在Queue库模块中). timeout指定在阻塞模式中等待可用空间的时间长短, 超时后将引发Queue.Full异常.

  • q.qsize():返回队列中目前项目的正确数量. 此函数的结果并不可靠, 因为在返回结果和在稍后程序中使用结果之间, 队列中可能添加或删除了项目. 在某些系统上, 此方法可能引发NotImplementedError异常.

  • q.empty():如果调用此方法时q为空, 返回True. 如果其他进程或线程正在往队列中添加项目, 结果是不可靠的. 也就是说, 在返回和使用结果之间, 队列中可能已经加入新的项目.

  • q.full():如果q已满, 返回为True. 由于线程的存在, 结果也可能是不可靠的(参考q.empty()方法).

  • q.close():关闭队列, 防止队列中加入更多数据. 调用此方法时, 后台线程将继续写入那些已入队列但尚未写入的数据, 但将会在此方法完成时马上关闭. 如果q被垃圾收集, 将自动调用此方法. 关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常. 例如, 如果某个使用者正被阻塞在get()操作上, 关闭生产者中的队列不会导致get()方法返回错误.

  • q.cancel_join_thread():不会在进程退出时自动连接后台线程. 这可以防止join_thread()方法阻塞.

  • q.join_thread():连接队列的后台线程. 此方法用于在调用q.close()方法后, 等待所有队列项被消耗. 默认情况下, 此方法由不是q的原始创建者的所有进程调用. 调用q.cancel_join_thread()方法可以禁止这种行为.

from multiprocessing import Queue
import queue
q = Queue(3)  # 创建一个队列对象,队列长度为3

# 以下是方法详述: put, get, put_nowait, get_nowait, full, empty
q.put(1)                # 往队列中添加数据
q.put(2)
q.put(3)
# q.put(4)              # 如果队列已经满了, 程序就会停在这里, 等待数据被别人取走, 再将数据放入队列. 但如果队列中的数据一直不被取走, 程序就会永远停在这里.

try:
    q.put_nowait(4)     # 使用put_nowait(), 如果队列满了不会阻塞, 但是会因为队列满了而报错.
except queue.Full:                 # 因此我们可以用一个try语句来处理这个错误, 这样程序不会一直阻塞下去, 但是会丢掉这个消息.
    print("队列已经满了!")

# 所以, 我们再放入数据之前, 可以先看一下队列的状态, 如果已经满了, 就不继续put了.
print(q.full())         # 查看队列是否满了, 满了返回True, 不满返回False.

print(q.get())          # 取出数据
print(q.get())
print(q.get())
# print(q.get())        # get()同put()方法一样, 如果队列已经空了, 那么继续取就会出现阻塞现象.

try:
    q.get_nowait()     # 可以使用get_nowait()方法, 如果队列满了不会阻塞, 但是会因为没取到值而报错.
except queue.Empty:                 # 因此我们可以用一个try语句来处理这个错误, 这样程序不会一直阻塞下去.
    print("队列已经空了")

print(q.empty())        # 查看队列是否空了, 空了返回True, 不空返回False.

队列实现进程间的通信:

import time
from multiprocessing import Process,Queue

def girl(q):
    print("来自boy的信息>>>", q.get())
    print("来自班主任的凝视>>>", q.get())

def boy(q):
    q.put("中午一起吃饭吗?")

if __name__ == '__main__':
    q = Queue(5)
    boy_p = Process(target=boy, args=(q,))
    girl_p = Process(target=girl, args=(q,))
    boy_p.start()
    girl_p.start()
    time.sleep(1)   # 等待子进程执行完毕
    q.put("好好上课,别开小差!")

# 执行结果:
# 来自boy的信息>>> 中午一起吃饭吗?
# 来自班主任的凝视>>> 好好上课,别开小差!
  • 队列是进程安全的: 同一时间只能一个进程拿到队列中的一个数据, 你拿到了一个数据, 这个数据别人就拿不到了.
import os, time
import multiprocessing

# 向queue中输入数据的函数
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)

# 向queue中输出数据的函数
def outputQ(queue):
    info = queue.get()
    print('%s%s\033[32m%s\033[0m' % (str(os.getpid()), '(get):', info))

# Main
if __name__ == '__main__':
    # windows下,如果开启的进程比较多的话,程序会崩溃,为了防止这个问题,使用freeze_support()方法来解决(了解即可)
    multiprocessing.freeze_support()
    record1 = []   # store input processes
    record2 = []   # store output processes
    queue = multiprocessing.Queue(3)

    # 输入进程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ, args=(queue,))
        process.start()
        record1.append(process)

    # 输出进程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ, args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()

五. 生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题. 该模式通过平衡"生产线程"和"消费线程"的工作能力来提高程序的整体处理数据的速度.

  • 为什么要使用生产者和消费者模式?

在线程世界里, 生产者就是生产数据的线程, 消费者就是消费数据的线程. 在多线程开发当中, 如果生产者处理速度很快, 而消费者处理速度很慢, 那么生产者就必须等待消费者处理完, 才能继续生产数据. 同样的道理, 如果消费者的处理能力大于生产者, 那么消费者就必须等待生产者. 为了解决这个问题于是引入了生产者和消费者模式.

  • 什么是生产者消费者模式?

生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题的. 生产者和消费者彼此之间不直接通讯, 而通过阻塞队列来进行通讯, 所以生产者生产完数据之后不用等待消费者处理, 直接扔给阻塞队列, 消费者不找生产者要数据, 而是直接从阻塞队列里取, 阻塞队列就相当于一个缓冲区, 平衡了生产者和消费者的处理能力, 并且我可以根据生产速度和消费速度来均衡一下多少个生产者可以为多少个消费者提供足够的服务, 就可以开多进程等等, 而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据的.

  • 1.初始版本(程序会阻塞住)
import time
from multiprocessing import Process, Queue


# 版本1
def producer(q):
    for i in range(1, 11):
        time.sleep(1)
        q.put(i)
        print("已生产了{}个产品".format(i))

def consumer(q):
    while 1:    # 死循环,不停地往外取
        time.sleep(2)
        s = q.get()
        print("消费者已拿走{}个产品".format(s))

if __name__ == '__main__':
    # 通过队列来模拟缓冲区,大小设置为20
    q = Queue(20)
    # 生产者进程
    pro_p = Process(target=producer, args=(q,))
    pro_p.start()
    # 消费者进程
    con_p = Process(target=consumer, args=(q,))
    con_p.start()

# 从最后的执行结果中可以看出: 当消费者取出了所有产品之后,程序并没有结束,而是阻塞在消费者进程的get()处了.
  • 2.升级版本一(通过抛出异常信号的方式结束进程)
def producer(q):
    for i in range(1, 11):
        time.sleep(1)
        q.put(i)
        print("已生产了{}个产品".format(i))

def consumer(q):
    while 1:
        time.sleep(2)
        try:
            s = q.get(False)        # 如果队列为空,则再次get()会抛出异常
            # s = q.get_nowait()    # get_nowait()与get(False)是等同的效果
            print("消费者已拿走{}个产品".format(s))
        except:                 # 捕获异常
            break               # 结束循环

if __name__ == '__main__':
    # 通过队列来模拟缓冲区,大小设置为20
    q = Queue(20)
    # 生产者进程
    pro_p = Process(target=producer, args=(q,))
    pro_p.start()
    # 消费者进程
    con_p = Process(target=consumer, args=(q,))
    con_p.start()
  • 3.升级版本二(通过发送结束信号的方式结束进程)

    • 第一种: 生产者发结束信号
import time
from multiprocessing import Process, Queue

def producer(q):
    for i in range(1, 11):
        time.sleep(1)
        q.put(i)
        print("{}号产品已生产完毕".format(i))
    q.put(None)         # 生产者在自己的子进程的最后加入一个结束信号

def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:   # 如果消费者最后拿到了结束信号(None)就会跳出循环
            break
        else:
            print("消费者已拿走{}个产品".format(s))

if __name__ == '__main__':
    # 通过队列来模拟缓冲区,大小设置为20
    q = Queue(20)
    # 生产者进程
    pro_p = Process(target=producer, args=(q,))
    pro_p.start()
    # 消费者进程
    con_p = Process(target=consumer, args=(q,))
    con_p.start()
  • 第二种: 主进程发结束信号
import time
from multiprocessing import Process, Queue


def producer(q):
    for i in range(1, 11):
        time.sleep(1)
        q.put(i)
        print("{}号产品已生产完毕".format(i))


def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:
            break
        else:
            print("消费者已拿走{}个产品".format(s))


if __name__ == '__main__':
    # 通过队列来模拟缓冲区,大小设置为20
    q = Queue(20)
    # 生产者进程
    pro_p = Process(target=producer, args=(q,))
    pro_p.start()
    # 消费者进程
    con_p = Process(target=consumer, args=(q,))
    con_p.start()

    pro_p.join()    # 生产者进程执行完毕后才会执行主进程
    q.put(None)     # 主进程在生产者生产结束后发送结束信号None
  • 4.升级版本三(有多个消费者和生产者的时候需要发送多次结束信号)
# 升级版本三: 有多个消费者和生产者的时候需要发送多次结束信号,有几个消费者来取(有几个get())就发送几次结束信号
import time
from multiprocessing import Process,Queue

def producer1(q):
    for i in range(1, 11):
        time.sleep(1)
        q.put(i)
        print("生产者1号已经生产了{}个产品".format(i))

def producer2(q):
    for i in range(1, 11):
        time.sleep(1)
        q.put(i)
        print("生产者2号已经生产了{}个产品".format(i))

def producer3(q):
    for i in range(1, 11):
        time.sleep(1)
        q.put(i)
        print("生产者3号已经生产了{}个产品".format(i))

def consumer1(q):
    while 1:
        el = q.get()
        if el == None:  # 跳出循环的条件
            break
        print("消费者1号已经取走了{}个产品".format(el))

def consumer2(q):
    while 1:
        el = q.get()
        if el == None:  # 跳出循环的条件
            break
        print("消费者2号已经取走了{}个产品".format(el))

def consumer3(q):
    while 1:
        el = q.get()
        if el == None:  # 跳出循环的条件
            break
        print("消费者3号已经取走了{}个产品".format(el))

if __name__ == '__main__':
    q = Queue(50)   # 创建队列,通过队列来模拟缓冲器,大小为50
    producer1_process = Process(target=producer1, args=(q,))    # 创建所有生产者进程
    producer2_process = Process(target=producer2, args=(q,))
    producer3_process = Process(target=producer3, args=(q,))

    consumer1_process = Process(target=consumer1, args=(q,))    # 创建所有消费者进程
    consumer2_process = Process(target=consumer2, args=(q,))
    consumer3_process = Process(target=consumer3, args=(q,))

    producer1_process.start()   # 启动所有生产者进程
    producer2_process.start()
    producer3_process.start()

    consumer1_process.start()   # 启动所有消费者进程
    consumer2_process.start()
    consumer3_process.start()

    producer1_process.join()    # 必须保证生产者全部执行完毕主进程才能继续执行
    producer2_process.join()
    producer3_process.join()
    q.put(None)     # 有多少个消费者来取走产品就要发送几次结束信号
    q.put(None)
    q.put(None)
    print("主进程执行完毕")

六. JoinableQuene实现生产者消费者模型

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

  • 参数介绍:
    • maxsize是队列中允许最大项数,省略则无大小限制。
  • 方法介绍:JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。
import time
from multiprocessing import Process, JoinableQueue
def producer(q):
    for i in range(1, 11):
        time.sleep(0.5)
        q.put(i)
        print("{}号产品已生产完毕".format(i))
    q.put(None)
    q.join()
    print("在这里等待...")


def consumer(q):
    while 1:
        time.sleep(1)
        s = q.get()
        if s == None:
            break
        else:
            print("消费者已拿走{}个产品".format(s))
            q.task_done()   # 给q对象发送一个任务结束的信号
    q.task_done()   # 给q对象发送一个任务结束的信号


if __name__ == '__main__':
    # 通过队列来模拟缓冲区,大小设置为20
    q = JoinableQueue(20)
    # 生产者进程
    pro_p = Process(target=producer, args=(q,))
    pro_p.start()
    # 消费者进程
    con_p = Process(target=consumer, args=(q,))
    con_p.daemon = True     # 把消费者进程设置为守护进程,它会随主进程的结束而结束
    con_p.start()

    pro_p.join()    # 主进程要等待生产者执行结束再继续执行
    print("主进程结束")

总而言之,先把消费者进程设置为守护进程,于是消费者进程与守护进程同生共死. 于是,在主进程和消费者进程结束之前,必须等待生产者进程执行完毕,如此一来,我们 便可以看到这样一个过程: 主进程开启,生产者首先执行,消费者紧跟其后执行,最后 待生产者执行结束后,消费者和主进程同时结束.整个程序全部结束.

results matching ""

    No results matching ""