Source code for enc.training.loss

# Software Name: Cool-Chic
# SPDX-FileCopyrightText: Copyright (c) 2023-2025 Orange
# SPDX-License-Identifier: BSD 3-Clause "New"
#
# This software is distributed under the BSD-3-Clause license.
#
# Authors: see CONTRIBUTORS.md


import typing
from dataclasses import dataclass, field
from typing import Dict, Literal, Optional, Union

import torch
from enc.io.format.yuv import DictTensorYUV
from enc.training.metrics.mse import dist_to_db, mse_fn
from enc.training.metrics.wasserstein import wasserstein_fn
from torch import Tensor

# Names of the distortion metrics understood by this module. Used as the key
# type of the per-metric dictionaries (weights and measured distortions) and
# listed in the error message raised for an unsupported metric name.
DISTORTION_METRIC = Literal["mse", "wasserstein"]

@dataclass(kw_only=True)
class LossFunctionOutput:
    """Output for FrameEncoder.loss_function.

    The first group of fields is provided by the caller. The second group is
    derived from them in ``__post_init__``: distortions converted to dB and
    aggregated rates.
    """

    # ----- This is the important output
    # Optional to allow easy inheritance by FrameEncoderLogs
    # but will never be None
    # The RD cost to optimize (loss_function stores a one-element tensor here)
    loss: Optional[Union[float, Tensor]] = None
    # The distortion cost to optimize along with the rate
    dist: Optional[float] = None
    rate_bpp: Optional[float] = None

    # Any other data required to compute some logs, stored inside a dictionary
    # Each distortion value (mse, wasserstein...)
    detailed_dist: Optional[Dict[DISTORTION_METRIC, float]] = None
    # Rate associated to each latent [bpp], one dictionary entry per latent
    rate_latent_bpp: Optional[Dict[str, float]] = None
    # Total rate associated to the all NNs of all cool-chic [bpp]
    total_rate_nn_bpp: float = 0.0

    # ==================== Not set by the init function ===================== #
    # Everything here is derived from the above metrics
    # Overall rate of all the latents [bpp]
    total_rate_latent_bpp: Optional[float] = field(init=False, default=None)
    # dB version of ``dist``. Still accepted by __init__ (kept for backward
    # compatibility) but overwritten in __post_init__ whenever dist is set.
    dist_db: Optional[float] = None
    # Each distortion value in dB, stored under "psnr_db" and "wd_db"
    detailed_dist_db: Optional[Dict[str, float]] = field(
        init=False, default_factory=dict
    )
    # Overall rate: latent & NNs [bpp]
    total_rate_bpp: Optional[float] = field(init=False, default=None)
    # ==================== Not set by the init function ===================== #

    def __post_init__(self):
        """Fill the derived fields (dB distortions and aggregated rates)."""
        # Compute some dB values from distortion. Only the metrics that were
        # actually measured are converted: detailed_dist holds one entry per
        # requested metric, so "mse" may be absent (e.g. wasserstein-only).
        if self.detailed_dist is not None:
            if "mse" in self.detailed_dist:
                self.detailed_dist_db["psnr_db"] = dist_to_db(
                    self.detailed_dist["mse"]
                )
            if "wasserstein" in self.detailed_dist:
                self.detailed_dist_db["wd_db"] = dist_to_db(
                    self.detailed_dist["wasserstein"]
                )

        # dist defaults to None (see the field comment above); there is
        # nothing to convert in that case.
        if self.dist is not None:
            self.dist_db = dist_to_db(self.dist)

        if self.rate_latent_bpp is not None:
            self.total_rate_latent_bpp = sum(self.rate_latent_bpp.values())
        else:
            self.total_rate_latent_bpp = 0

        self.total_rate_bpp = self.total_rate_latent_bpp + self.total_rate_nn_bpp
def _pixel_weighted_average(
    metric_fn: typing.Callable[[Tensor, Tensor], Tensor],
    x: DictTensorYUV,
    y: DictTensorYUV,
) -> Tensor:
    """Average a per-channel metric, weighted by each channel's pixel count.

    Args:
        metric_fn: Function applied to each pair of matching channels.
        x: Dictionary of tensors, one per color channel. Its keys drive the
            iteration and it must contain a ``"y"`` entry, whose device is
            used for the accumulator.
        y: Dictionary of tensors with (at least) the same keys as ``x``.

    Returns:
        Tensor: One element tensor containing the weighted average.
    """
    # Total number of pixels for all channels
    total_pixels = 0.0
    # Metric weighted by the number of pixels in each channel
    weighted_sum = torch.zeros((1), device=x.get("y").device)

    for channel_name, x_channel in x.items():
        # Pair the channels by name rather than by position, so the result
        # does not depend on the insertion order of the two dictionaries.
        y_channel = y[channel_name]
        n_pixels_channel = x_channel.numel()
        weighted_sum = weighted_sum + metric_fn(x_channel, y_channel) * n_pixels_channel
        total_pixels += n_pixels_channel

    return weighted_sum / total_pixels


def _compute_mse(
    x: Union[Tensor, DictTensorYUV], y: Union[Tensor, DictTensorYUV]
) -> Tensor:
    """Compute the Mean Squared Error between two images.

    Both images can either be a single tensor, or a dictionary of tensors with
    one for each color channel. In case of images with multiple channels, the
    final MSE is obtained by averaging the MSE for each color channel, weighted
    by the number of pixels. E.g. for YUV 420:

        MSE = (4 * MSE_Y + MSE_U + MSE_V) / 6

    Args:
        x (Union[Tensor, DictTensorYUV]): One of the two inputs
        y (Union[Tensor, DictTensorYUV]): The other input

    Returns:
        Tensor: One element tensor containing the MSE of x and y.
    """
    if isinstance(x, Tensor):
        return mse_fn(x, y)

    return _pixel_weighted_average(mse_fn, x, y)


def _compute_wasserstein(
    decoded_img: Union[Tensor, DictTensorYUV], target_img: Union[Tensor, DictTensorYUV]
) -> Tensor:
    """Compute the Wasserstein distance between two images.

    Both images can either be a single tensor, or a dictionary of tensors with
    one for each color channel. In case of images with multiple channels, the
    final Wasserstein distance is obtained by averaging the Wasserstein
    distance for each color channel, weighted by the number of pixels. E.g.
    for YUV 420:

        WD = (4 * WD_Y + WD_U + WD_V) / 6

    Args:
        decoded_img (Union[Tensor, DictTensorYUV]): The decoded image
        target_img (Union[Tensor, DictTensorYUV]): The target image

    Returns:
        Tensor: One element tensor containing the WD of decoded_img and
            target_img.
    """
    if isinstance(decoded_img, Tensor):
        return wasserstein_fn(decoded_img, target_img)

    return _pixel_weighted_average(wasserstein_fn, decoded_img, target_img)
def loss_function(
    decoded_image: Union[Tensor, DictTensorYUV],
    rate_latent_bit: Dict[str, Tensor],
    target_image: Union[Tensor, DictTensorYUV],
    dist_weight: Dict[DISTORTION_METRIC, float],
    lmbda: float = 1e-3,
    total_rate_nn_bit: float = 0.0,
    compute_logs: bool = False,
) -> LossFunctionOutput:
    r"""Compute the loss and a few other quantities. The loss equation is:

    .. math::

        \mathcal{L} = \mathrm{D}(\hat{\mathbf{x}}, \mathbf{x}) + \lambda
        (\mathrm{R}(\hat{\mathbf{x}}) + \mathrm{R}_{NN}), \text{ with }
        \begin{cases}
            \mathbf{x} & \text{the original image}\\
            \hat{\mathbf{x}} & \text{the coded image}\\
            \mathrm{R}(\hat{\mathbf{x}}) & \text{A measure of the rate of } \hat{\mathbf{x}} \\
            \mathrm{R}_{NN} & \text{The rate of the neural networks}\\
            \mathrm{D}(\hat{\mathbf{x}}, \mathbf{x}) & \text{A distortion metric specified by \texttt{--tune} and \texttt{--alpha}}
        \end{cases}

    .. warning::

        There is no back-propagation through the term :math:`\mathrm{R}_{NN}`.
        It is just here to be taken into account by the rate-distortion cost so
        that it better reflects the compression performance.

    Args:
        decoded_image: The decoded image, either as a Tensor for RGB or YUV444
            data, or as a dictionary of Tensors for YUV420 data.
        rate_latent_bit: Dictionary with the rate of each latent for each
            cool-chic decoder. Tensor with the rate of each latent value.
            The rate is in bit.
        target_image: The target image, either as a Tensor for RGB or YUV444
            data, or as a dictionary of Tensors for YUV420 data.
        dist_weight: Weight of each distortion metric. The distortion term of
            the loss is the weighted sum of every metric listed here.
        lmbda: Rate constraint. Defaults to 1e-3.
        total_rate_nn_bit: Total rate of the NNs (arm + upsampling + synthesis)
            for all each cool-chic encoder. Rate is in bit. Defaults to 0.
        compute_logs: True to output a few more quantities beside the loss.
            Defaults to False.

    Returns:
        Object gathering the different quantities computed by this loss
        function. Chief among them: the loss itself.

    Raises:
        ValueError: If ``dist_weight`` contains an unsupported metric name.
    """
    # A tensor target whose largest absolute value exceeds 1 is min-max
    # rescaled, and the decoded image goes through the same affine mapping so
    # that both remain comparable.
    if isinstance(target_image, Tensor):
        range_target = target_image.abs().max().item()
        if range_target > 1:
            target_min = target_image.min()
            target_max = target_image.max()
            decoded_image = (decoded_image - target_min) / (target_max - target_min)
            target_image = (target_image - target_min) / (target_max - target_min)

    flag_yuv420 = not isinstance(decoded_image, Tensor)
    # For dictionary (YUV420) data, the "y" entry provides both the device
    # and the spatial resolution used to express rates in bit per pixel.
    reference_plane = decoded_image.get("y") if flag_yuv420 else decoded_image
    device = reference_plane.device

    all_dists = {}
    final_dist = torch.zeros((1), device=device)

    # Iterate on all possible distortion metrics.
    for dist_name, dist_w in dist_weight.items():
        if dist_name == "mse":
            cur_dist = _compute_mse(decoded_image, target_image)
        elif dist_name == "wasserstein":
            cur_dist = _compute_wasserstein(decoded_image, target_image)
        else:
            raise ValueError(
                f"Unsupported distortion metrics. Found {dist_name}, available "
                f"values are {typing.get_args(DISTORTION_METRIC)}. Exiting!"
            )

        all_dists[dist_name] = cur_dist
        # Aggregate weighted dist
        final_dist = final_dist + dist_w * cur_dist

    n_pixels = reference_plane.size()[-2] * reference_plane.size()[-1]

    per_latent_bit = [v.sum().view(1) for v in rate_latent_bit.values()]
    if per_latent_bit:
        total_rate_latent_bit = torch.cat(per_latent_bit).sum()
    else:
        # torch.cat rejects an empty list; no latent simply means a zero rate.
        total_rate_latent_bit = torch.zeros((), device=device)

    rate_bpp = (total_rate_latent_bit + total_rate_nn_bit) / n_pixels
    loss = final_dist + lmbda * rate_bpp

    # Construct the output module, only the loss is always returned
    rate_latent_bpp = None
    total_rate_nn_bpp = 0.0
    detailed_dist = None
    if compute_logs:
        rate_latent_bpp = {
            k: v.detach().sum().item() / n_pixels for k, v in rate_latent_bit.items()
        }
        total_rate_nn_bpp = total_rate_nn_bit / n_pixels
        # Detach all distortions only when computing logs
        detailed_dist = {k: v.detach().item() for k, v in all_dists.items()}

    return LossFunctionOutput(
        loss=loss,
        dist=final_dist.detach().item(),
        rate_bpp=rate_bpp.detach().item(),
        detailed_dist=detailed_dist,
        total_rate_nn_bpp=total_rate_nn_bpp,
        rate_latent_bpp=rate_latent_bpp,
    )