from multiprocessing import Process, Manager, Queue
from queue import Empty
from threading import Thread
import sys, os, time, random
class IThread(Thread):
    """Worker thread: pulls tasks from `input_queue`, runs `target` on each,
    and pushes non-None results to `output_queue`.

    Subclasses override `init` (setup hook) and `target` (per-task work).
    `count` tracks successfully processed tasks; setting `exit` stops the loop.
    """
    def __init__(self, input_queue, output_queue, **kwargs):
        super().__init__()
        self.input_queue = input_queue    # shared task queue (drained until empty)
        self.output_queue = output_queue  # result queue; may be None to discard results
        self.kwargs = kwargs              # free-form options for subclasses
        self.count = 0                    # number of tasks completed by this thread
        self.exit = False                 # cooperative stop flag checked each loop
        self.init()

    def init(self):
        # Subclass hook: per-thread setup before run() starts.
        pass

    def target(self, task):
        # Subclass hook: process one task; return value (if not None) is
        # forwarded to output_queue. Default is a demo stub.
        print("run", task)
        time.sleep(0.5)
        return self.count

    def run(self):
        while not self.exit:
            try:
                # BUG FIX: the original checked empty() then did a blocking
                # get(); if a sibling thread took the last item in between,
                # get() blocked forever (a plain blocking get() never raises
                # Empty). Non-blocking get makes Empty the exit condition.
                task = self.input_queue.get_nowait()
            except Empty:
                break
            try:
                result = self.target(task)
            except Exception as ee:
                # Best-effort: log and keep consuming remaining tasks
                # (count is NOT incremented for failed tasks, as before).
                print(f"unknown error when dealing task: {ee}")
                continue
            if result is not None and self.output_queue is not None:
                self.output_queue.put(result)
            self.count += 1
class IProcess(Process):
    """Worker process: runs `thread_num` instances of `target_thread` and
    periodically publishes their combined completed-task count into the
    shared `process_count` list at index `pid`.
    """
    def __init__(self, target_thread, input_queue, output_queue, process_count, pid, thread_num, log_second, **kwargs):
        super().__init__()
        self.target_thread = target_thread  # IThread-like class to instantiate
        self.input_queue = input_queue      # shared task queue
        self.output_queue = output_queue    # shared result queue (may be None)
        self.process_id = pid               # logical index, not the OS pid
        self.thread_num = thread_num        # worker threads per process
        self.log_second = log_second        # liveness-poll interval in seconds
        self.kwargs = kwargs                # forwarded to each worker thread
        self.process_count = process_count  # Manager list: per-process task totals
        # NOTE(review): this flag only works if set inside the child process;
        # assigning it from the parent after start() does not cross the
        # process boundary (separate address spaces) — TODO confirm intent.
        self.exit = False

    def run(self):
        threads = [self.target_thread(self.input_queue, self.output_queue, **self.kwargs) for i in range(self.thread_num)]
        # Start the worker threads as daemons so they die with the process.
        for thread in threads:
            thread.daemon = True  # Thread.setDaemon() is deprecated (3.10+)
            thread.start()
        # Poll thread liveness every `log_second` seconds; once more than half
        # the threads are dead, count down ~240s of grace before forcing the
        # rest to stop.
        force_kill_wait = round(240 / self.log_second)
        while True:
            time.sleep(self.log_second)
            state = sum([1 if thread.is_alive() else 0 for thread in threads])
            count = sum([thread.count for thread in threads])
            self.process_count[self.process_id] = count
            if state == 0:
                break
            # max(..., 0) means the countdown never engages for thread_num < 4
            # (state == 0 already breaks above).
            if state < max(self.thread_num // 2, 0):
                force_kill_wait -= 1
            if force_kill_wait <= 0 or self.exit:
                print(f"pid={self.process_id} thread {state}/{self.thread_num} force stop!")
                for thread in threads:
                    if thread.is_alive():
                        thread.exit = True
                break
class SummaryThread(Thread):
    """Consumer thread: drains `input_queue`, passing each item to
    `summary_function`, until the queue is empty AND `exit` is set.

    Subclasses override `init` (setup), `summary_function` (per-item work),
    and `stop` (teardown, called once the loop finishes).
    """
    def __init__(self, input_queue: Queue, **kwargs):
        super().__init__()
        self.input_queue = input_queue
        self.kwargs = kwargs
        # FIX: set the stop flag before init() so subclass setup hooks can
        # observe a fully initialised instance (original set it afterwards).
        self.exit = False
        self.init()

    def init(self):
        # Subclass hook: allocate resources (files, buffers) before run().
        pass

    def summary_function(self, result):
        # Subclass hook: consume one result. Default just prints it.
        print("summary", result)

    def stop(self):
        # Subclass hook: release resources after the loop ends.
        pass

    def run(self):
        while True:
            try:
                # BUG FIX: non-blocking get avoids the empty()-then-get()
                # race of the original (another consumer could drain the
                # queue between the two calls, blocking forever).
                result = self.input_queue.get_nowait()
            except Empty:
                if self.exit:
                    print(f"Summary thread done!")
                    break
                time.sleep(1)
                continue
            self.summary_function(result)
        self.stop()
class MultiProcessMultiThreadRunner:
    """Fan tasks out to `process_num` processes, each running `thread_num`
    worker threads, and optionally funnel results into a single summary
    thread that drains the shared output queue.

    Parameters
    ----------
    target_thread : IThread subclass instantiated inside each process.
    summary_thread : SummaryThread subclass (or None to skip summarising).
    process_num / thread_num : pool dimensions.
    log_second : progress-poll interval in seconds.
    log_target : directory for a log file; None logs to stdout.
    kwargs : forwarded to both worker threads and the summary thread.
    """
    def __init__(self, target_thread=IThread, summary_thread=SummaryThread, process_num=2, thread_num=2, log_second=10, log_target=None, **kwargs) -> None:
        self.log_second = log_second
        if log_target is not None:
            # Log to <log_target>/log/log_<unix-timestamp>.txt
            log_target = os.path.join(log_target, "log")
            if not os.path.exists(log_target):
                os.makedirs(log_target)
            self.log = open(os.path.join(log_target, "log_{}.txt".format(int(time.time()))), "w")
        else:
            self.log = sys.stdout
        self.target_thread = target_thread
        self.summary = None
        self.process_num = process_num
        self.thread_num = thread_num
        self.kwargs = kwargs
        self.total_task = 0
        self.exit = False
        # maxsize <= 0 means unbounded for multiprocessing.Queue.
        self.input_queue = Queue(maxsize=-1)
        self.output_queue = Queue(maxsize=-1)
        if summary_thread is not None:
            self.summary_thread = summary_thread(self.output_queue, **self.kwargs)
            self.summary_thread.daemon = True  # setDaemon() is deprecated (3.10+)
        else:
            self.summary_thread = None

    def start(self, tasks: list):
        """Enqueue `tasks`, start the summary thread, and run to completion."""
        self.exit = False
        self.add_input(tasks)
        if self.summary_thread is not None:
            self.summary_thread.start()
        self.run()

    def add_input(self, tasks: list):
        """Push tasks onto the shared input queue (may be called repeatedly)."""
        for task in tasks:
            self.input_queue.put(task)
        self.total_task += len(tasks)
        print(f"Input {len(tasks)} new tasks, total {self.total_task} tasks.", file=self.log)

    def run(self):
        """Spawn the process pool, monitor progress, then wait for summary."""
        with Manager() as manager:
            # Shared per-process completed-task counters. BUG FIX: the
            # original seeded this with range(process_num), i.e. 0,1,2,...,
            # which skewed sum(count) in progress logs until every process
            # had reported; counters must start at zero.
            count = manager.list([0] * self.process_num)
            processes = [IProcess(self.target_thread, self.input_queue, self.output_queue, count, i, self.thread_num, self.log_second, **self.kwargs) for i in range(self.process_num)]
            # Start all worker processes as daemons.
            for process in processes:
                process.daemon = True
                process.start()
            print(f"Process Start to work!", file=self.log)
            # Poll process liveness every `log_second` seconds; once more than
            # half are dead, count down ~240s of grace before forcing a stop.
            force_kill_wait = round(240 / self.log_second)
            while True:
                time.sleep(self.log_second)
                state = sum([1 if process.is_alive() else 0 for process in processes])
                print(f"task {sum(count)}/{self.total_task}, process {state}/{self.process_num}", file=self.log)
                if state == 0:
                    break
                if state < max(self.process_num // 2, 0):
                    force_kill_wait -= 1
                if force_kill_wait <= 0:
                    print(f"process {count}/{self.total_task}, process {state}/{self.process_num} force stop!", file=self.log)
                    # NOTE(review): this assignment happens in the PARENT and
                    # does not reach the child's copy of `exit`; a
                    # multiprocessing.Event would be needed for a real
                    # cross-process stop signal — TODO confirm intent.
                    for process in processes:
                        if process.is_alive():
                            process.exit = True
                    break
        self.exit = True
        if self.summary_thread is not None:
            # Signal the summary thread to finish once the queue drains,
            # then wait for it to terminate.
            self.summary_thread.exit = True
            while self.summary_thread.is_alive():
                print("waiting summary thread")
                time.sleep(self.log_second)
if __name__ == "__main__":
    # Demo: run the default IThread workers over a small list of integers.
    demo_tasks = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
    demo_runner = MultiProcessMultiThreadRunner(IThread)
    demo_runner.start(demo_tasks)
# ---- Demo (运行 Demo): example usage of the runner classes above ----
from multi_process_runner import MultiProcessMultiThreadRunner, IThread, SummaryThread
import pandas as pd
import json
import cv2
import numpy as np
import os
class DSummaryThread(SummaryThread):
    """Summary thread that streams results into `<file_id>.json` as one
    JSON array, comma-separating entries as they arrive.
    """
    def init(self):
        # One output file per runner batch, named by the `file_id` kwarg.
        self.file = open(f"{self.kwargs.get('file_id')}.json", "w")
        self.file.write("[\n")
        self._first = True  # no comma before the first element

    def summary_function(self, result):
        # BUG FIX: the original wrote ",\n" AFTER every entry and then "]",
        # leaving a trailing comma — invalid JSON. Write the separator
        # before each entry except the first instead.
        if not self._first:
            self.file.write(",\n")
        self._first = False
        self.file.write(json.dumps(result, indent=4))

    def stop(self):
        self.file.write("\n]")
        self.file.close()
class DThread(IThread):
    """Demo worker thread whose task handler is a no-op."""
    def target(self, task):
        # Every task "succeeds" and yields an empty result record, which
        # the summary thread will serialise as {}.
        empty_record = {}
        return empty_record
if __name__ == "__main__":
    # Run ten batches; batch i writes its results to "<i>.json".
    for i in range(10):
        runner = MultiProcessMultiThreadRunner(DThread, DSummaryThread, process_num=128, thread_num=4, log_second=10, file_id=i)
        # BUG FIX: the original `tasks = [range(500)]` built a one-element
        # list holding a single range object, so only 1 "task" was enqueued
        # (and printed "total 1 tasks."). The intent is 500 integer tasks.
        tasks = list(range(500))
        print(f"total {len(tasks)} tasks.")
        runner.start(tasks)