第17章:使用 concurrent.futures 模块处理并发-使用 futures.as

tech2023-10-29  87

前面的章节我们使用了 Executor.map 函数,这个函数易于使用,不过有个特性可能有用,也可能没用, 具体情况取决于需求:这个函数返回结果的顺序与调用开始的顺序一致。

我们知道 map 返回的结果是一个生成器 results,当我们用 for 循环取值时, for 会隐式调用 next(results) 返回第一个任务,然后又会在第一个任务的 Future 实例上调用 .result() 方法。而 result() 方法会阻塞,直到第一个 Future 运行结束;如果第一个调用生成结果用时 10 秒,而其他调用只用 1 秒,代码会阻塞 10 秒,获取 map 方法返回的生成器产出的第一个结果。在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束。

如果必须等到获取所有结果后再处理,这种行为没问题,不过,通常更可取的方式是,不管提交的顺序,只要有结果就获取。

为此,要把 Executor.submit 方法和 futures.as_completed 函数结合起来使用。这个组合比 executor.map 更灵活,因为 submit 方法能处理不同的可调用对象和参数,而 executor.map 只能处理参数不同的同一个可调用对象。此外,传给 futures.as_completed 函数的 Future 集合可以来自多个 Executor 实例,例如一些由 ThreadPoolExecutor 实例创建,另一些由 ProcessPoolExecutor 实例创建。

使用 futures.as_completed 函数

如果我们想使用不阻塞的方法立刻拿到线程执行结果,可以将 Executor.submit 方法和 futures.as_completed 函数结合起来使用,替代 executor.map 方法,实际上 Executor.map 的源码中也是循环调用了 Executor.submit 方法,只不过可能会被 result() 方法阻塞。而 futures.as_completed 函数则是接收一个 Future 组成的可迭代对象,然后返回一个迭代器;迭代器在每个 Future 完成时立刻产出结果,此方法不会被阻塞。

示例 17-5-1 使用 Executor.submit 方法和 futures.as_completed 函数改造代码,使其不阻塞立刻产出国旗下载结果:

下面代码中只修改了 batch_downloads 函数,为了方便对比结果,还需要在 main 函数中将结果打印出来,记得 download_flag 函数中的 print(cc, end=' ') sys.stdout.flush()

... from concurrent import futures ... # 最下线程数 MAX_WORKERS = 20 def download_flag(cc): ... def batch_downloads(): """多线程下载""" # 确定线程池数量 workers = min(MAX_WORKERS, len(POP20_CC)) # 启动线程池 with futures.ThreadPoolExecutor(workers) as executor: # Future 任务列表 future_tasks = [] for cc in sorted(POP20_CC): # 使用 submit 方法启动线程,并将 Future 实例添加进列表 future = executor.submit(download_flag, cc) future_tasks.append(future) # 传入 Future 列表,在每个 Future 完成时产出结果,此方法不阻塞。 task_iter = futures.as_completed(future_tasks) # 完成列表 done_list = [] for future in task_iter: # 从 task_iter 中获取下载结果并添加到完成列表 done_list.append(future.result()) return done_list def main(): ... print(flags) if __name__ == '__main__': main() # 结果输出1: # ['FR', 'PH', 'JP', 'CD', 'IN', 'DE', 'TR', 'BR', 'MX', 'IR', 'CN', 'ID', 'PK', 'VN', 'US', 'NG', 'BD', 'ET', 'RU', 'EG'] # 20 flags downloaded in 1.20s # 结果输出2: # ['PH', 'FR', 'BR', 'ET', 'JP', 'BD', 'IN', 'EG', 'DE', 'MX', 'ID', 'RU', 'CD', 'PK', 'VN', 'NG', 'TR', 'CN', 'IR', 'US'] # 20 flags downloaded in 0.80s # 结果输出3: # ['ET', 'BD', 'NG', 'DE', 'FR', 'IR', 'MX', 'BR', 'IN', 'EG', 'CD', 'CN', 'US', 'RU', 'ID', 'TR', 'VN', 'JP', 'PH', 'PK'] # 20 flags downloaded in 0.57s

从上方示例看到,修改后的代码的 3 次执行结果顺序都不一样,这是因为每个线程的完成顺序都不一样。

然后我们再看下之前的 17-1-2 的示例的执行结果:

从下面的结果上看,每次返回的结果顺序都是一样的,其实它的线程完成顺序也是不一致的,但是因为在获取结果时会因为 result 的方法阻塞问题,导致最终的结果顺序是一致的。

# 结果输出1: # ['BD', 'BR', 'CD', 'CN', 'DE', 'EG', 'ET', 'FR', 'ID', 'IN', 'IR', 'JP', 'MX', 'NG', 'PH', 'PK', 'RU', 'TR', 'US', 'VN'] # 20 flags downloaded in 0.41s # 结果输出2: # ['BD', 'BR', 'CD', 'CN', 'DE', 'EG', 'ET', 'FR', 'ID', 'IN', 'IR', 'JP', 'MX', 'NG', 'PH', 'PK', 'RU', 'TR', 'US', 'VN'] # 20 flags downloaded in 0.19s # 结果输出3: # ['BD', 'BR', 'CD', 'CN', 'DE', 'EG', 'ET', 'FR', 'ID', 'IN', 'IR', 'JP', 'MX', 'NG', 'PH', 'PK', 'RU', 'TR', 'US', 'VN'] # 20 flags downloaded in 0.61s

 

最新回复(0)