class DealThread(threading.Thread):
    """Worker thread that drains a shared queue and collects per-item results.

    Each item taken from ``queue`` is passed to :meth:`process`; the returned
    value is appended to the shared ``task_result`` list. Subclasses are
    expected to override :meth:`process` with the real work.

    Attributes:
        queue: The shared task queue this worker consumes from.
        task_result: The shared list that receives each ``process`` result.
        kwargs: Extra keyword arguments forwarded to every ``process`` call.
        count: Number of items this worker has processed successfully.
        exit: Set to ``True`` (by the controlling thread) to ask the worker to
            stop before taking another item.
    """

    def __init__(self, queue, task_result, **kwargs):
        """Store the shared queue, the shared result list and extra kwargs."""
        super().__init__()
        self.queue = queue
        self.task_result = task_result
        self.kwargs = kwargs
        self.count = 0
        self.exit = False

    def process(self, obj, **kwargs):
        """Handle a single queue item; the base implementation returns None.

        Args:
            obj: The item taken from the queue.
            **kwargs: The keyword arguments given to the constructor.

        Returns:
            The value to append to ``task_result``.
        """
        return None

    def run(self):
        """Consume items until the queue is empty or ``exit`` is set."""
        while not self.exit:
            # Only the non-blocking get is guarded by the Empty handler, so an
            # empty queue ends the worker. ``queue`` here is the module-level
            # ``queue`` module, not the constructor argument.
            try:
                obj = self.queue.get(block=False)
            except queue.Empty:
                break
            try:
                result = self.process(obj, **self.kwargs)
                self.task_result.append(result)
                self.count += 1
            except Exception as ee:
                # A failing item is logged and skipped; the worker keeps going.
                logger.warning("unkonw error at process")
                logger.exception(ee)
            finally:
                # Every successful get() is matched by task_done(), even when
                # processing failed, so Queue.join() cannot hang on this item.
                self.queue.task_done()
class TaskPool:
    """Run a batch of tasks on a pool of ``DealThread`` workers.

    Attributes:
        kwargs: Keyword arguments forwarded to every worker thread.
        task_queue: Bounded queue (capacity 1,000,000) holding pending items.
        task_result: List that the workers append their results to.
    """

    def __init__(self, **kwargs):
        """Remember the worker kwargs and create the queue and result list."""
        self.kwargs = kwargs
        self.task_queue = queue.Queue(1000000)
        self.task_result = []

    def start(self, data, num_threads=100, second=10, **kwargs):
        """Queue every item of ``data`` and process them with worker threads.

        Blocks until all workers have finished, or until fewer than half of
        them have been alive for roughly 120 seconds, in which case the
        remaining workers are asked to stop and the method returns.

        Args:
            data: Sized, integer-indexable collection of task items.
            num_threads: Number of worker threads to start.
            second: Interval in seconds between progress checks.
            **kwargs: Accepted for interface compatibility; not used here.

        Raises:
            queue.Full: If the items do not fit into ``task_queue``.
        """
        task_num = len(data)
        logger.info(f"total task={task_num}, thread={num_threads}, log second={second}")
        for i in range(task_num):
            # No worker is running yet, so a blocking put() on a full queue
            # would wait forever; fail fast with queue.Full instead.
            self.task_queue.put_nowait(data[i])
        threads = [
            DealThread(self.task_queue, self.task_result, **self.kwargs)
            for _ in range(num_threads)
        ]
        # Start the worker threads.
        for thread in threads:
            # Daemon threads do not keep the interpreter alive after a force stop.
            thread.daemon = True
            thread.start()
        logger.info("Start to work!")
        # The main thread checks worker liveness every `second` seconds.
        # Number of consecutive-or-not low-activity checks tolerated (~120 s).
        force_kill_wait = round(120 / second)
        while True:
            time.sleep(second)
            state = sum(1 for thread in threads if thread.is_alive())
            process = sum(thread.count for thread in threads)
            logger.info(f"process {process}/{task_num}, thread {state}/{num_threads}")
            if state == 0:
                break
            if state < max(num_threads // 2, 0):
                force_kill_wait -= 1
            if force_kill_wait <= 0:
                logger.info(f"process {process}/{task_num}, thread {state}/{num_threads} force stop!")
                for thread in threads:
                    if thread.is_alive():
                        thread.exit = True
                break
# 2023-04-19