Python 多进程及进程间通信

tech2024-06-08  71

作者:billy 版权声明:著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处

什么是进程

在了解进程之前,我们需要知道多任务的概念。多任务,顾名思义,就是指操作系统能够执行多个任务。例如,使用 Windows 或 Linux 操作系统可以同时看电影、聊天、听音乐等等,此时操作系统就是在执行多任务,而每个任务就是一个进程。

进程(Process)是计算机中已运行程序的实体。进程与程序不同,程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行实例。

创建进程的常用方式

在 Python 中有多个模块可以创建进程,比较常用的有 os.fork() 函数、multiprocessing 模块和 Pool 进程池。由于 os.fork() 函数只适合在 UNIX/Linux/Mac 系统上运行,在 Windows 操作系统中不可用,所以这里介绍一下 multiprocessing 模块和 Pool 进程池这两个跨平台模块。

使用 multiprocessing 模块创建进程 multiprocessing 模块提供了一个 Process 类来代表一个进程对象,语法如下: Process([group [, target [, name [, args [, kwargs]]]]]) group:参数未使用,值始终为 None; target:表示当前进程启动时执行的可调用对象; name:为当前进程实例的别名; args:表示传递给 target 函数的参数元组; kwargs:表示传递给 target 函数的参数字典;

Process 类的实例常用的属性和方法有:

属性/方法说明name当前进程实例的别名,默认为 Process-N,N 为从 1 开始递增的整数pid当前进程实例的 PID 值is_alive()判断进程实例是否还在执行join([timeout])是否等待进程实例执行结束,或等待多少秒start()启动进程实例(创建子进程)run()如果没有给定 target 参数,对这个对象调用 start() 方法时,就将执行对象中的 run() 方法terminate()不管任务是否完成,立即终止

示例:

from multiprocessing import Process import time import os def child_1(interval): print("子进程 (%s) 开始执行,父进程为 (%s)" % (os.getpid(), os.getppid())) t_start = time.time() time.sleep(interval) t_end = time.time() print("子进程 (%s) 执行时间为'%0.2f'秒" % (os.getpid(), t_end - t_start)) def child_2(interval): print("子进程 (%s) 开始执行,父进程为 (%s)" % (os.getpid(), os.getppid())) t_start = time.time() time.sleep(interval) t_end = time.time() print("子进程 (%s) 执行时间为'%0.2f'秒" % (os.getpid(), t_end - t_start)) if __name__ == '__main__': print("---父进程开始执行---") print("父进程 PID: %s" % os.getpid()) p1 = Process(target=child_1, args=(1,)) p2 = Process(target=child_2, name="test", args=(2,)) p1.start() p2.start() print("p1.is_alive = %s" % p1.is_alive()) print("p2.is_alive = %s" % p2.is_alive()) print("p1.name = %s" % p1.name) print("p1.pid = %s" % p1.pid) print("p2.name = %s" % p2.name) print("p2.pid = %s" % p2.pid) print("---等待子进程---") p1.join() p2.join() print("---父进程执行结束---")

上述例子的运行结果为:

---父进程开始执行--- 父进程 PID: 12936 p1.is_alive = True p2.is_alive = True p1.name = Process-1 p1.pid = 16640 p2.name = test p2.pid = 17600 ---等待子进程--- 子进程 (16640) 开始执行,父进程为 (12936) 子进程 (17600) 开始执行,父进程为 (12936) 子进程 (16640) 执行时间为'1.00'秒 子进程 (17600) 执行时间为'2.00'秒 ---父进程执行结束--- 使用 Process 子类创建进程 对于一些简单的小任务,通常使用 Process(target=test) 方式实现多进程。但是如果要处理复杂任务的进程,通常定义一个类,使其继承 Process 类,每次实例化这个类的时候,就等同于实例化一个进程对象。

示例:

from multiprocessing import Process import time import os class SubProcess(Process): def __init__(self, interval, name=''): Process.__init__(self) # 调用父类的初始化方法 self.interval = interval if name: self.name = name # 重写 run() 方法 def run(self): print("子进程 (%s) 开始执行,父进程为 (%s)" % (os.getpid(), os.getppid())) t_start = time.time() time.sleep(self.interval) t_end = time.time() print("子进程 (%s) 执行结束,耗时%0.2f秒" % (os.getpid(), t_end - t_start)) if __name__ == '__main__': print("---父进程开始执行---") print("父进程 PID: %s" % os.getpid()) p1 = SubProcess(interval=1, name='test') p2 = SubProcess(interval=2) p1.start() # 对一个不包含 target 属性的 Process 类执行 start() 方法,就会运行这个类中的 run() 方法 p2.start() print("p1.is_alive = %s" % p1.is_alive()) print("p2.is_alive = %s" % p2.is_alive()) print("p1.name = %s" % p1.name) print("p1.pid = %s" % p1.pid) print("p2.name = %s" % p2.name) print("p2.pid = %s" % p2.pid) print("---等待子进程---") p1.join() p2.join() print("---父进程执行结束---")

上述例子的运行结果为:

---父进程开始执行--- 父进程 PID: 3000 p1.is_alive = True p2.is_alive = True p1.name = test p1.pid = 11452 p2.name = SubProcess-2 p2.pid = 884 ---等待子进程--- 子进程 (11452) 开始执行,父进程为 (3000) 子进程 (884) 开始执行,父进程为 (3000) 子进程 (11452) 执行结束,耗时1.00秒 子进程 (884) 执行结束,耗时2.01秒 ---父进程执行结束--- 使用进程池 Pool 创建进程 在上述两个实例中,我们使用 Process 类创建了两个进程。如果要创建几十个或者上百个进程,则需要实例化更多个 Process 类对象,比较麻烦。遇到此类情况,则可以使用 Pool 进程池来创建进程。

Pool 类的常用方法如下:

方法说明apply_async(func[, args[, kwds]])使用非阻塞方式调用 func 函数(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args 为传递给 func 的参数列表,kwds 为传递给 func 的关键字参数列表apply(func[, args[, kwds]])使用阻塞方式调用 func 函数close()关闭 Pool,使其不再接受新的任务terminate()不管任务是否完成,立即终止join()主进程阻塞,等待子进程的退出,必须在 close 或 terminate 之后使用

示例:

from multiprocessing import Pool import time import os def task(name): print("子进程 (%s) 开始执行 task %s..." % (os.getpid(), name)) time.sleep(1) # 休眠 1 秒 if __name__ == '__main__': print("---父进程开始执行---") print("父进程 PID: %s" % os.getpid()) p = Pool(3) # 定义一个进程池,最大进程数为 3 for i in range(10): p.apply_async(task, args=(i,)) # 使用非阻塞方式调用 task() 函数 print("等待所有子进程结束") p.close() p.join() print("---父进程执行结束---")

上述例子的运行结果为:

---父进程开始执行--- 父进程 PID: 11416 等待所有子进程结束 子进程 (4640) 开始执行 task 0... 子进程 (11428) 开始执行 task 1... 子进程 (8244) 开始执行 task 2... 子进程 (4640) 开始执行 task 3... 子进程 (11428) 开始执行 task 4... 子进程 (8244) 开始执行 task 5... 子进程 (4640) 开始执行 task 6... 子进程 (8244) 开始执行 task 7... 子进程 (11428) 开始执行 task 8... 子进程 (4640) 开始执行 task 9... ---父进程执行结束---

上述例子中,最大进程数为 3,且以非阻塞方式调用 task(),代表着同一时间有3个进程在调用 task() 方法,3个进程结束之后再开启3个进程,结束之后再开启3个,最后只有1个进程在执行。

进程间通信

每个进程都有自己的地址空间、内存、数据栈以及其他记录其运行状态的辅助数据,进程之间没有共享信息,用下面一个例子来验证一下。

示例:

from multiprocessing import Process def plus(): print("---加法进程开始执行---") global g_num g_num += 50 print('g_num is %d' % g_num) print("---加法进程执行结束---") def minus(): print("---减法进程开始执行---") global g_num g_num -= 50 print('g_num is %d' % g_num) print("---减法进程执行结束---") g_num = 100 # 定义全局变量 g_num if __name__ == '__main__': print("---主进程开始执行---") print('g_num is %d' % g_num) p1 = Process(target=plus) p2 = Process(target=minus) p1.start() p2.start() p1.join() p2.join() print("--主进程执行结束---")

上述例子的运行结果为:

---主进程开始执行--- g_num is 100 ---加法进程开始执行--- g_num is 150 ---加法进程执行结束--- ---减法进程开始执行--- g_num is 50 ---减法进程执行结束--- --主进程执行结束---

Python de multiprocessing 模块包装了底层的机制,提供了 Queue(队列)、Pipes(管道)等多种方式来交换数据。这里主要介绍一下通过队列来实现进程间的通信。

队列(Queue)就是模仿现实中的排队,最前面的最先走,后面来的排在最后面,如图所示: 队列的初始化:q = Queue(num)若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接收的消息数量没有上限(直到内存的尽头)。

Queue 的常用方法如下所示:

方法说明Queue.qsize()返回当前队列包含的消息数量Queue.empty()如果队列为空,返回 True,否则返回 FalseQueue.full()如果队列满了,返回 True,否则返回 FalseQueue.get([block[, timeout]])获取队列中的一条消息,然后将其从队列中移除,block 默认值为 True。如果 block 使用默认值,且没有设置 timeout(单位:秒),消息队列为空,此时程序将被阻塞(停在读取状态),直到从消息队列读到消息为止,如果设置了 timeout,则会等待 timeout 秒,若还没有读取到任何消息,则抛出 Queue.Empty 异常Queue.get_nowait()相当于 Queue.get(False)Queue.put(item, [block[, timeout]])将 item 消息写入队列,block 默认值为 True。如果 block 使用默认值,且没有设置 timeout(单位:秒),消息队列如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止,如果设置了 timeout,则会等待 timeout 秒,若还没有空间,则抛出 Queue.full 异常Queue.put_nowait(item)相当于 Queue.put(item, False)

示例:

from multiprocessing import Queue if __name__ == '__main__': q = Queue(3) # 初始化一个 Queue 对象,最多可接收 3 条 put 消息 q.put("消息1") q.put("消息2") print(q.full()) q.put("消息3") print(q.full()) # 因为消息队列已满,再 put 会报异常,第一个 try 等待 2 秒后再抛出异常,第二个 try 立刻抛出 try: q.put("消息4", True, 2) except: print("消息队列已满,现有消息数量: %s" % q.qsize()) try: q.put_nowait("消息4") except: print("消息队列已满,现有消息数量: %s" % q.qsize()) # 读取消息时,先判断消息队列是否为空,再读取 if not q.empty(): print("----从消息队列中获取消息--") for i in range(q.qsize()): print(q.get_nowait())

上述例子的运行结果为:

False True 消息队列已满,现有消息数量: 3 消息队列已满,现有消息数量: 3 ----从消息队列中获取消息-- 消息1 消息2 消息3

下面通过一个实例结合 Process 和 Queue 实现进程间通信。创建两个子进程,一个子进程负责向队列中写入数据,另一个子进程负责从队列中读取数据。

示例:

from multiprocessing import Process, Queue import time # 向对列中写入数据 def write_task(q): if not q.full(): for i in range(5): message = "消息" + str(i) q.put(message) print("写入: %s" % message) # 从队列读取数据 def read_task(q): time.sleep(1) while not q.empty(): print("读取: %s" % q.get(True, 2)) # 等待 2 秒,如果还没有读取到任何消息,则抛出异常 if __name__ == '__main__': print("---父进程开始---") q = Queue() # 父进程创建 Queue,并传递给子进程 pw = Process(target=write_task, args=(q,)) pr = Process(target=read_task, args=(q,)) pw.start() pr.start() print("---等待子进程结束---") pw.join() pr.join() print("---父进程结束---")

上述例子的运行结果为:

---父进程开始--- ---等待子进程结束--- 写入: 消息0 写入: 消息1 写入: 消息2 写入: 消息3 写入: 消息4 读取: 消息0 读取: 消息1 读取: 消息2 读取: 消息3 读取: 消息4 ---父进程结束---

更多请参考

Python 进阶之路
最新回复(0)