2024年1月5日

python3 多进程多线程封装

from multiprocessing import Process, Manager, Queue
from queue import Empty
from threading import Thread
import sys, os, time, random

class IThread(Thread):
    """Worker thread that pulls tasks from a queue and forwards results.

    Subclasses normally override :meth:`init` (one-time setup) and
    :meth:`target` (per-task work).

    Args:
        input_queue: Queue-like object providing ``get(timeout=...)``.
        output_queue: Queue-like object providing ``put``; may be ``None``
            to discard results.
        **kwargs: Stored on ``self.kwargs`` for use by subclasses.

    Attributes:
        count: Number of tasks whose ``target`` call finished without raising.
        exit: Set to ``True`` from outside to make the loop stop after the
            current task.
    """

    # Seconds to wait for a task before treating the queue as drained.
    # Subclasses may override this.
    queue_timeout = 1.0

    def __init__(self, input_queue, output_queue, **kwargs):
        super().__init__()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.kwargs = kwargs
        self.count = 0
        self.exit = False
        self.init()

    def init(self):
        """Hook called at the end of ``__init__``; does nothing by default."""
        pass

    def target(self, task):
        """Process one task and return its result (``None`` means no result).

        The default implementation is a placeholder that prints the task,
        sleeps for half a second and returns the current ``count``.
        """
        print("run", task)
        time.sleep(0.5)
        return self.count

    def run(self):
        """Consume tasks until the queue stays empty or ``exit`` is set."""
        while not self.exit:
            # A timed get() replaces the former "check empty(), then blocking
            # get()" sequence: that sequence could block forever when another
            # worker took the last item between the check and the get.
            try:
                task = self.input_queue.get(timeout=self.queue_timeout)
            except Empty:
                break
            try:
                result = self.target(task)
                if result is not None and self.output_queue is not None:
                    self.output_queue.put(result)
                self.count += 1
            except Exception as ee:
                # Best effort: a failing task is reported and skipped so the
                # worker keeps serving the remaining tasks.
                print(f"unkonw error when dealing task: {ee}")


class IProcess(Process):
    """Process that hosts a group of worker threads and reports progress.

    Args:
        target_thread: Thread class, instantiated as
            ``target_thread(input_queue, output_queue, **kwargs)``. Instances
            must expose ``count`` and ``exit`` attributes.
        input_queue: Task queue handed to every worker thread.
        output_queue: Result queue handed to every worker thread.
        process_count: Indexable shared container; slot ``pid`` receives the
            summed ``count`` of this process's threads on every poll.
        pid: Index of this process inside ``process_count``.
        thread_num: Number of worker threads to start.
        log_second: Seconds between two liveness polls.
        **kwargs: Forwarded to every worker thread.
    """

    def __init__(self, target_thread, input_queue, output_queue, process_count, pid, thread_num, log_second, **kwargs):
        super().__init__()
        self.target_thread = target_thread
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.process_id = pid
        self.thread_num = thread_num
        self.log_second = log_second
        self.kwargs = kwargs
        self.process_count = process_count
        self.exit = False

    def run(self):
        """Start the workers, then poll them until they finish or are stopped."""
        threads = [
            self.target_thread(self.input_queue, self.output_queue, **self.kwargs)
            for _ in range(self.thread_num)
        ]
        # Start the worker threads as daemons so they cannot keep the process
        # alive once this method returns.
        for thread in threads:
            thread.daemon = True
            thread.start()

        # Number of "mostly dead" polls tolerated (about 240 seconds in total).
        # Clamped to at least one poll: for log_second > 480 the rounding
        # yields 0, which used to force-stop healthy workers on the first poll.
        force_kill_wait = max(1, round(240 / self.log_second))
        half = self.thread_num // 2

        # Poll worker liveness every ``log_second`` seconds.
        while True:
            time.sleep(self.log_second)
            state = sum(1 for thread in threads if thread.is_alive())
            self.process_count[self.process_id] = sum(thread.count for thread in threads)
            if state == 0:
                break
            # Fewer than half of the workers alive: start running down the
            # budget, the stragglers may be stuck.
            if state < half:
                force_kill_wait -= 1
            if force_kill_wait <= 0 or self.exit:
                print(f"pid={self.process_id} thread {state}/{self.thread_num} force stop!")
                for thread in threads:
                    if thread.is_alive():
                        thread.exit = True
                break

class SummaryThread(Thread):
    """Consumer thread that feeds every queued result to ``summary_function``.

    Subclasses normally override :meth:`init`, :meth:`summary_function` and
    :meth:`stop`.

    Args:
        input_queue: Queue-like object providing ``get(timeout=...)``.
        **kwargs: Stored on ``self.kwargs`` for use by subclasses.

    Attributes:
        exit: Set to ``True`` from outside to make the thread finish once no
            further result arrives within ``poll_interval`` seconds.
    """

    # Seconds to wait for a result before re-checking the ``exit`` flag.
    # Subclasses may override this.
    poll_interval = 1.0

    def __init__(self, input_queue:Queue, **kwargs):
        super().__init__()
        self.input_queue = input_queue
        self.kwargs = kwargs
        self.init()
        self.exit = False

    def init(self):
        """Hook called from ``__init__``; does nothing by default."""
        pass

    def summary_function(self, result):
        """Handle one result; the default implementation prints it."""
        print("summary", result)

    def stop(self):
        """Hook called once when ``run`` ends; does nothing by default."""
        pass

    def run(self):
        """Consume results until ``exit`` is set and the queue stays empty."""
        try:
            while True:
                # A timed get() wakes up as soon as a result arrives and does
                # not depend on empty(), so a result that arrives within the
                # timeout after ``exit`` was set is still consumed.
                try:
                    result = self.input_queue.get(timeout=self.poll_interval)
                except Empty:
                    if self.exit:
                        print("Summary thread done!")
                        break
                    continue
                self.summary_function(result)
        finally:
            # Always let subclasses release their resources, even when
            # summary_function raised.
            self.stop()

class MultiProcessMultiThreadRunner:
    """Run a list of tasks on ``process_num`` processes x ``thread_num`` threads.

    Args:
        target_thread: Worker thread class handed to every ``IProcess``.
        summary_thread: Class instantiated as
            ``summary_thread(output_queue, **kwargs)`` to consume results, or
            ``None`` to run without a result consumer.
        process_num: Number of worker processes.
        thread_num: Number of worker threads per process.
        log_second: Seconds between two progress reports / liveness polls.
        log_target: Directory in which a ``log/log_<timestamp>.txt`` file is
            created; ``None`` sends the progress output to ``sys.stdout``.
        **kwargs: Forwarded to the summary thread and to every worker.
    """

    def __init__(self, target_thread=IThread, summary_thread=SummaryThread, process_num=2, thread_num=2, log_second=10, log_target=None, **kwargs) -> None:
        self.log_second = log_second
        if log_target is not None:
            log_dir = os.path.join(log_target, "log")
            # exist_ok avoids the check-then-create race of exists()/makedirs().
            os.makedirs(log_dir, exist_ok=True)
            self.log = open(os.path.join(log_dir, "log_{}.txt".format(int(time.time()))), "w")
        else:
            self.log = sys.stdout

        self.target_thread = target_thread

        self.summary = None
        self.process_num = process_num
        self.thread_num = thread_num
        self.kwargs = kwargs

        self.total_task = 0
        self.exit = False
        self.input_queue = Queue(maxsize=-1)
        self.output_queue = Queue(maxsize=-1)

        if summary_thread is not None:
            self.summary_thread = summary_thread(self.output_queue, **self.kwargs)
            self.summary_thread.daemon = True
        else:
            self.summary_thread = None

    def start(self, tasks:list):
        """Queue ``tasks``, start the summary thread (if any) and block in ``run``."""
        self.exit = False
        self.add_input(tasks)
        if self.summary_thread is not None:
            self.summary_thread.start()
        self.run()

    def add_input(self, tasks:list):
        """Append ``tasks`` to the input queue and update the task total."""
        for task in tasks:
            self.input_queue.put(task)
        self.total_task += len(tasks)
        print(f"Input {len(tasks)} new tasks, total {self.total_task} tasks.", file=self.log)

    def close(self):
        """Close the log file if one was opened; ``sys.stdout`` is left alone."""
        if self.log is not sys.stdout and not self.log.closed:
            self.log.close()

    def run(self):
        """Start the worker processes and block until they are done or stopped."""
        with Manager() as manager:
            # One progress slot per process, all starting at zero. (Seeding the
            # list with range(process_num) inflated the reported total.)
            count = manager.list([0] * self.process_num)
            processes = [
                IProcess(self.target_thread, self.input_queue, self.output_queue, count, i, self.thread_num, self.log_second, **self.kwargs)
                for i in range(self.process_num)
            ]
            # Start the worker processes.
            for process in processes:
                process.daemon = True
                process.start()
            print("Process Start to work!", file=self.log)

            # Number of "mostly dead" polls tolerated (about 240 seconds);
            # clamped so that a large log_second cannot produce a zero budget.
            force_kill_wait = max(1, round(240 / self.log_second))
            half = self.process_num // 2

            # Poll process liveness every ``log_second`` seconds.
            while True:
                time.sleep(self.log_second)
                state = sum(1 for process in processes if process.is_alive())
                done = sum(count)
                print(f"task {done}/{self.total_task}, process {state}/{self.process_num}", file=self.log)

                if state == 0:
                    break
                if state < half:
                    force_kill_wait -= 1
                if force_kill_wait <= 0:
                    print(f"process {done}/{self.total_task}, process {state}/{self.process_num} force stop!", file=self.log)
                    # Attributes set on the parent's Process object never reach
                    # the child, so the stragglers are terminated instead.
                    # NOTE(review): terminate() can corrupt a queue the child
                    # is using at that moment; acceptable on this last-resort path.
                    for process in processes:
                        if process.is_alive():
                            process.terminate()
                    break

            # Reap the children before the manager (and ``count``) goes away.
            for process in processes:
                process.join(timeout=5)

            self.exit = True
            if self.summary_thread is not None:
                self.summary_thread.exit = True
                while self.summary_thread.is_alive():
                    print("waiting summary thread")
                    # join() returns as soon as the thread ends instead of
                    # always sleeping a full interval.
                    self.summary_thread.join(timeout=self.log_second)
        self.log.flush()
        
if __name__ == "__main__":
    # Smoke run with the default worker: ten numbered tasks plus a repeated 0.
    demo_tasks = [*range(10), 0]
    runner = MultiProcessMultiThreadRunner(IThread)
    runner.start(demo_tasks)

运行 Demo（需先将上面的代码保存为 multi_process_runner.py）

from multi_process_runner import MultiProcessMultiThreadRunner, IThread, SummaryThread
import pandas as pd
import json
import cv2
import numpy as np
import os

class DSummaryThread(SummaryThread):
    """Summary thread that streams every result into ``<file_id>.json``.

    The file holds a single JSON array. Separators are written *before* each
    element except the first, so the closing bracket never follows a trailing
    comma and the file stays valid JSON.
    """

    def init(self):
        """Open the output file and write the opening bracket."""
        self.file = open(f"{self.kwargs.get('file_id')}.json", "w", encoding="utf-8")
        self.file.write("[\n")
        self._wrote_any = False

    def summary_function(self, result):
        """Append ``result`` to the array as an indented JSON value."""
        temp = json.dumps(result, indent=4)
        if self._wrote_any:
            self.file.write(",\n")
        self.file.write(temp)
        self._wrote_any = True

    def stop(self):
        """Write the closing bracket and close the file."""
        self.file.write("\n]")
        self.file.close()

class DThread(IThread):
    """Demo worker that produces an empty dict for every task."""

    def target(self, task):
        # The task value itself is ignored; the result is always empty.
        return dict()

if __name__ == "__main__":
    # Ten independent runs; each writes its results to "<i>.json".
    for i in range(10):
        runner = MultiProcessMultiThreadRunner(DThread, DSummaryThread, process_num=128, thread_num=4, log_second=10, file_id=i)
        # list(range(500)) yields 500 separate tasks; [range(500)] was a
        # one-element list holding a single range object.
        tasks = list(range(500))
        print(f"total {len(tasks)} tasks.")
        runner.start(tasks)
Share

You may also like...

发表评论

您的电子邮箱地址不会被公开。