第17章:使用 concurrent.futures 模块处理并发-ThreadPoolExecutor 多线程并发和 Future 介绍

tech2023-02-25  92

本章主要讨论 Python 3.2 引入的 concurrent.futures 模块。这一章还会介绍“Future”的概念,Future 指一种对象,表示异步执行的操作。这个概念的作用很大,是 concurrent.futures 模块和 asyncio 包的基础。

17.1 使用 concurrent.futures 模块并发处理下载任务

为了高效处理网络 I/O,需要使用并发,因为网络有很高的延迟,所以为了不浪费 CPU 周期去等待,最好在收到网络响应之前做些其他的事。为了说明这一点,我们通过代码来示例:

示例 17-1-1 普通批量下载:依次循环从网上下载 20 个国家的国旗图像:

# 可以从输出结果中看到,下载 20 个国旗图片共计用时 15.41s,示例中没有什么新知识,只是与其他脚本对比的基准;

import os import sys import time import requests # 国家简称列表 POP20_CC = 'CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR'.split() # 下载地址 BASE_URL = 'http://flupy.org/data/flags' # 保存地址 DOWN_DIR = 'downloads/' def download_flag(cc): """下载国旗""" # 图片地址 url = f'{BASE_URL}/{cc.lower()}/{cc.lower()}.gif' image = requests.get(url).content # 下载完成打印出国家 print(cc, end=' ') # 刷新控制台 sys.stdout.flush() # 保存图片 path = os.path.join(DOWN_DIR, cc + '.gif') with open(path, 'wb') as fp: fp.write(image) # 返回国家简称 return cc def batch_downloads(): """批量下载""" down_list = [] for country in sorted(POP20_CC): cc = download_flag(country) down_list.append(cc) return down_list def main(): """开始下载并计算下载所需时间""" start_time = time.time() flags = batch_downloads() total_time = time.time() - start_time print('\n{} flags downloaded in {:.2f}s'.format(len(flags), total_time)) if __name__ == '__main__': main() # 结果输出: # BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN # 20 flags downloaded in 15.41s # Process finished with exit code 0

示例 17-1-2 使用 concurrent.futures 模块下载:

concurrent.futures 模块的主要特色是 ThreadPoolExecutorProcessPoolExecutor 类,这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象。这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。不过,这个接口抽象的层级很高,像下载国旗这种简单的案例,无需关心任何实现细节。

在此示例中,只是将 batch_downloadsfor 循环依次下载改成了 ThreadPoolExecutor.map 实现多线程并发下载,未改动部分不再重复;

... from concurrent import futures ... # 最下线程数 MAX_WORKERS = 20 def download_flag(cc): ... def batch_downloads(): """多线程下载""" # 确定线程池数量 workers = min(MAX_WORKERS, len(POP20_CC)) # 启动线程池 with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_flag, sorted(POP20_CC)) # 返回结果列表 return list(res) def main(): ... if __name__ == '__main__': main() # 结果输出: # BR MX IN EG ET JP NG BD DE US CD CN VN TR PH FR RU ID PK IR # 20 flags downloaded in 1.20s # Process finished with exit code 0

通过 2 次示例的对比,多线程并发下载比依次下载快了 13 倍,如果把下载的文件数量增加到几百个,并发下载的脚本能比依序下载的脚本快 20 倍或更多。

我们来说下 ThreadPoolExecutor 的使用时的注意事项:

ThreadPoolExecutor 类实例化时,需要设定最多使用几个线程,如不设置,可能会被大量线程数打爆内存。

ThreadPoolExecutor 类实例化时,executor.__exit__ 方法会调用 executor.shutdown(wait=True) 方法,它会在所有线程都执行完毕 前阻塞线程。

map 方法的作用与内置的 map 函数类似,不过被调用的函数会在多个线程中并发调用;map 方法返回一个生成器,因此可以迭代获取各个函数返回的值。

如果有线程抛出异常,异常会在从生成器中取值时抛出,而不是在线程运行时抛出——这与隐式调用 next() 函数从迭代器中获取相应的返回值一样。

17.1.3 Future 在哪里

Future 是 concurrent.futures 模块和 asyncio 包的重要组件,可是, 作为这两个库的用户,我们有时却见不到期物。示例 17-1-2 在背后用到了 Future,但在编写的代码没有直接使用。这一节概述期物,还会举一个例子,展示用法。

从 Python 3.4 起,标准库中有两个名为 Future 的类:concurrent.futures.Futureasyncio.Future。这两个类的作用相同:两个 Future 类的实例都表示可能已经完成或者尚未完成的延迟计算。

Future 封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果(或异常)。

我们要记住一件事:通常情况下自己不应该创建 Future 对象,而只能由并发框架(concurrent.futuresasyncio)实例化。原因很简单:Future 表示终将发生的事情,而确定某件事会发生的唯一方式是执行的时间已经排定。因此,只有排定把某件事交给 concurrent.futures.Executor 子类处理时,才会创建 concurrent.futures.Future 实例。例如,Executor.submit() 方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个 Future

客户端代码不应该改变 Future 的状态,并发框架在 Future 表示的延迟计算结束后会改变期物的状态,而我们无法控制计算何时结束。

两种 Future 拥有的方法:

done()  :这个方法不阻塞,返回值是 bool , 指明 Future 链接的可调用对象是否已经执行,但客户端代码通常不会询问 Future 是否运行结束,而是会等待通知。add_done_callback()  :这个方法只有一个参数,类型是可调用的对象,Future 运行结束后会调用指定的可调用对象。result()  :在 Future 运行结束后调用的话,这个方法在两个 Future 类中的作用相同:返回可调用对象的结果,或者重新抛出执行可调用的对象时抛出的异常。可是,如果 Future 没有运行结束,result() 方法在两个 Future 类中的行为相差很大。对 concurrency.futures.Future 实例来说,调用 f.result() 方法会阻塞调用方所在的线程,直到有结果可返回。此时,result() 方法可以接收可选的 timeout 参数,如果在指定的时间内期物没有运行完毕,会抛出 TimeoutError 异常。而 asyncio.Future.result 方法不支持设定超时时间,在这个库中获取 Future 的结果最好使用 yield from 结构。

这两个库中有几个函数会返回 Future,其他函数则是使用 Future,以用户易于理解的方式实现自身。例如 17-1-2 中的 Executor.map 方法属于后者: 返回值是一个迭代器,迭代器的 __next__ 方法调用各个 Futureresult 方法,因此我们得到的是各个 Future 的结果,而非 Future 本身。

 

最新回复(0)