1
+ import time
2
+ import threading
3
+
4
+ class DataSource :
5
+ def __init__ (self , dataFileName , startLine = 0 , maxcount = None ):
6
+ self .dataFileName = dataFileName
7
+ self .startLine = startLine # 第一行行号为1
8
+ self .line_index = startLine # 当前读取位置
9
+ self .maxcount = maxcount # 读取最大行数
10
+ self .lock = threading .RLock () # 同步锁
11
+
12
+ self .__data__ = open (self .dataFileName , 'r' , encoding = 'utf-8' )
13
+ for i in range (self .startLine ):
14
+ l = self .__data__ .readline ()
15
+
16
+ def getLine (self ):
17
+ self .lock .acquire ()
18
+ try :
19
+ if self .maxcount is None or self .line_index < (self .startLine + self .maxcount ):
20
+ line = self .__data__ .readline ()
21
+ if line :
22
+ self .line_index += 1
23
+ return True , line
24
+ else :
25
+ return False , None
26
+ else :
27
+ return False , None
28
+
29
+ except Exception as e :
30
+ return False , "处理出错:" + e .args
31
+ finally :
32
+ self .lock .release ()
33
+
34
+ def __del__ (self ):
35
+ if not self .__data__ .closed :
36
+ self .__data__ .close ()
37
+ print ("关闭数据源:" , self .dataFileName )
38
+
39
+ def process (worker_id , datasource ):
40
+ count = 0
41
+ while True :
42
+ status , data = datasource .getLine ()
43
+ if status :
44
+ print (">>> 线程[%d] 获得数据, 正在处理......" % worker_id )
45
+ time .sleep (3 ) # 等待3秒模拟处理过程
46
+ print (">>> 线程[%d] 处理数据 完成" % worker_id )
47
+ count += 1
48
+ else :
49
+ break # 退出循环
50
+ print (">>> 线程[%d] 结束, 共处理[%d]条数据" % (worker_id , count ))
51
+
52
+
53
+ def main ():
54
+ datasource = DataSource ('data.txt' ) # 创建数据源类
55
+ workercount = 10 # 开启的线程数
56
+ workers = []
57
+ for i in range (workercount ):
58
+ worker = threading .Thread (target = process , args = (i + 1 , datasource ))
59
+ worker .start ()
60
+ workers .append (worker )
61
+
62
+ for worker in workers :
63
+ worker .join ()
0 commit comments