# Last modified: 2025-01-14
#
# Copyright 2025 Ziyang Song, USTC. All rights reserved.
#
# This file has been modified from the original version.
# Original copyright (c) 2023 Bingxin Ke, ETH Zurich. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------
# If you find this code useful, we kindly ask you to cite our paper in your work.
# Please find bibtex at: https://github.com/indu1ge/DepthMaster#-citation
# More information about the method can be found at https://indu1ge.github.io/DepthMaster_page
# --------------------------------------------------------------------------
from typing import Dict, Optional, Union

import numpy as np
import torch
from diffusers import (
    AutoencoderKL,
    DiffusionPipeline,
    # UNet2DConditionModel,  # replaced by the customized U-Net below
)
from depthmaster.modules.unet_2d_condition import UNet2DConditionModel
from diffusers.utils import BaseOutput
from PIL import Image
from torch.utils.data import DataLoader, TensorDataset
from torchvision.transforms import InterpolationMode
from torchvision.transforms.functional import pil_to_tensor, resize
from tqdm.auto import tqdm
from transformers import CLIPTextModel, CLIPTokenizer

from .util.image_util import (
    chw2hwc,
    colorize_depth_maps,
    get_tv_resample_method,
    resize_max_res,
)


class DepthMasterDepthOutput(BaseOutput):
    """
    Output class for the monocular depth prediction pipeline.

    Args:
        depth_np (`np.ndarray`):
            Predicted depth map, with depth values in the range of [0, 1].
        depth_colored (`PIL.Image.Image`):
            Colorized depth map, stored as an RGB image with values in [0, 255].
        uncertainty (`None` or `np.ndarray`):
            Uncalibrated uncertainty (MAD, median absolute deviation) coming from ensembling.
    """

    depth_np: np.ndarray
    depth_colored: Union[None, Image.Image]
    uncertainty: Union[None, np.ndarray]
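

# --------------------------------------------------------------------------
# Illustrative sketch (not used by this module): how a caller might consume a
# `DepthMasterDepthOutput` produced by the pipeline defined below. Saving the
# raw prediction as a 16-bit PNG is an assumption about the downstream
# workflow, not something this file prescribes.
#
#   output = pipe(Image.open("input.jpg"))
#   depth_16bit = (output.depth_np * 65535.0).astype(np.uint16)   # [0, 1] -> uint16
#   Image.fromarray(depth_16bit).save("depth_pred.png")           # raw depth
#   if output.depth_colored is not None:
#       output.depth_colored.save("depth_colored.png")            # visualization
# --------------------------------------------------------------------------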


class DepthMasterPipeline(DiffusionPipeline):
    """
    Pipeline for monocular depth estimation using DepthMaster.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    Args:
        unet (`UNet2DConditionModel`):
            Conditional U-Net to predict the depth latent, conditioned on the image latent.
        vae (`AutoencoderKL`):
            Variational Auto-Encoder (VAE) Model to encode and decode images and depth maps
            to and from latent representations.
        text_encoder (`CLIPTextModel`):
            Text-encoder, for empty text embedding.
        tokenizer (`CLIPTokenizer`):
            CLIP tokenizer.
        scale_invariant (`bool`, *optional*):
            A model property specifying whether the predicted depth maps are scale-invariant. This value must be set
            in the model config. When used together with the `shift_invariant=True` flag, the model is also called
            "affine-invariant". NB: overriding this value is not supported.
        shift_invariant (`bool`, *optional*):
            A model property specifying whether the predicted depth maps are shift-invariant. This value must be set
            in the model config. When used together with the `scale_invariant=True` flag, the model is also called
            "affine-invariant". NB: overriding this value is not supported.
        default_processing_resolution (`int`, *optional*):
            The recommended value of the `processing_resolution` parameter of the pipeline. This value must be set in
            the model config. When the pipeline is called without explicitly setting `processing_resolution`, the
            default value is used. This is required to ensure reasonable results with various model flavors trained
            with varying optimal processing resolution values.
    """

    rgb_latent_scale_factor = 0.18215
    depth_latent_scale_factor = 0.18215

    def __init__(
        self,
        unet: UNet2DConditionModel,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        scale_invariant: Optional[bool] = True,
        shift_invariant: Optional[bool] = True,
        default_processing_resolution: Optional[int] = None,
    ):
        super().__init__()

        self.register_modules(
            unet=unet,
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
        )
        self.register_to_config(
            scale_invariant=scale_invariant,
            shift_invariant=shift_invariant,
            default_processing_resolution=default_processing_resolution,
        )

        self.scale_invariant = scale_invariant
        self.shift_invariant = shift_invariant
        self.default_processing_resolution = default_processing_resolution

        self.empty_text_embed = None
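
    # ----------------------------------------------------------------------
    # Usage sketch for `__call__` below (assumptions: a local or hub checkpoint
    # directory and a CUDA device are available; the path and the value of
    # `processing_res` are placeholders, not official identifiers or defaults):
    #
    #   pipe = DepthMasterPipeline.from_pretrained("path/to/depthmaster-checkpoint")
    #   pipe = pipe.to("cuda")
    #   output = pipe(Image.open("input.jpg"), processing_res=768)
    #   depth = output.depth_np  # np.ndarray with values in [0, 1]
    # ----------------------------------------------------------------------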

    @torch.no_grad()
    def __call__(
        self,
        input_image: Union[Image.Image, torch.Tensor],
        processing_res: Optional[int] = None,
        match_input_res: bool = True,
        resample_method: str = "bilinear",
        batch_size: int = 0,
        color_map: str = "Spectral",
        show_progress_bar: bool = True,
    ) -> DepthMasterDepthOutput:
        """
        Function invoked when calling the pipeline.

        Args:
            input_image (`Image`):
                Input RGB (or gray-scale) image.
            processing_res (`int`, *optional*, defaults to `None`):
                Effective processing resolution. When set to `0`, processes at the original image resolution. This
                produces crisper predictions, but may also lead to the overall loss of global context. The default
                value `None` resolves to the optimal value from the model config.
            match_input_res (`bool`, *optional*, defaults to `True`):
                Resize depth prediction to match input resolution.
                Only valid if `processing_res` > 0.
            resample_method (`str`, *optional*, defaults to `bilinear`):
                Resampling method used to resize images and depth predictions. This can be one of `bilinear`,
                `bicubic` or `nearest`.
            batch_size (`int`, *optional*, defaults to `0`):
                Inference batch size. If set to 0, a batch size of 1 is used.
            show_progress_bar (`bool`, *optional*, defaults to `True`):
                Display a progress bar over the inference batches.
            color_map (`str`, *optional*, defaults to `"Spectral"`, pass `None` to skip colorized depth map generation):
                Colormap used to colorize the depth map.

        Returns:
            `DepthMasterDepthOutput`: Output class for the DepthMaster monocular depth prediction pipeline, including:
            - **depth_np** (`np.ndarray`) Predicted depth map, with depth values in the range of [0, 1]
            - **depth_colored** (`PIL.Image.Image`) Colorized depth map, an RGB image with values in [0, 255];
              `None` if `color_map` is `None`
        """
        # Model-specific optimal default values leading to fast and reasonable results.
        if processing_res is None:
            processing_res = self.default_processing_resolution

        assert processing_res >= 0

        resample_method: InterpolationMode = get_tv_resample_method(resample_method)

        # ----------------- Image Preprocess -----------------
        # Convert to torch tensor
        if isinstance(input_image, Image.Image):
            input_image = input_image.convert("RGB")
            # convert to torch tensor [H, W, rgb] -> [rgb, H, W]
            rgb = pil_to_tensor(input_image)
            rgb = rgb.unsqueeze(0)  # [1, rgb, H, W]
        elif isinstance(input_image, torch.Tensor):
            rgb = input_image
        else:
            raise TypeError(f"Unknown input type: {type(input_image) = }")
        input_size = rgb.shape
        assert (
            4 == rgb.dim() and 3 == input_size[-3]
        ), f"Wrong input shape {input_size}, expected [1, rgb, H, W]"

        # --------------- Image Processing ------------------------
        # Resize image
        if processing_res > 0:
            rgb = resize_max_res(
                rgb,
                max_edge_resolution=processing_res,
                resample_method=resample_method,
            )

        # Normalize rgb values
        rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0  # [0, 255] -> [-1, 1]
        rgb_norm = rgb_norm.to(self.dtype)
        assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0

        # ----------------- Predicting depth -----------------
        # Batch repeated input image
        duplicated_rgb = rgb_norm.expand(1, -1, -1, -1)
        single_rgb_dataset = TensorDataset(duplicated_rgb)
        # find the batch size
        if batch_size > 0:
            _bs = batch_size
        else:
            _bs = 1

        single_rgb_loader = DataLoader(
            single_rgb_dataset, batch_size=_bs, shuffle=False
        )

        # Predict depth maps (batched)
        depth_pred_ls = []
        if show_progress_bar:
            iterable = tqdm(
                single_rgb_loader, desc=" " * 2 + "Inference batches", leave=False
            )
        else:
            iterable = single_rgb_loader
        for batch in iterable:
            (batched_img,) = batch  # image normalized to [-1, 1]
            depth_pred_raw = self.single_infer(
                rgb_in=batched_img,
            )
            depth_pred_ls.append(depth_pred_raw.detach())
        depth_preds = torch.concat(depth_pred_ls, dim=0)
        torch.cuda.empty_cache()  # clear vram cache

        # Single prediction, no ensembling
        depth_pred = depth_preds
        pred_uncert = None

        # Resize back to original resolution
        if match_input_res:
            depth_pred = resize(
                depth_pred,
                input_size[-2:],
                interpolation=resample_method,
                antialias=True,
            )

        # Convert to numpy
        depth_pred = depth_pred.squeeze()
        depth_pred = depth_pred.cpu().numpy()
        if pred_uncert is not None:
            pred_uncert = pred_uncert.squeeze().cpu().numpy()

        # Clip output range
        depth_pred = depth_pred.clip(0, 1)

        # Colorize
        if color_map is not None:
            depth_colored = colorize_depth_maps(
                depth_pred, 0, 1, cmap=color_map
            ).squeeze()  # [3, H, W], value in (0, 1)
            depth_colored = (depth_colored * 255).astype(np.uint8)
            depth_colored_hwc = chw2hwc(depth_colored)
            depth_colored_img = Image.fromarray(depth_colored_hwc)
        else:
            depth_colored_img = None

        return DepthMasterDepthOutput(
            depth_np=depth_pred,
            depth_colored=depth_colored_img,
            uncertainty=pred_uncert,
        )

    def encode_empty_text(self):
        """
        Encode text embedding for empty prompt.
        """
        prompt = ""
        text_inputs = self.tokenizer(
            prompt,
            padding="do_not_pad",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt",
        )
        text_input_ids = text_inputs.input_ids.to(self.text_encoder.device)  # [1, 2]
        self.empty_text_embed = self.text_encoder(text_input_ids)[0].to(self.dtype)  # [1, 2, 1024]
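
    # ----------------------------------------------------------------------
    # Shape flow of `single_infer` below (assuming the standard Stable
    # Diffusion VAE with an 8x spatial downsampling factor; sizes are
    # illustrative):
    #
    #   rgb_in       [B, 3, H, W]        values in [-1, 1]
    #   rgb_latent   [B, 4, H/8, W/8]    encode_rgb()
    #   unet_output  [B, 4, H/8, W/8]    one deterministic U-Net pass (fixed timestep)
    #   depth        [B, 1, H, W]        decode_depth(), rescaled to [0, 1]
    # ----------------------------------------------------------------------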

    @torch.no_grad()
    def single_infer(
        self,
        rgb_in: torch.Tensor,
    ) -> torch.Tensor:
        """
        Perform an individual depth prediction without ensembling.

        Args:
            rgb_in (`torch.Tensor`):
                Input RGB image.

        Returns:
            `torch.Tensor`: Predicted depth map.
        """
        device = self.device
        rgb_in = rgb_in.to(device)

        # Encode image into latent space: 1/8 resolution, 4 channels
        rgb_latent = self.encode_rgb(rgb_in)

        # Batched empty text embedding
        if self.empty_text_embed is None:
            self.encode_empty_text()
        batch_empty_text_embed = self.empty_text_embed.repeat(
            (rgb_latent.shape[0], 1, 1)
        ).to(device)  # [B, 2, 1024]

        # Single deterministic U-Net forward pass with a fixed timestep
        unet_output = self.unet(
            rgb_latent,
            1,
            encoder_hidden_states=batch_empty_text_embed,
        ).sample  # [B, 4, h, w]

        torch.cuda.empty_cache()

        depth = self.decode_depth(unet_output)  # [B, 1, H, W]

        # clip prediction
        depth = torch.clip(depth, -1.0, 1.0)
        # shift to [0, 1]
        depth = (depth + 1.0) / 2.0

        return depth

    def encode_rgb(self, rgb_in: torch.Tensor) -> torch.Tensor:
        """
        Encode RGB image into latent.

        Args:
            rgb_in (`torch.Tensor`):
                Input RGB image to be encoded.

        Returns:
            `torch.Tensor`: Image latent.
        """
        # encode
        h = self.vae.encoder(rgb_in)
        moments = self.vae.quant_conv(h)
        mean, logvar = torch.chunk(moments, 2, dim=1)
        # scale latent
        rgb_latent = mean * self.rgb_latent_scale_factor
        return rgb_latent

    def decode_depth(self, depth_latent: torch.Tensor) -> torch.Tensor:
        """
        Decode depth latent into depth map.

        Args:
            depth_latent (`torch.Tensor`):
                Depth latent to be decoded.

        Returns:
            `torch.Tensor`: Decoded depth map.
        """
        # scale latent
        depth_latent = depth_latent / self.depth_latent_scale_factor
        # decode
        z = self.vae.post_quant_conv(depth_latent)
        stacked = self.vae.decoder(z)
        # mean of output channels
        depth_mean = stacked.mean(dim=1, keepdim=True)
        return depth_mean
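

# --------------------------------------------------------------------------
# Note (sketch, not invoked anywhere): `encode_rgb` / `decode_depth` call the
# VAE sub-modules directly. Under the stock `diffusers` `AutoencoderKL` API
# this is believed to be roughly equivalent to the snippet below, up to the
# latent scale factor; treat it as an illustration, not a drop-in replacement
# (it bypasses options such as VAE slicing/tiling):
#
#   latent = vae.encode(rgb_in).latent_dist.mean * 0.18215
#   depth = vae.decode(latent / 0.18215).sample.mean(dim=1, keepdim=True)
# --------------------------------------------------------------------------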