2023年4月19日

Python 3 多线程任务封装

class DealThread(threading.Thread):
    """Worker thread that drains a shared queue and collects results.

    Each worker repeatedly takes one item from ``queue`` without blocking,
    hands it to :meth:`process`, and appends the returned value to the shared
    ``task_result`` list.  The worker finishes when the queue is empty or as
    soon as another thread sets ``exit`` to ``True``.

    Args:
        queue: ``queue.Queue``-like object that supplies the work items.
        task_result: List receiving the value returned by ``process``.
        **kwargs: Extra keyword arguments forwarded to every ``process`` call.
    """

    def __init__(self, queue, task_result, **kwargs):
        super().__init__()
        self.queue = queue
        self.task_result = task_result
        self.kwargs = kwargs
        # Number of items this worker has processed successfully.
        self.count = 0
        # Cooperative stop flag; checked before each new item is fetched.
        self.exit = False

    def process(self, obj, **kwargs):
        """Handle a single work item; override this with the real task.

        Args:
            obj: The item taken from the queue.
            **kwargs: The keyword arguments given to the constructor.

        Returns:
            The result to store in ``task_result``; this default returns None.
        """
        return None

    def run(self):
        """Consume items until the queue is empty or ``exit`` is set."""
        while not self.exit:
            try:
                obj = self.queue.get(block=False)
            except queue.Empty:
                # Nothing left to do: let the thread finish.
                break
            try:
                result = self.process(obj, **self.kwargs)
                self.task_result.append(result)
                self.count += 1
            except Exception as ee:
                # One failing item must not kill the worker; log and move on.
                logger.warning("unkonw error at process")
                logger.exception(ee)
            finally:
                # Every successful get() needs a matching task_done(), even on
                # failure, otherwise Queue.join() would block forever.
                self.queue.task_done()
class TaskPool:
    """Run a batch of tasks on a pool of ``DealThread`` workers.

    Args:
        **kwargs: Keyword arguments forwarded to every worker, which in turn
            passes them to each ``process`` call.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        # Unbounded on purpose: start() enqueues all items before any worker
        # runs, so a bounded queue would block forever once it filled up.
        self.task_queue = queue.Queue()
        self.task_result = []

    def start(self, data, num_threads=100, second=10, **kwargs):
        """Enqueue ``data``, start the workers and wait until they finish.

        The calling thread wakes up every ``second`` seconds, logs progress
        and returns once no worker is alive.  When fewer than half of the
        workers are alive, a countdown of roughly 120 seconds starts; when it
        runs out, the remaining workers are asked to stop and the method
        returns without waiting for them.

        Args:
            data: Items to process; must support ``len()`` and integer
                indexing.
            num_threads: Number of worker threads to create.
            second: Interval in seconds between progress checks; must be
                positive.
            **kwargs: Accepted for compatibility; not used by this method.

        Returns:
            The shared result list (``self.task_result``).  After a forced
            stop, workers that are still finishing an item may append to it
            later.

        Raises:
            ValueError: If ``second`` is not positive.
        """
        if second <= 0:
            raise ValueError("second must be a positive number")

        task_num = len(data)
        logger.info(f"total task={task_num}, thread={num_threads}, log second={second}")

        # Index-based access is kept so any object supporting len() and
        # integer indexing behaves exactly as before.
        for i in range(task_num):
            self.task_queue.put(data[i])

        threads = [
            DealThread(self.task_queue, self.task_result, **self.kwargs)
            for _ in range(num_threads)
        ]
        # Start the worker threads as daemons so a stuck worker cannot keep
        # the interpreter alive after a forced stop.
        for thread in threads:
            thread.daemon = True
            thread.start()
        logger.info("Start to work!")

        # The main thread checks worker liveness every `second` seconds.
        # At least one check is always granted, otherwise a large `second`
        # would round to zero and force-stop healthy workers immediately.
        force_kill_wait = max(1, round(120 / second))
        while True:
            time.sleep(second)
            state = sum(1 for thread in threads if thread.is_alive())
            process = sum(thread.count for thread in threads)
            logger.info(f"process {process}/{task_num}, thread {state}/{num_threads}")
            if state == 0:
                break
            # Most workers are done: the stragglers are probably stuck, so
            # start counting down towards a forced stop.
            if state < num_threads // 2:
                force_kill_wait -= 1
            if force_kill_wait <= 0:
                logger.info(f"process {process}/{task_num}, thread {state}/{num_threads} force stop!")
                for thread in threads:
                    if thread.is_alive():
                        thread.exit = True
                break
        return self.task_result
Share

You may also like...

发表评论

您的电子邮箱地址不会被公开。