Using the concurrent.futures module

tech 2023-02-28

 

1. Concept summary

1-1 Pools: a way to control the number of processes or threads

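If a server starts a new process or thread for every client, the number of processes or threads grows monotonically with the number of concurrent clients and puts heavy pressure on the server. A "pool" caps how many processes or threads the server starts.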

- Process pool: a "pool" that holds processes
- Thread pool: a "pool" that holds threads
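To make the "cap" concrete, here is a minimal sketch using a thread pool. The function `handle_client` is hypothetical and only simulates serving one client with a short sleep. Twenty tasks are submitted, but the pool never runs more than `max_workers=4` of them at the same time; the counters record the highest concurrency actually observed.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0  # tasks executing right now
peak = 0     # highest number of tasks seen running at once


def handle_client(client_id):
    """Hypothetical stand-in for serving one client."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.1)  # simulate IO while serving the client
    with lock:
        running -= 1
    return client_id


# 20 "clients", but at most 4 worker threads ever exist in the pool
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(handle_client, range(20)))

print('peak concurrency:', peak)
```

Without the pool, starting one thread per client would let `peak` grow with the number of clients.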

1-2 Synchronous & asynchronous calls: two ways to submit tasks

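- Synchronous call: submit a task, wait in place until it finishes and returns its result, then run the next line of code. Tasks therefore run serially.
- Asynchronous call: submit a task and go straight on to the next line of code without waiting, so tasks run concurrently.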
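The difference shows up directly in wall-clock time. In this sketch the hypothetical `task` just sleeps 0.2 s to simulate IO. Calling `result()` right after each `submit()` is the synchronous style and serializes the four tasks; submitting all four first and collecting results afterwards is the asynchronous style and lets them overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def task(n):
    time.sleep(0.2)  # simulate an IO wait
    return n * n


with ThreadPoolExecutor(max_workers=4) as pool:
    # Synchronous: submit, then wait on result() before submitting the next task
    start = time.perf_counter()
    sync_results = [pool.submit(task, n).result() for n in range(4)]
    sync_elapsed = time.perf_counter() - start

    # Asynchronous: submit everything first, collect the results afterwards
    start = time.perf_counter()
    futures = [pool.submit(task, n) for n in range(4)]
    async_results = [f.result() for f in futures]
    async_elapsed = time.perf_counter() - start

print('sync  %.2fs' % sync_elapsed)   # roughly 4 x 0.2 s
print('async %.2fs' % async_elapsed)  # roughly 0.2 s
```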

1-3 Blocking & non-blocking: the running state of a program

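- Blocking: the program hits an IO operation (or similar) and waits in place; this is the blocked state.
- Non-blocking: the program is not waiting on IO or similar, so it is in the running or ready state.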
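On a `Future`, the two behaviours map onto two methods: `done()` answers immediately (non-blocking), while `result()` waits until the task has finished (blocking). In this sketch the hypothetical `slow_io` task is held back by a `threading.Event`, so the task is guaranteed to still be running when `done()` is checked.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()


def slow_io():
    release.wait()  # the worker blocks here, like a thread waiting on IO
    return 'data'


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_io)
    was_done = future.done()  # non-blocking: returns at once (False here)
    release.set()             # let the simulated IO complete
    value = future.result()   # blocking: waits until the task has finished

print(was_done, value)
```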

2. Implementing pools with the concurrent.futures module

2-0 concurrent.futures basics

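The concurrent.futures module provides a high-level interface for asynchronous calls:

- ThreadPoolExecutor: a thread pool that provides asynchronous calls
- ProcessPoolExecutor: a process pool that provides asynchronous calls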
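Because both classes implement the same abstract `Executor` interface, code written against that interface can take either pool. A minimal sketch; `square` and `run_squares` are illustrative names, and the thread pool is used in the runnable part:

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def square(x):
    return x * x


def run_squares(executor: Executor, numbers):
    """Works with any Executor, because it only uses submit() and Future.result()."""
    futures = [executor.submit(square, n) for n in numbers]
    return [f.result() for f in futures]


# Both pool classes subclass the same abstract Executor class
print(issubclass(ThreadPoolExecutor, Executor), issubclass(ProcessPoolExecutor, Executor))  # True True

with ThreadPoolExecutor(max_workers=2) as pool:
    squares = run_squares(pool, [1, 2, 3])
print(squares)  # [1, 4, 9]

# Passing ProcessPoolExecutor(max_workers=2) instead should work the same way,
# provided the call sits under an `if __name__ == '__main__':` guard and
# square is a module-level (picklable) function.
```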

2-0-1 Basic methods (apply to both process pools and thread pools)

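```python
submit(fn, *args, **kwargs)  # submit a task asynchronously
'''
Note: submit() returns a Future object. Call future.result() on it
to get the task's actual return value (a string, for example).
'''

map(fn, *iterables, timeout=None, chunksize=1)  # replaces a for loop of submit() calls

shutdown(wait=True)
'''
Roughly equivalent to pool.close() + pool.join() on a multiprocessing pool.
wait=True:  block until every task in the pool has finished and its resources are released
wait=False: return immediately, without waiting for the tasks in the pool to finish
Whatever the value of wait, the program as a whole still waits for all
submitted tasks to finish before it exits.
submit() and map() must be called before shutdown().
'''

# Methods of the Future object returned by submit():
result(timeout=None)   # get the task's result
add_done_callback(fn)  # register a callback; fn receives the Future
```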
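The methods above can be exercised together in one short sketch. It uses a thread pool so that it runs anywhere without a `__main__` guard; `double` and `on_done` are illustrative names. Note that the callback receives the `Future`, and that submitting after `shutdown()` raises `RuntimeError`.

```python
from concurrent.futures import ThreadPoolExecutor


def double(x):
    return 2 * x


completed = []  # filled in by the callback


def on_done(future):
    # The callback receives the finished Future, not the return value itself
    completed.append(future.result())


pool = ThreadPoolExecutor(max_workers=2)

future = pool.submit(double, 21)   # returns a Future immediately
future.add_done_callback(on_done)  # on_done(future) runs once the task completes
answer = future.result(timeout=5)  # blocks until the value is ready -> 42

mapped = list(pool.map(double, [1, 2, 3]))  # results in input order -> [2, 4, 6]

pool.shutdown(wait=True)  # wait for every task (and its callbacks), then free the workers

shutdown_error = None
try:
    pool.submit(double, 1)  # submit() is not allowed after shutdown()
except RuntimeError as exc:
    shutdown_error = exc

print(answer, mapped, completed, shutdown_error)
```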

 

2-1 Process pools

2-1-1 Two ways to submit tasks to a process pool

Method 1: synchronous calls

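```python
from concurrent.futures import ProcessPoolExecutor
import time, random, os


def task(name):
    print('%s %s is running' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    p = ProcessPoolExecutor(4)  # number of processes in the pool
    for i in range(10):
        # Synchronous call: submit, then wait for the return value
        obj = p.submit(task, 'process pid:')  # submit(function, args...); arguments may be positional or keyword
        res = obj.result()  # blocks until this task has finished
    p.shutdown(wait=True)  # close the pool to new tasks and wait for the running ones to finish
    print('main')
```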

Method 2: asynchronous calls

```python
from concurrent.futures import ProcessPoolExecutor
import time, random, os


def task(name):
    print('%s %s is running' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    p = ProcessPoolExecutor(4)  # number of processes in the pool
    for i in range(10):
        # Asynchronous call: submit only, do not wait for the return value
        p.submit(task, 'process pid:')  # submit(function, args...); arguments may be positional or keyword
    p.shutdown(wait=True)  # close the pool to new tasks and wait for the running ones to finish
    print('main')
```
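One consequence of "submit only, do not wait" is that an exception raised inside a task is not raised at the call site: it is stored on the `Future` and only surfaces when you call `result()` or `exception()`. If the futures are discarded, as in Method 2, such errors go unnoticed. A small sketch with a thread pool and a hypothetical `risky` task (the same `Future` methods exist for process pools):

```python
from concurrent.futures import ThreadPoolExecutor


def risky(n):
    return 10 // n  # raises ZeroDivisionError when n == 0


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(risky, n) for n in (1, 0, 5)]
# Leaving the with block calls shutdown(wait=True); no exception was raised here.

for f in futures:
    err = f.exception()  # None if the task succeeded
    if err is None:
        print('ok:', f.result())
    else:
        print('failed:', type(err).__name__)
```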

2-1-2 Process pool: parsing the results synchronously

```python
'''
Synchronous parsing:
Advantage: decoupled (get() only downloads; parse() runs separately)
Drawback: slow (parsing starts only after every download has finished,
and runs one page at a time in the main process)
'''
from concurrent.futures import ProcessPoolExecutor
import time, os
import requests


def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'download failed'
    return res


def parse(res):
    time.sleep(1)
    print('%s parse result: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ProcessPoolExecutor(9)
    l = []
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        l.append(future)
    p.shutdown(wait=True)
    for future in l:
        parse(future.result())  # parsing runs serially in the main process
    print('elapsed time', time.time() - start)

'''
6104 GET https://www.baidu.com
11096 GET https://www.sina.com.cn
6188 GET https://www.tmall.com
9224 GET https://www.jd.com
10376 GET https://www.python.org
7144 GET https://www.openstack.org
5516 GET https://www.baidu.com
9496 GET https://www.baidu.com
1100 GET https://www.baidu.com
14704 parse result: 2443
14704 parse result: 569114
14704 parse result: 233353
14704 parse result: 108550
14704 parse result: 48821
14704 parse result: 65099
14704 parse result: 2443
14704 parse result: 2443
14704 parse result: 2443
elapsed time 19.062582969665527
'''
```
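The cost of this pattern follows from its structure: all nine downloads overlap (about 3 s plus network time), but the nine parses then run back to back, so the run cannot take less than roughly 3 + 9 × 1 = 12 s even before network and process start-up time. The sketch below reproduces the same collect-then-parse structure offline with scaled-down delays; `fake_get`, `DOWNLOAD_S` and `PARSE_S` are hypothetical stand-ins for the real download and parse steps, and a thread pool replaces the process pool so it runs without a `__main__` guard. `pool.map` returns results in input order, which replaces the list-plus-loop in the original.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DOWNLOAD_S = 0.3  # stands in for time.sleep(3) + requests.get(url)
PARSE_S = 0.1     # stands in for the time.sleep(1) in parse()


def fake_get(url):
    """Hypothetical offline replacement for get(): returns a fake page."""
    time.sleep(DOWNLOAD_S)
    return 'x' * len(url)


def parse(res):
    time.sleep(PARSE_S)
    return len(res)


urls = ['https://www.baidu.com', 'https://www.python.org', 'https://www.jd.com']

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(urls)) as pool:
    pages = list(pool.map(fake_get, urls))  # downloads overlap
parsed = [parse(page) for page in pages]    # parsing runs one page at a time
elapsed = time.perf_counter() - start
# Lower bound: DOWNLOAD_S + len(urls) * PARSE_S = 0.3 + 3 * 0.1 = 0.6 s
print(parsed, round(elapsed, 2))
```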

2-1-3 Process pool: asynchronous calls

```python
'''
Asynchronous calls (parse() is called inside get()):
Drawback: get() and parse() are coupled
Advantage: fast (each worker process parses its own page as soon as it has downloaded it)
'''
from concurrent.futures import ProcessPoolExecutor
import time, os
import requests


def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'download failed'
    parse(res)  # parsing happens inside the worker process


def parse(res):
    time.sleep(1)
    print('%s parse result: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ProcessPoolExecutor(9)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
    p.shutdown(wait=True)
    print('elapsed time', time.time() - start)

'''
11980 GET https://www.baidu.com
15308 GET https://www.sina.com.cn
14828 GET https://www.tmall.com
11176 GET https://www.jd.com
14792 GET https://www.python.org
14764 GET https://www.openstack.org
11096 GET https://www.baidu.com
13708 GET https://www.baidu.com
2080 GET https://www.baidu.com
14828 parse result: 233353
11176 parse result: 108569
11980 parse result: 2443
15308 parse result: 569124
11096 parse result: 2443
13708 parse result: 2443
2080 parse result: 2443
14764 parse result: 65099
14792 parse result: 48821
elapsed time 7.404443979263306
'''
```

2-1-4 Process pool, asynchronous calls + callback: decoupled, but slow

```python
'''
Asynchronous calls + callback: decouples get() and parse(), but is slow.
With ProcessPoolExecutor the callbacks run in the main process, one after
another, which is why every "parse result" line below shows the same PID.
'''
from concurrent.futures import ProcessPoolExecutor
import time, os
import requests


def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'download failed'
    return res


def parse(future):
    time.sleep(1)
    # The callback receives the Future object; call result() to get the return value
    res = future.result()
    print('%s parse result: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ProcessPoolExecutor(9)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        # The module's callback mechanism: parse(future) is called when the task
        # finishes, and future.result() is the task's return value
        future.add_done_callback(parse)
    p.shutdown(wait=True)
    print('elapsed time', time.time() - start)

'''
3960 GET https://www.baidu.com
14320 GET https://www.sina.com.cn
1644 GET https://www.tmall.com
196 GET https://www.jd.com
13512 GET https://www.python.org
7356 GET https://www.openstack.org
14952 GET https://www.baidu.com
9528 GET https://www.baidu.com
11940 GET https://www.baidu.com
15292 parse result: 233360
15292 parse result: 108543
15292 parse result: 2443
15292 parse result: 2443
15292 parse result: 2443
15292 parse result: 2443
15292 parse result: 569140
15292 parse result: 48821
15292 parse result: 65099
elapsed time 14.311698913574219
'''
```
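One detail worth handling in a callback like `parse`: if the task raised, `future.result()` re-raises that exception inside the callback, and concurrent.futures only logs exceptions raised by callbacks and otherwise ignores them. A defensive sketch checks `future.exception()` first. It uses a thread pool so it runs without a `__main__` guard (the `Future` API is the same); `fake_get` and the URL `https://bad.example.invalid` are hypothetical and exist only to trigger a failure.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

parsed = []              # page lengths from successful downloads
failures = []            # error messages from failed downloads
lock = threading.Lock()  # callbacks may run in several worker threads


def fake_get(url):
    """Hypothetical stand-in for get(); fails for one URL to show error handling."""
    if 'bad' in url:
        raise ConnectionError('cannot reach ' + url)
    return '<html>' + url + '</html>'


def parse(future):
    # Check for an exception first: one raised inside a callback would only be logged
    err = future.exception()
    with lock:
        if err is not None:
            failures.append(str(err))
        else:
            parsed.append(len(future.result()))


urls = ['https://www.python.org', 'https://bad.example.invalid', 'https://www.jd.com']
with ThreadPoolExecutor(max_workers=3) as pool:
    for url in urls:
        pool.submit(fake_get, url).add_done_callback(parse)

print(sorted(parsed), failures)
```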

2-2 Thread pool: asynchronous calls + callback (the main approach for IO-bound work)

```python
'''Thread pool: whichever thread is free picks up the next task'''
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import requests


def get(url):
    print('%s GET %s' % (current_thread().name, url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'download failed'
    return res


def parse(future):
    time.sleep(1)
    res = future.result()
    print('%s parse result: %s' % (current_thread().name, len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    # number of threads in the pool
    p = ThreadPoolExecutor(4)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        # the callback normally runs in the worker thread that finished the task
        future.add_done_callback(parse)
    p.shutdown(wait=True)
    print('main', current_thread().name)
    print('elapsed time', time.time() - start)

'''
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_0 GET https://www.baidu.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 GET https://www.sina.com.cn
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_2 GET https://www.tmall.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_3 GET https://www.jd.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_2 parse result: 233360
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_2 GET https://www.python.org
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_0 parse result: 2443
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_0 GET https://www.openstack.org
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_3 parse result: 108554
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_3 GET https://www.baidu.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 parse result: 569140
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 GET https://www.baidu.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 parse result: 2443
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 GET https://www.baidu.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_3 parse result: 2443
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_2 parse result: 48821
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 parse result: 2443
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_0 parse result: 65099
main MainThread
elapsed time 14.592755317687988
'''
```
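The output above shows each parse running in the same thread that did the download. The sketch below makes this deterministic: a callback registered while the task is still pending runs in the worker thread, whereas a callback added to a Future that has already finished runs immediately in the thread that calls `add_done_callback()`. `thread_name_prefix` is used only to make the worker's name readable; the long `<...ThreadPoolExecutor object at ...>_N` names in the output above come from the default naming in older Python versions. `get` and `parse` here are simplified, offline stand-ins.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

go = threading.Event()
callback_threads = []  # name of the thread that ran each callback


def get():
    go.wait()  # keep the task unfinished until the first callback is registered
    return 'page'


def parse(future):
    callback_threads.append(threading.current_thread().name)


with ThreadPoolExecutor(max_workers=1, thread_name_prefix='worker') as pool:
    future = pool.submit(get)
    future.add_done_callback(parse)  # task still pending: runs later, in the worker
    go.set()
# shutdown(wait=True) has joined the worker, so the first callback has run

future.add_done_callback(parse)      # task already done: runs now, in this thread
print(callback_threads)
```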