Python 36 并发编程-进程 进程间通信和数据共享

进程间通信叫做 IPC (Inter-Process Communication)
进程间通信通过multiprocessing中的Queue(队列)和Pipe(管道)模块来实现.

multiprocessing.Queue 队列

队列是先进先出的,multiprocessing模块里的Queue是一个多进程安全的队列对象,几乎就是queue.Queue的克隆.

from multiprocessing import Queue
if __name__ == '__main__':
    q = Queue(5)  # 实例化队列对象,参数表示队列的容纳元素的多少.省略则表示无大小限制.
    q.put(1)  # 向队列中增加一个元素,如果此时超过队列容量,默认会阻塞
    q.get()  # 从队列中取出一个元素,如果此时队列为空,默认会阻塞
    q.full()  # 判断队列是否已满
    q.empty()  # 判断队列是否已空

将队列对象作为参数传入各个进程,即可实现进程间通信.
这里有一篇介绍Queue的文章,官方文档在此
put和get方法还有各自的put_nowait和get_nowait方法,等于各自的方法里设置false,这样就不会阻塞
但是空的时候get_nowait会报错,而队列满的时候,put_nowait会报错并失去put的值.这样可以捕获异常然后进行后续处理,就不用一直阻塞在那里等待队列.

在一般的数据处理中,生成数据的速度一般比处理比较快,比如在爬虫的实际运行环境中,一般采取多线程进行爬取数据,但是对数据的处理比较慢,就可以将爬取的数据暂时放入一个队列中,然后慢慢处理.或者在数据彼此不相关的情况下,可以增加处理者的线程,同时处理多个数据:

import time,random
from multiprocessing import Process
from multiprocessing import Queue


def producer(name,food, q):
    for i in range(10):
        time.sleep(random.randint(1,2))
        f = '{}生产的{}号{}'.format(name,i, food)
        q.put(f)
        print('{}生产了{}号{}'.format(name,i, food))


def consumer(name,q):
    while True:
        try:
            k = q.get(timeout=5)
            if k is not None:
                print('  {}吃了{}'.format(name,k))
                time.sleep(2)
            else:
                print('{}没东西吃了'.format(name))
                break
        except Exception:
            print('  {}等了这么久也没东西吃,看来是没了'.format(name))
            break
if __name__ == '__main__':
    queue = Queue()
    p1 = Process(target=producer, args=('吉祥', '包子',queue,))
    p2 = Process(target=producer, args=('红宝石', '蛋糕',queue,))
    p3 = Process(target=producer, args=('五芳斋', '粽子',queue,))
    p1.start()
    p2.start()
    p3.start()
    c1 = Process(target=consumer,args=('Jenny',queue,))
    c2 = Process(target=consumer,args=('Cony',queue,))
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    queue.put(None)

这个模型初步建立,用到的一个关键是,要在主进程内跟踪所以数据生产者的状态,保证全部生产完毕之后,给队列一个生产完毕的信号.目前的主要问题是,消费者很难判断生产是否结束,现在的做法是向队列里放一个None,子进程拿到None后正常退出,然后使用了一个长时间的超时来判断结束并退出,有没有更好的解决办法呢?
JoinableQueue,每次从队列里获取一个数据的时候,需要向队列提交一个回执,叫做q.taskdone().在每次向JoinableQueue put数据的时候,会有个计数器去加1,提交taskdone的时候,这个计数器会-1.这个兑队列本身也可以.join()来感知这个队列中的数据全部被执行完毕,这样就可以通过这个队列对象来

import time,random
from multiprocessing import Process
from multiprocessing import JoinableQueue


def producer(name,food, q):
    for i in range(10):
        time.sleep(random.randint(1,2))
        f = '{}生产的{}号{}'.format(name,i, food)
        q.put(f)
        print('{}生产了{}号{}'.format(name,i, food))
    q.join()  # 阻塞,直到一个队列中的所有数据全部处理完毕,等于一直在等着消费者拿了包子吃完才结束


def consumer(name,q):
    while True:
        food = q.get()
        if food is None:
            print('%s获取到了一个空' %name)
            break
        print('\033[31m%s消费了%s\033[0m' %(name,food))
        time.sleep(random.randint(1,3))
        q.task_done()  # 减少一个计数,get方法并不减少计数,put方法会增加计数.这样就会和生产者进程最后同步,大家一起结束.

if __name__ == '__main__':
    queue = JoinableQueue()
    p1 = Process(target=producer, args=('吉祥', '包子',queue,))
    p2 = Process(target=producer, args=('红宝石', '蛋糕',queue,))
    p3 = Process(target=producer, args=('五芳斋', '粽子',queue,))
    p1.start()
    p2.start()
    p3.start()
    c1 = Process(target=consumer,args=('Jenny',queue,))
    c2 = Process(target=consumer,args=('Cony',queue,))
    c1.daemon = True  # 设置为守护进程,主进程代码执行完毕就结束
    c2.daemon = True  # 设置为守护进程,主进程代码执行完毕就结束
    c1.start()
    c2.start()
    p1.join()
    p2.join()

这里的流程是这样: 主进程启动各个子进程后,就通过最后两行join等待所有生产者结束,然后每个生产者,又通过队列等待消费者结束.而消费者又是守护进程,会在主进程结束的时候结束.
等到消费者吃光了所有队列里的东西,并且task_done通知队列全部活干完了,生产者就会从阻塞的地方结束,此时主进程等到了生产者结束,也执行到了最后的代码,然后消费者守护进程结束,就在工作全部处理完毕的情况下,关闭了所有进程.这样,可以增加任意多的消费者和生产者,由于队列是多线程安全的(自带锁),这些消费者和生产者互相不冲突.

# 任意多个生产者消费者的模型
import time, random
from multiprocessing import Process
from multiprocessing import JoinableQueue


def producer(name, food, q):
    for k in range(10):
        time.sleep(random.randint(1, 2))
        f = '{}生产的{}号{}'.format(name, k, food)
        q.put(f)
        print('{}生产了{}号{}'.format(name, k, food))
    q.join()  # 阻塞,直到一个队列中的所有数据全部处理完毕,等于一直在等着消费者拿了包子吃完才结束


def consumer(name, q):
    while True:
        food = q.get()
        print('\033[31m%s消费了%s\033[0m' % (name, food))
        time.sleep(random.randint(1, 3))
        q.task_done()  # 减少一个计数,get方法并不减少计数,put方法会增加计数.这样就会和生产者进程最后同步,大家一起结束.


if __name__ == '__main__':
    queue = JoinableQueue()
    p_list = []
    for i in range(10):
        p = Process(target=producer, args=(str(i), '包子', queue,))
        p_list.append(p)
        p.start()
    for j in range(5):
        c = Process(target=consumer, args=(str(j), queue,))
        c.daemon = True
        c.start()

    [i.join() for i in p_list]

multiprocessing.Pipe 管道

管道也是一种组件,可以理解为一个双向的管道,有两个端点,每个端点都有发送和接受方法,从一端send东西过去,另外一端就可以recv进来.如果recv收不到,就会阻塞.
Pipe实例化会得到两个对象,是一个全双工通信管道的两端,假如两个端点为A和B,注意,A的send需要由B的recv来接收,A的recv收到的东西,是B发送的内容.
这里有一个问题就是,如果知道信息全部传送完毕.实际上,管道的引用计数是操作系统控制的,当一个管道双向全部被关闭的时候,这个管道就会关闭,再对这个管道进行操作,就会触发EOFError异常,通过捕捉这个异常,就可以知道管道是否传输完毕.
如果在主进程和多个进程间通信,需要把管道的两端都传递给所有进程,然后只用一端发的进程,就把另外一端close掉,主进程也关闭不用的一端,就像是全双工关闭了一条线路.等通信完毕之后,各自再关闭另外一端,管道就中断,引用计数变成了0,可以捕捉异常了.这里关键是理解,要用的端点,就是插在那一段数据上的管子的一头,另外一头不用了,就关闭,只用插在自己身上的那个端点.

from multiprocessing import Process, Pipe


def senp(sendp, recvp):
    recvp.close()
    while True:
        try:
            msg = sendp.recv()
            print(msg)
        except EOFError:
            sendp.close()
            break


if __name__ == '__main__':
    sendp, recvp = Pipe()
    Process(target=senp, args=(sendp, recvp,)).start()
    sendp.close()
    for i in range(20):
        recvp.send('hello')
    recvp.close()

其实可以理解为,主进程一根管子分支了很多根插到子进程上,子进程关闭recvp,实际是表示自己不使用这一端,主进程关闭sendp表示不使用这一头,但是其他头依然有引用.最后当主进程关闭自己的recvp这一头,所有的sendp各自被子进程关闭之后,管道的一端被所有进程完全关闭,就会引发EOF错误,表面管道通信已经结束,这个时候就退出通信即可.
将管道作用于生产者消费者模型:

from multiprocessing import Process, Pipe


def producer(con, pro, name):
    con.close()
    for i in range(10):
        pro.send('{}的{}号产品'.format(name, i))
    pro.close()


def consumer(con, pro, name):
    pro.close()
    while True:
        try:
            msg = con.recv()
            print('{}消费了{}'.format(name, msg))

        except EOFError:
            con.close()
            break


if __name__ == '__main__':
    con_p, pro_p = Pipe()

    for j in range(1):
        Process(target=producer, args=(con_p, pro_p, '{}'.format(j))).start()

    for k in range(3):
        Process(target=consumer, args=(con_p, pro_p, '{}'.format(k))).start()
    con_p.close()  # 在主进程里创建的管道,分别交给各个子进程,在子进程里都close之后,主进程也需要关闭,所有进程里都需要关闭所使用的管道
    pro_p.close()  # 所有进程内关闭管道

管道实际上是通过socket来通信的IPC,管道并不是进程安全的,如果想要安全,必须要给管道加锁.进程安全的是multiprocessing.Queue.
所以尽量使用Queue.
应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

进程间数据共享

队列和管道,是进程间互相发送消息的渠道.如果所有的进程需要共享数据,一般需要通过锁来保证数据安全性.
展望未来,基于消息传递的并发编程是大势所趋,即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。
多进程其实是不推荐使用数据共享的,如果要共享,就要通过一些能够在进程之间传递的数据结构.比如multiprocessing.Manager模块.

from multiprocessing import Process, Manager


def minus(dic):
    dic['count'] -= 1


if __name__ == '__main__':
    m = Manager()
    dic = m.dict({'count': 100})  # 这个字典会变成可用于数据共享的字典
    p_list = []
    for i in range(70):
        p =Process(target=minus, args=(dic,))
        p_list.append(p)
        p.start()
    [i.join() for i in p_list]
    print(dic)

结果运行可以发现,有的时候结果会出现不等于50的情况,说明manager支持的数据类型里,有一部分是多进程不安全的.只要加个锁就可以了.

from multiprocessing import Process, Manager,Lock


def minus(dic,lock):
    lock.acquire()
    dic['count'] -= 1
    lock.release()

if __name__ == '__main__':
    m = Manager()
    lock = Lock()
    dic = m.dict({'count': 100})  # 这个字典会变成可用于数据共享的字典
    p_list = []
    for i in range(70):
        p =Process(target=minus, args=(dic,lock))
        p_list.append(p)
        p.start()
    [i.join() for i in p_list]
    print(dic)

列出几篇文章来供多进程通信参考:
理解Python并发编程一篇就够了 – 线程篇
【Multiprocessing系列】共享资源.其中的其他文章也可以看看.
Python multiprocessing 模块解析 (2) – managers 初探轮廓

发表评论

电子邮件地址不会被公开。 必填项已用*标注

This site uses Akismet to reduce spam. Learn how your comment data is processed.