Spaces:
Sleeping
Sleeping
| from typing import Any, List, Callable | |
| import psutil | |
| import os | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from queue import Queue | |
| from .image import ChainImgProcessor | |
| from tqdm import tqdm | |
| import cv2 | |
| def create_queue(temp_frame_paths: List[str]) -> Queue[str]: | |
| queue: Queue[str] = Queue() | |
| for frame_path in temp_frame_paths: | |
| queue.put(frame_path) | |
| return queue | |
| def pick_queue(queue: Queue[str], queue_per_future: int) -> List[str]: | |
| queues = [] | |
| for _ in range(queue_per_future): | |
| if not queue.empty(): | |
| queues.append(queue.get()) | |
| return queues | |
| class ChainBatchImageProcessor(ChainImgProcessor): | |
| chain = None | |
| func_params_gen = None | |
| num_threads = 1 | |
| def __init__(self): | |
| ChainImgProcessor.__init__(self) | |
| def init_with_plugins(self): | |
| self.init_plugins(["core"]) | |
| self.display_init_info() | |
| init_on_start_arr = self.init_on_start.split(",") | |
| for proc_id in init_on_start_arr: | |
| self.init_processor(proc_id) | |
| def update_progress(self, progress: Any = None) -> None: | |
| process = psutil.Process(os.getpid()) | |
| memory_usage = process.memory_info().rss / 1024 / 1024 / 1024 | |
| progress.set_postfix({ | |
| 'memory_usage': '{:.2f}'.format(memory_usage).zfill(5) + 'GB', | |
| 'execution_threads': self.num_threads | |
| }) | |
| progress.refresh() | |
| progress.update(1) | |
| def process_frames(self, source_files: List[str], target_files: List[str], current_files, update: Callable[[], None]) -> None: | |
| for f in current_files: | |
| temp_frame = cv2.imread(f) | |
| if temp_frame is not None: | |
| if self.func_params_gen: | |
| params = self.func_params_gen(None, temp_frame) | |
| else: | |
| params = {} | |
| resimg, _ = self.run_chain(temp_frame, params, self.chain) | |
| if resimg is not None: | |
| i = source_files.index(f) | |
| cv2.imwrite(target_files[i], resimg) | |
| if update: | |
| update() | |
| def run_batch_chain(self, source_files, target_files, threads:int = 1, chain = None, params_frame_gen_func = None): | |
| self.chain = chain | |
| self.func_params_gen = params_frame_gen_func | |
| progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' | |
| total = len(source_files) | |
| self.num_threads = threads | |
| with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: | |
| with ThreadPoolExecutor(max_workers=threads) as executor: | |
| futures = [] | |
| queue = create_queue(source_files) | |
| queue_per_future = max(len(source_files) // threads, 1) | |
| while not queue.empty(): | |
| future = executor.submit(self.process_frames, source_files, target_files, pick_queue(queue, queue_per_future), lambda: self.update_progress(progress)) | |
| futures.append(future) | |
| for future in as_completed(futures): | |
| future.result() | |