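I. Summary of Concepts
1-1 Pool: a mechanism for limiting the number of processes or threads
If a server starts one process or thread per client, the number of processes or threads it opens grows monotonically with the number of concurrent clients, which puts enormous pressure on the server. A "pool" is therefore used to cap the number of processes or threads the server opens.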
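- Process pool: a 'pool' holding a bounded set of worker processes that are reused to run submitted tasks
- Thread pool: a 'pool' holding a bounded set of worker threads that are reused to run submitted tasks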
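A minimal sketch of the idea follows. It uses ThreadPoolExecutor so it runs without extra setup. Twenty tasks are submitted to a pool of 4 workers, and a lock-protected counter records the highest number of tasks that ran at the same time. The names `job`, `running` and `peak` are illustrative only.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0   # tasks executing right now
peak = 0      # highest value `running` ever reached

def job(n):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)          # simulated IO wait
    with lock:
        running -= 1
    return n * n

# 20 tasks are submitted, but the pool never runs more than 4 at once
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(job, range(20)))

print('peak concurrency:', peak)   # never more than 4
print(results[:5])                 # [0, 1, 4, 9, 16]
```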
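1-2 Synchronous vs. asynchronous calls: two ways of submitting a task
- Synchronous call: submit a task, wait in place until it finishes and its result is available, and only then execute the next line of code. Tasks therefore run serially.
- Asynchronous call: submit a task and move on to the next line of code immediately, without waiting. Tasks run concurrently.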
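The sketch below makes the difference concrete by timing both styles on the same thread pool. `task` and its 0.2-second sleep are placeholders for real work.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(0.2)   # stand-in for real work or IO
    return n

with ThreadPoolExecutor(max_workers=5) as pool:
    # Synchronous: wait for each result before submitting the next task
    start = time.time()
    sync_results = [pool.submit(task, i).result() for i in range(5)]
    sync_time = time.time() - start

    # Asynchronous: submit every task first, collect the results afterwards
    start = time.time()
    futures = [pool.submit(task, i) for i in range(5)]
    async_results = [f.result() for f in futures]
    async_time = time.time() - start

print('sync  %.2fs' % sync_time)    # roughly 5 x 0.2 s
print('async %.2fs' % async_time)   # roughly 0.2 s
```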
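1-3 Blocking vs. non-blocking: the running state of a program
- Blocking: the program hits an IO operation (or something similar) and waits in place. It is in the blocked state.
- Non-blocking: the program is not waiting on IO (or anything similar) and does not wait. It is in the running or ready state.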
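In `concurrent.futures` terms, `Future.done()` is a non-blocking check, and `Future.result()` blocks the caller until the task finishes. The sketch below shows both. `slow_io` is a hypothetical IO-bound task.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_io():
    time.sleep(0.3)   # simulated IO: the worker thread is blocked here
    return 'data'

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_io)
    finished_early = future.done()   # returns at once; the main thread is not blocked
    value = future.result()          # blocks the main thread until slow_io returns
    finished_late = future.done()

print(finished_early, value, finished_late)
```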
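II. Implementing Pools with the concurrent.futures Module
2-0 concurrent.futures basics
The concurrent.futures module provides a high-level interface for asynchronous calls:
- ThreadPoolExecutor: a thread pool that provides asynchronous calls
- ProcessPoolExecutor: a process pool that provides asynchronous calls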
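Both classes are subclasses of `concurrent.futures.Executor`, so code written against that base class works with either pool. The sketch below runs with a thread pool. `run_all` and `square` are illustrative names. With ProcessPoolExecutor, the submitted function must be defined at module level so it can be pickled.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

def square(x):
    # defined at module level, so it could also be pickled for a ProcessPoolExecutor
    return x * x

def run_all(executor: Executor, items):
    """Submit square(i) for each item and return the results in order.

    Works with any Executor subclass (ThreadPoolExecutor or ProcessPoolExecutor).
    """
    futures = [executor.submit(square, i) for i in items]
    return [f.result() for f in futures]

# Both pool classes share the same base class
print(issubclass(ThreadPoolExecutor, Executor), issubclass(ProcessPoolExecutor, Executor))  # True True

with ThreadPoolExecutor(max_workers=2) as pool:
    squares = run_all(pool, range(5))
print(squares)  # [0, 1, 4, 9, 16]
```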
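2-0-1 Basic methods (apply to both process pools and thread pools)
submit(fn, *args, **kwargs)  # submit a task asynchronously
'''
!!! Note: submit returns a Future object, not the task's result. Call future.result() to get the actual value (a string, etc.) returned by the task.
'''
map(func, *iterables, timeout=None, chunksize=1)  # replaces a for loop of submit calls; yields results in input order (chunksize only has an effect for ProcessPoolExecutor)
shutdown(wait=True)
'''
Equivalent to pool.close() + pool.join() on a multiprocessing.Pool
wait=True: wait until every task in the pool has finished and its resources are released, then continue
wait=False: return immediately, without waiting for the tasks in the pool to finish
Whatever the value of wait, the program as a whole still waits for all submitted tasks to finish before it exits
submit and map must be called before shutdown; calling them afterwards raises RuntimeError
'''
# The two methods below belong to the Future object returned by submit, not to the pool itself
result(timeout=None)  # get the result, waiting at most timeout seconds (None means wait indefinitely)
add_done_callback(fn)  # register a callback; fn is called with the Future as its only argument when the task finishes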
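The following sketch exercises each of these methods on a thread pool. The process pool API is the same. `work` and `on_done` are illustrative names.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def work(n):
    time.sleep(0.05)      # simulated work
    return n * 10

done_log = []

def on_done(future):
    # a callback receives the Future itself, not the task's return value
    done_log.append(future.result())

pool = ThreadPoolExecutor(max_workers=3)

# submit() returns a Future at once; result() waits for the task's return value
f = pool.submit(work, 1)
first = f.result(timeout=5)
print(first)                                  # 10

# add_done_callback(): on_done(f2) is called once f2 has finished
f2 = pool.submit(work, 2)
f2.add_done_callback(on_done)

# map() submits work(x) for every x and yields the results in input order
mapped = list(pool.map(work, [3, 4, 5]))
print(mapped)                                 # [30, 40, 50]

# shutdown(wait=True): stop accepting tasks and wait for the submitted ones to finish
pool.shutdown(wait=True)
print(done_log)                               # [20]

# submitting after shutdown is an error
try:
    pool.submit(work, 6)
    rejected = False
except RuntimeError:
    rejected = True
print(rejected)                               # True
```

Writing `with ThreadPoolExecutor(...) as pool:` calls `pool.shutdown(wait=True)` automatically when the block exits. This is usually more convenient than calling shutdown by hand.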
2-1 Process pools
2-1-1 Two ways to submit tasks to a process pool
Method 1: synchronous calls
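from concurrent.futures import ProcessPoolExecutor
import time, random, os
def task(name):
    print('%s %s is running' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))
if __name__ == '__main__':
    p = ProcessPoolExecutor(4)  # number of processes in the pool
    for i in range(10):
        # Synchronous call: submit, then wait for the result right away
        obj = p.submit(task, 'process pid:')  # arguments: (function, args...); positional or keyword arguments both work
        res = obj.result()  # blocks until this task finishes, so the tasks run one after another
    p.shutdown(wait=True)  # close the pool to new tasks and wait for running tasks to finish
    print('main')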
Method 2: asynchronous calls
from concurrent.futures import ProcessPoolExecutor
import time, random, os
def task(name):
    print('%s %s is running' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))
if __name__ == '__main__':
    p = ProcessPoolExecutor(4)  # number of processes in the pool
    for i in range(10):
        # Asynchronous call: submit only, do not wait for the result
        p.submit(task, 'process pid:')  # arguments: (function, args...); positional or keyword arguments both work
    p.shutdown(wait=True)  # close the pool to new tasks and wait for running tasks to finish
    print('main')
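The asynchronous example above discards the Futures returned by submit. If the results are needed, the same module offers `concurrent.futures.as_completed`, which yields each Future as soon as it finishes. The sketch below uses threads so it runs anywhere. Its `task` mirrors the example above but returns its argument.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(name):
    time.sleep(random.uniform(0.05, 0.2))   # random duration, like the example above
    return name

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(task, 'task-%d' % i) for i in range(10)]
    # as_completed yields futures in completion order, not submission order
    finished = [f.result() for f in as_completed(futures)]

print(finished)
```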
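2-1-2 Process pool: synchronous-style calls
'''
Synchronous-style calls:
All downloads are submitted first, but the main process waits for all of them to finish and then parses the results one by one.
Advantage: decoupled (downloading and parsing are separate functions)
Disadvantage: slow (parsing runs serially in the main process after all downloads have finished)
'''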
from concurrent.futures import ProcessPoolExecutor
import time, os
import requests
def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'download failed'
    return res
def parse(res):
    time.sleep(1)
    print('%s parse result: %s' % (os.getpid(), len(res)))
if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ProcessPoolExecutor(9)
    l = []
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        l.append(future)
    p.shutdown(wait=True)
    # parse runs in the main process, one result at a time
    for future in l:
        parse(future.result())
    print('elapsed time', time.time() - start)
'''
6104 GET https://www.baidu.com
11096 GET https://www.sina.com.cn
6188 GET https://www.tmall.com
9224 GET https://www.jd.com
10376 GET https://www.python.org
7144 GET https://www.openstack.org
5516 GET https://www.baidu.com
9496 GET https://www.baidu.com
1100 GET https://www.baidu.com
14704 parse result: 2443
14704 parse result: 569114
14704 parse result: 233353
14704 parse result: 108550
14704 parse result: 48821
14704 parse result: 65099
14704 parse result: 2443
14704 parse result: 2443
14704 parse result: 2443
elapsed time 19.062582969665527
'''
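The same "download in the pool, parse in the main process" pattern can be written more compactly with `map`, which returns results in input order. This sketch makes two substitutions so it runs without network access: `fake_get` is a hypothetical offline stand-in for `get`, and a thread pool replaces the process pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_get(url):
    # hypothetical stand-in for get(url): pretends to download and returns a body
    time.sleep(0.1)
    return 'x' * len(url)

def parse(res):
    return len(res)

urls = ['https://www.baidu.com', 'https://www.python.org', 'https://www.jd.com']

with ThreadPoolExecutor(max_workers=3) as pool:
    # downloads run concurrently; results come back in the same order as urls
    bodies = list(pool.map(fake_get, urls))

# parsing stays separate from downloading (decoupled), but runs serially here
lengths = [parse(body) for body in bodies]
print(lengths)
```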
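2-1-3 Process pool: asynchronous calls
'''
Asynchronous calls:
Disadvantage: coupled (get calls parse directly, so the two cannot be used separately)
Advantage: fast (each worker process downloads and parses its own URL in parallel)
'''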
from concurrent.futures import ProcessPoolExecutor
import time, os
import requests
def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'download failed'
    parse(res)  # parse is called directly inside the worker process: this is the coupling
def parse(res):
    time.sleep(1)
    print('%s parse result: %s' % (os.getpid(), len(res)))
if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ProcessPoolExecutor(9)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)  # the Future is not kept; get returns nothing
    p.shutdown(wait=True)
    print('elapsed time', time.time() - start)
'''
11980 GET https://www.baidu.com
15308 GET https://www.sina.com.cn
14828 GET https://www.tmall.com
11176 GET https://www.jd.com
14792 GET https://www.python.org
14764 GET https://www.openstack.org
11096 GET https://www.baidu.com
13708 GET https://www.baidu.com
2080 GET https://www.baidu.com
14828 parse result: 233353
11176 parse result: 108569
11980 parse result: 2443
15308 parse result: 569124
11096 parse result: 2443
13708 parse result: 2443
2080 parse result: 2443
14764 parse result: 65099
14792 parse result: 48821
elapsed time 7.404443979263306
'''
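This example discards the Future, which has a side effect: an exception raised inside the task is silently stored in the Future and never printed. The minimal sketch below shows this. `get_and_parse` is a hypothetical failing task, and the URL is never actually fetched.

```python
from concurrent.futures import ThreadPoolExecutor

def get_and_parse(url):
    # hypothetical task that fails, e.g. a network error inside get()
    raise ConnectionError('cannot reach %s' % url)

with ThreadPoolExecutor(max_workers=2) as pool:
    # fire-and-forget: nothing is printed, the exception stays inside the Future
    pool.submit(get_and_parse, 'https://www.example.com')

    # keeping the Future makes the failure visible
    future = pool.submit(get_and_parse, 'https://www.example.com')
    err = future.exception()   # waits for the task, returns the exception (or None)

print(type(err).__name__, err)   # ConnectionError cannot reach https://www.example.com
```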
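2-1-4 Process pool, asynchronous calls + callback: decoupled, but slower
'''
Asynchronous call + callback: removes the coupling, but is slower.
With ProcessPoolExecutor the callbacks run in the main (parent) process, not in the worker processes, as the single pid in the output below shows. The 1-second parse calls therefore execute one after another.
'''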
from concurrent.futures import ProcessPoolExecutor
import time, os
import requests
def get(url):
    print('%s GET %s' % (os.getpid(), url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'download failed'
    return res
def parse(future):
    time.sleep(1)
    # the callback receives the Future object; call result() to get the task's return value
    res = future.result()
    print('%s parse result: %s' % (os.getpid(), len(res)))
if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    p = ProcessPoolExecutor(9)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        # register parse as a callback: when the task finishes, parse is called with the Future,
        # whose result() is the task's return value
        future.add_done_callback(parse)
    p.shutdown(wait=True)
    print('elapsed time', time.time() - start)
'''
3960 GET https://www.baidu.com
14320 GET https://www.sina.com.cn
1644 GET https://www.tmall.com
196 GET https://www.jd.com
13512 GET https://www.python.org
7356 GET https://www.openstack.org
14952 GET https://www.baidu.com
9528 GET https://www.baidu.com
11940 GET https://www.baidu.com
15292 parse result: 233360
15292 parse result: 108543
15292 parse result: 2443
15292 parse result: 2443
15292 parse result: 2443
15292 parse result: 2443
15292 parse result: 569140
15292 parse result: 48821
15292 parse result: 65099
elapsed time 14.311698913574219
'''
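In the run above, every parse call reports the same pid, so parsing was serialized. If you want both decoupling and parallel parsing, one common alternative is to keep `get` and `parse` as separate functions and chain them inside the worker through a small module-level wrapper. This is not the method used above. The sketch below uses a thread pool and a hypothetical offline `fake_get`. With ProcessPoolExecutor, the wrapper must be defined at module level so it can be pickled.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_get(url):
    # hypothetical stand-in for get(): no network access
    time.sleep(0.2)
    return 'x' * len(url)

def parse(res):
    time.sleep(0.2)   # simulated parsing cost
    return len(res)

def get_then_parse(url):
    # get and parse stay independent functions, but both run in the same worker
    return parse(fake_get(url))

urls = ['https://www.baidu.com'] * 4

start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    lengths = list(pool.map(get_then_parse, urls))
elapsed = time.time() - start

print(lengths, '%.2fs' % elapsed)   # about 0.4 s rather than 0.2 s + 4 x 0.2 s
```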
2-2 Thread pool: asynchronous calls + callback, the main approach for IO-bound work
'''Thread pool: whichever thread is free runs the next task'''
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import requests
def get(url):
    print('%s GET %s' % (current_thread().name, url))
    time.sleep(3)
    response = requests.get(url)
    if response.status_code == 200:
        res = response.text
    else:
        res = 'download failed'
    return res
def parse(future):
    time.sleep(1)
    res = future.result()
    print('%s parse result: %s' % (current_thread().name, len(res)))
if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.sina.com.cn',
        'https://www.tmall.com',
        'https://www.jd.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]
    # number of threads in the pool
    p = ThreadPoolExecutor(4)
    start = time.time()
    for url in urls:
        future = p.submit(get, url)
        # in a thread pool the callback runs in the worker thread that finished the task
        future.add_done_callback(parse)
    p.shutdown(wait=True)
    print('main', current_thread().name)
    print('elapsed time', time.time() - start)
'''
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_0 GET https://www.baidu.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 GET https://www.sina.com.cn
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_2 GET https://www.tmall.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_3 GET https://www.jd.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_2 parse result: 233360
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_2 GET https://www.python.org
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_0 parse result: 2443
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_0 GET https://www.openstack.org
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_3 parse result: 108554
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_3 GET https://www.baidu.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 parse result: 569140
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 GET https://www.baidu.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 parse result: 2443
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 GET https://www.baidu.com
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_3 parse result: 2443
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_2 parse result: 48821
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_1 parse result: 2443
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000172EB2053C8>_0 parse result: 65099
main MainThread
elapsed time 14.592755317687988
'''
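The output above shows each `parse` running in the same worker thread that had just finished the download. The small sketch below checks where callbacks run in a thread pool. If the task is still pending when the callback is added, the callback runs in the worker thread. If the Future has already finished, it runs immediately in the calling thread. `thread_name_prefix` is a real ThreadPoolExecutor parameter (Python 3.6+). `get` and `parse` here are simplified stand-ins for the functions above.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

callback_thread = {}   # task result -> name of the thread that ran its callback

def get(n):
    time.sleep(0.2)    # simulated download
    return n

def parse(future):
    callback_thread[future.result()] = threading.current_thread().name

with ThreadPoolExecutor(max_workers=2, thread_name_prefix='worker') as pool:
    # Case 1: callback added while the task is still running
    f1 = pool.submit(get, 1)
    f1.add_done_callback(parse)   # later runs in the worker thread that finishes f1

    # Case 2: callback added after the task has already finished
    f2 = pool.submit(get, 2)
    f2.result()                   # wait for completion first
    f2.add_done_callback(parse)   # runs immediately, in the calling (main) thread

print(callback_thread)   # 1 maps to a 'worker_...' thread name, 2 maps to 'MainThread'
```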