@@ -4,8 +4,27 @@ python 中的异步库,集成 threading 和 multiprocessing 两个库。
4
4
5
5
可以使用 ` ThreadPoolExecutor ` 和 ` ProcessPoolExecutor ` 来做线程和进程
6
6
7
-
8
- ```
7
+ ### 简单使用
8
+
9
+ 这个库使用很简单,常见方法也即那么几种
10
+ > [ concurrent.futures --- 启动并行任务] ( https://docs.python.org/zh-cn/3/library/concurrent.futures.html )
11
+
12
+ - concurrent.futures.Executor|ThreadPoolExecutor|ProcessPoolExecutor
13
+ - submit
14
+ - map
15
+ - shutdown
16
+ - concurrent.futures.Future
17
+ - cancel
18
+ - cancel
19
+ - cancel
20
+ - done
21
+ - cancel
22
+ - exception
23
+ - add_done_callback
24
+ - concurrent.futures.wait
25
+ - concurrent.futures.as_completed
26
+
27
+ ``` python
9
28
# -*- coding: utf-8 -*-
10
29
11
30
import time
@@ -76,12 +95,78 @@ if __name__ == '__main__':
76
95
77
96
```
78
97
79
- 参考链接
98
+ ### 动态添加任务
99
+
100
+ ``` python
101
+ # -*- coding: utf-8 -*-
102
+ import time
103
+ import random
104
+ import traceback
80
105
81
- [ 使用Python进行并发编程-PoolExecutor篇] ( http://www.dongwm.com/archives/%E4%BD%BF%E7%94%A8Python%E8%BF%9B%E8%A1%8C%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B-PoolExecutor%E7%AF%87/ )
106
+ from concurrent import futures
107
+ from concurrent.futures import ThreadPoolExecutor
108
+
109
+ result = []
110
+
111
+
112
+ def handle (num , pool ):
113
+ try :
114
+ print (" thread for:%d " % num)
115
+ sleep_time = random.random()
116
+ time.sleep(sleep_time)
117
+ if sleep_time < 0.95 :
118
+ print (" thread pool add one" )
119
+ print (" before pool size:%d " % pool._work_queue.qsize())
120
+ pool.submit(handle, num* 10 , pool)
121
+ # result.append(pool.submit(handle, 111, pool))
122
+ print (" after pool size:%d " % pool._work_queue.qsize())
123
+ except Exception as e:
124
+ # 问题在这里:
125
+ # py2: cannot schedule new futures after shutdown
126
+ # py3: cannot schedule new futures after interpreter shutdown
127
+ print (repr (e))
128
+ traceback.print_stack()
129
+
130
+
131
+ if __name__ == ' __main__' :
132
+ # 使用 map 的时候需要注意,其实 pool 会自动帮你把参数 zip 一下,先合并再分别映射
133
+ # 其实最后不用调用 shutdown 也会自动等待所有任务结束,然后主进程结束
134
+ thread_pool = ThreadPoolExecutor(30 )
135
+ # map_result = thread_pool.map(handle, range(2), [thread_pool] * 2)
136
+ # result.extend(list(map_result))
137
+ # 实际上,使用 submit 多个参数的时候也需要注意,参数直接摆开,和 threading 不一样
138
+ for i in range (4 ):
139
+ result.append(thread_pool.submit(handle, i, thread_pool))
140
+ print (" start threading" )
141
+ # 在没有 shutdown 之后为什么还会自动结束呢?谁在
142
+ # thread_pool.shutdown()
143
+ while result:
144
+ print (" before result length:%d " % len (result))
145
+ for f in futures.as_completed(result):
146
+ result.remove(f)
147
+ # 此处还是有风险,如果这里结束了,全都结束了之后,还是会自动调用 shutdown
148
+ # 其实还有待提交的请求,这样就尴尬了。
149
+ # 但是加等待时间也不能够
150
+ if len (result) == 0 :
151
+ time.sleep(2 )
152
+ print (" after result length:%d " % len (result))
153
+ # 还是用 lock 锁定一个计数器比较现实,计数器归零再结束。
154
+
155
+ # futures.wait(result)
156
+ # thread_pool.shutdown()
157
+ # 不需要 as_completed 也不需要手动 wait,实际上在 shutdown 的时候其实就是会等待结束的
158
+ # 如果使用上下文管理器的话会自动 shutdown
159
+ # 问题是在子线程中不能再往线程中加任务了,有点问题
160
+ # with ThreadPoolExecutor(30) as thread_pool:
161
+ # result = thread_pool.map(handle, range(2), [thread_pool] * 2)
162
+ # print("start threading")
163
+ # thread_pool.submit(handle, 111, thread_pool)
82
164
83
- [ 使用Python的 concurrent.futures 模块 ] ( https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html )
165
+ ```
84
166
85
- [ python并发库:concurrent.futures的使用 ] ( https://blog.csdn.net/drdairen/article/details/69487643 )
167
+ 参考链接
86
168
87
- [ python并发 1:使用 futures 处理并发] ( https://segmentfault.com/a/1190000009819359 )
169
+ [ 使用Python进行并发编程-PoolExecutor篇] ( http://www.dongwm.com/archives/%E4%BD%BF%E7%94%A8Python%E8%BF%9B%E8%A1%8C%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B-PoolExecutor%E7%AF%87/ )
170
+ [ 使用Python的 concurrent.futures 模块] ( https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html )
171
+ [ python并发库:concurrent.futures的使用] ( https://blog.csdn.net/drdairen/article/details/69487643 )
172
+ [ python并发 1:使用 futures 处理并发] ( https://segmentfault.com/a/1190000009819359 )
0 commit comments