Spaces:
				
			
			
	
			
			
		Running
		
			on 
			
			Zero
	
	
	
			
			
	
	
	
	
		
		
		Running
		
			on 
			
			Zero
	| # Author: Bingxin Ke | |
| # Last modified: 2024-11-25 | |
| import concurrent.futures | |
| from typing import Union | |
| import matplotlib | |
| import numpy as np | |
| from tqdm import tqdm | |
def colorize_depth(
    depth: np.ndarray,
    min_depth: float,
    max_depth: float,
    cmap: str = "Spectral_r",
    valid_mask: Union[np.ndarray, None] = None,
) -> np.ndarray:
    """Map depth values to RGB colors using a matplotlib colormap.

    Args:
        depth: Depth map of shape [H, W] or a batch of shape [B, H, W].
        min_depth: Depth value mapped to the low end of the colormap.
        max_depth: Depth value mapped to the high end of the colormap.
            Must differ from ``min_depth`` (the range is used as a divisor).
        cmap: Name of a colormap registered in ``matplotlib.colormaps``.
        valid_mask: Optional boolean mask with as many elements as ``depth``
            (e.g. [B, H, W] or [B, 1, H, W]) or as a single frame
            (e.g. [H, W]), in which case it is applied to every frame.
            Pixels where the mask is False are set to 0 in all channels.

    Returns:
        Float array of shape [B, H, W, 3] with values in [0, 1]. A 2D input
        is treated as a batch of one, so B == 1 in that case.
    """
    assert len(depth.shape) >= 2, "Invalid dimension"

    if depth.ndim < 3:
        depth = depth[np.newaxis, :, :]

    # colorize
    cm = matplotlib.colormaps[cmap]
    depth = ((depth - min_depth) / (max_depth - min_depth)).clip(0, 1)
    # The colormap returns RGBA in the last axis; drop alpha.
    img_colored_np = cm(depth, bytes=False)[:, :, :, 0:3]  # value from 0 to 1

    if valid_mask is not None:
        # The colored image is channel-last ([B, H, W, 3]), so the mask must
        # be laid out as [B, H, W]; a boolean index over the leading three
        # axes then zeroes all three channels of each invalid pixel.
        # Reshaping (rather than squeezing) keeps genuine size-1 axes intact.
        mask = np.asarray(valid_mask, dtype=bool)
        mask = mask.reshape(-1, *depth.shape[-2:])  # [B, H, W] or [1, H, W]
        mask = np.broadcast_to(mask, depth.shape)
        img_colored_np[~mask] = 0

    return img_colored_np
def colorize_depth_multi_thread(
    depth: np.ndarray,
    valid_mask: Union[np.ndarray, None] = None,
    chunk_size: int = 4,
    num_threads: int = 4,
    color_map: str = "Spectral",
    verbose: bool = False,
) -> np.ndarray:
    """Colorize a sequence of depth frames in chunks using a thread pool.

    The color range is shared by all frames: it spans the minimum and
    maximum of the (optionally masked) depth values of the whole sequence.

    Args:
        depth: Depth frames of shape [B, 1, H, W]; axis 1 is squeezed away.
        valid_mask: Optional boolean mask selecting the depth values used to
            compute the color range. It may have the squeezed shape
            [B, H, W] or any shape with the same number of elements
            (e.g. [B, 1, H, W]). The mask only affects the range; it is not
            applied to the output pixels.
        chunk_size: Number of frames colorized per submitted task.
        num_threads: Maximum number of worker threads.
        color_map: Colormap name forwarded to ``colorize_depth``.
        verbose: If True, show a tqdm progress bar over the chunks.

    Returns:
        uint8 array of shape [B, H, W, 3].
    """
    depth = depth.squeeze(1)
    assert 3 == depth.ndim

    n_frame = depth.shape[0]

    if valid_mask is None:
        valid_depth = depth
    else:
        mask = np.asarray(valid_mask)
        # Accept a mask given in the un-squeezed input layout ([B, 1, H, W]):
        # bring it to the squeezed depth shape so boolean indexing is valid.
        if mask.shape != depth.shape and mask.size == depth.size:
            mask = mask.reshape(depth.shape)
        valid_depth = depth[mask]
    min_depth = valid_depth.min()
    max_depth = valid_depth.max()

    def process_chunk(chunk):
        # Colorize to floats in [0, 1], then quantize to 8-bit.
        chunk = colorize_depth(
            chunk, min_depth=min_depth, max_depth=max_depth, cmap=color_map
        )
        chunk = (chunk * 255).astype(np.uint8)
        return chunk

    # Pre-allocate the full array
    colored = np.empty((*depth.shape[:3], 3), dtype=np.uint8)

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Submit all tasks and store futures with their corresponding indices
        future_to_index = {
            executor.submit(process_chunk, depth[i : i + chunk_size]): i
            for i in range(0, n_frame, chunk_size)
        }

        # Futures are consumed in completion order (not submission order);
        # the stored start index places each result at the right offset.
        chunk_iterable = concurrent.futures.as_completed(future_to_index)
        if verbose:
            chunk_iterable = tqdm(
                chunk_iterable,
                desc=" colorizing",
                leave=False,
                total=len(future_to_index),
            )

        for future in chunk_iterable:
            index = future_to_index[future]
            start = index
            end = min(index + chunk_size, n_frame)
            result = future.result()
            colored[start:end] = result

    return colored

