# temporary minimum implementation of LoRA # FLUX doesn't have Conv2d, so we ignore it # TODO commonize with the original implementation # LoRA network module # reference: # https://github.com/microsoft/LoRA/blob/main/loralib/layers.py # https://github.com/cloneofsimo/lora/blob/master/lora_diffusion/lora.py import math import os from typing import Dict, List, Optional, Tuple, Type, Union from diffusers import AutoencoderKL from transformers import CLIPTextModel import numpy as np import torch import re from library.utils import setup_logging from library.sdxl_original_unet import SdxlUNet2DConditionModel setup_logging() import logging logger = logging.getLogger(__name__) class LoRAModule(torch.nn.Module): """ replaces forward method of the original Linear, instead of replacing the original Linear module. """ def __init__( self, lora_name, org_module: torch.nn.Module, multiplier=1.0, lora_dim=4, alpha=1, dropout=None, rank_dropout=None, module_dropout=None, ): """if alpha == 0 or None, alpha is rank (no scaling).""" super().__init__() self.lora_name = lora_name if org_module.__class__.__name__ == "Conv2d": in_dim = org_module.in_channels out_dim = org_module.out_channels else: in_dim = org_module.in_features out_dim = org_module.out_features self.lora_dim = lora_dim if org_module.__class__.__name__ == "Conv2d": kernel_size = org_module.kernel_size stride = org_module.stride padding = org_module.padding self.lora_down = torch.nn.Conv2d(in_dim, self.lora_dim, kernel_size, stride, padding, bias=False) self.lora_up = torch.nn.Conv2d(self.lora_dim, out_dim, (1, 1), (1, 1), bias=False) else: self.lora_down = torch.nn.Linear(in_dim, self.lora_dim, bias=False) self.lora_up = torch.nn.Linear(self.lora_dim, out_dim, bias=False) if type(alpha) == torch.Tensor: alpha = alpha.detach().float().numpy() # without casting, bf16 causes error alpha = self.lora_dim if alpha is None or alpha == 0 else alpha self.scale = alpha / self.lora_dim self.register_buffer("alpha", torch.tensor(alpha)) # 定数として扱える # same as microsoft's torch.nn.init.kaiming_uniform_(self.lora_down.weight, a=math.sqrt(5)) torch.nn.init.zeros_(self.lora_up.weight) self.multiplier = multiplier self.org_module = org_module # remove in applying self.dropout = dropout self.rank_dropout = rank_dropout self.module_dropout = module_dropout def apply_to(self): self.org_forward = self.org_module.forward self.org_module.forward = self.forward del self.org_module def forward(self, x): org_forwarded = self.org_forward(x) # module dropout if self.module_dropout is not None and self.training: if torch.rand(1) < self.module_dropout: return org_forwarded lx = self.lora_down(x) # normal dropout if self.dropout is not None and self.training: lx = torch.nn.functional.dropout(lx, p=self.dropout) # rank dropout if self.rank_dropout is not None and self.training: mask = torch.rand((lx.size(0), self.lora_dim), device=lx.device) > self.rank_dropout if len(lx.size()) == 3: mask = mask.unsqueeze(1) # for Text Encoder elif len(lx.size()) == 4: mask = mask.unsqueeze(-1).unsqueeze(-1) # for Conv2d lx = lx * mask # scaling for rank dropout: treat as if the rank is changed # maskから計算することも考えられるが、augmentation的な効果を期待してrank_dropoutを用いる scale = self.scale * (1.0 / (1.0 - self.rank_dropout)) # redundant for readability else: scale = self.scale lx = self.lora_up(lx) return org_forwarded + lx * self.multiplier * scale class LoRAInfModule(LoRAModule): def __init__( self, lora_name, org_module: torch.nn.Module, multiplier=1.0, lora_dim=4, alpha=1, **kwargs, ): # no dropout for inference super().__init__(lora_name, org_module, multiplier, lora_dim, alpha) self.org_module_ref = [org_module] # 後から参照できるように self.enabled = True self.network: LoRANetwork = None def set_network(self, network): self.network = network # freezeしてマージする def merge_to(self, sd, dtype, device): # extract weight from org_module org_sd = self.org_module.state_dict() weight = org_sd["weight"] org_dtype = weight.dtype org_device = weight.device weight = weight.to(torch.float) # calc in float if dtype is None: dtype = org_dtype if device is None: device = org_device # get up/down weight up_weight = sd["lora_up.weight"].to(torch.float).to(device) down_weight = sd["lora_down.weight"].to(torch.float).to(device) # merge weight if len(weight.size()) == 2: # linear weight = weight + self.multiplier * (up_weight @ down_weight) * self.scale elif down_weight.size()[2:4] == (1, 1): # conv2d 1x1 weight = ( weight + self.multiplier * (up_weight.squeeze(3).squeeze(2) @ down_weight.squeeze(3).squeeze(2)).unsqueeze(2).unsqueeze(3) * self.scale ) else: # conv2d 3x3 conved = torch.nn.functional.conv2d(down_weight.permute(1, 0, 2, 3), up_weight).permute(1, 0, 2, 3) # logger.info(conved.size(), weight.size(), module.stride, module.padding) weight = weight + self.multiplier * conved * self.scale # set weight to org_module org_sd["weight"] = weight.to(dtype) self.org_module.load_state_dict(org_sd) # 復元できるマージのため、このモジュールのweightを返す def get_weight(self, multiplier=None): if multiplier is None: multiplier = self.multiplier # get up/down weight from module up_weight = self.lora_up.weight.to(torch.float) down_weight = self.lora_down.weight.to(torch.float) # pre-calculated weight if len(down_weight.size()) == 2: # linear weight = self.multiplier * (up_weight @ down_weight) * self.scale elif down_weight.size()[2:4] == (1, 1): # conv2d 1x1 weight = ( self.multiplier * (up_weight.squeeze(3).squeeze(2) @ down_weight.squeeze(3).squeeze(2)).unsqueeze(2).unsqueeze(3) * self.scale ) else: # conv2d 3x3 conved = torch.nn.functional.conv2d(down_weight.permute(1, 0, 2, 3), up_weight).permute(1, 0, 2, 3) weight = self.multiplier * conved * self.scale return weight def set_region(self, region): self.region = region self.region_mask = None def default_forward(self, x): # logger.info(f"default_forward {self.lora_name} {x.size()}") return self.org_forward(x) + self.lora_up(self.lora_down(x)) * self.multiplier * self.scale def forward(self, x): if not self.enabled: return self.org_forward(x) return self.default_forward(x) def create_network( multiplier: float, network_dim: Optional[int], network_alpha: Optional[float], ae: AutoencoderKL, text_encoders: List[CLIPTextModel], flux, neuron_dropout: Optional[float] = None, **kwargs, ): if network_dim is None: network_dim = 4 # default if network_alpha is None: network_alpha = 1.0 # extract dim/alpha for conv2d, and block dim conv_dim = kwargs.get("conv_dim", None) conv_alpha = kwargs.get("conv_alpha", None) if conv_dim is not None: conv_dim = int(conv_dim) if conv_alpha is None: conv_alpha = 1.0 else: conv_alpha = float(conv_alpha) # rank/module dropout rank_dropout = kwargs.get("rank_dropout", None) if rank_dropout is not None: rank_dropout = float(rank_dropout) module_dropout = kwargs.get("module_dropout", None) if module_dropout is not None: module_dropout = float(module_dropout) # single or double blocks train_blocks = kwargs.get("train_blocks", None) # None (default), "all" (same as None), "single", "double" if train_blocks is not None: assert train_blocks in ["all", "single", "double"], f"invalid train_blocks: {train_blocks}" # すごく引数が多いな ( ^ω^)・・・ network = LoRANetwork( text_encoders, flux, multiplier=multiplier, lora_dim=network_dim, alpha=network_alpha, dropout=neuron_dropout, rank_dropout=rank_dropout, module_dropout=module_dropout, conv_lora_dim=conv_dim, conv_alpha=conv_alpha, train_blocks=train_blocks, varbose=True, ) loraplus_lr_ratio = kwargs.get("loraplus_lr_ratio", None) loraplus_unet_lr_ratio = kwargs.get("loraplus_unet_lr_ratio", None) loraplus_text_encoder_lr_ratio = kwargs.get("loraplus_text_encoder_lr_ratio", None) loraplus_lr_ratio = float(loraplus_lr_ratio) if loraplus_lr_ratio is not None else None loraplus_unet_lr_ratio = float(loraplus_unet_lr_ratio) if loraplus_unet_lr_ratio is not None else None loraplus_text_encoder_lr_ratio = float(loraplus_text_encoder_lr_ratio) if loraplus_text_encoder_lr_ratio is not None else None if loraplus_lr_ratio is not None or loraplus_unet_lr_ratio is not None or loraplus_text_encoder_lr_ratio is not None: network.set_loraplus_lr_ratio(loraplus_lr_ratio, loraplus_unet_lr_ratio, loraplus_text_encoder_lr_ratio) return network # Create network from weights for inference, weights are not loaded here (because can be merged) def create_network_from_weights(multiplier, file, ae, text_encoders, flux, weights_sd=None, for_inference=False, **kwargs): # if unet is an instance of SdxlUNet2DConditionModel or subclass, set is_sdxl to True if weights_sd is None: if os.path.splitext(file)[1] == ".safetensors": from safetensors.torch import load_file, safe_open weights_sd = load_file(file) else: weights_sd = torch.load(file, map_location="cpu") # get dim/alpha mapping modules_dim = {} modules_alpha = {} for key, value in weights_sd.items(): if "." not in key: continue lora_name = key.split(".")[0] if "alpha" in key: modules_alpha[lora_name] = value elif "lora_down" in key: dim = value.size()[0] modules_dim[lora_name] = dim # logger.info(lora_name, value.size(), dim) module_class = LoRAInfModule if for_inference else LoRAModule network = LoRANetwork( text_encoders, flux, multiplier=multiplier, modules_dim=modules_dim, modules_alpha=modules_alpha, module_class=module_class ) return network, weights_sd class LoRANetwork(torch.nn.Module): # FLUX_TARGET_REPLACE_MODULE = ["DoubleStreamBlock", "SingleStreamBlock"] FLUX_TARGET_REPLACE_MODULE_DOUBLE = ["DoubleStreamBlock"] FLUX_TARGET_REPLACE_MODULE_SINGLE = ["SingleStreamBlock"] TEXT_ENCODER_TARGET_REPLACE_MODULE = ["CLIPAttention", "CLIPMLP"] LORA_PREFIX_FLUX = "lora_unet" # make ComfyUI compatible LORA_PREFIX_TEXT_ENCODER_CLIP = "lora_te1" LORA_PREFIX_TEXT_ENCODER_T5 = "lora_te2" def __init__( self, text_encoders: Union[List[CLIPTextModel], CLIPTextModel], unet, multiplier: float = 1.0, lora_dim: int = 4, alpha: float = 1, dropout: Optional[float] = None, rank_dropout: Optional[float] = None, module_dropout: Optional[float] = None, conv_lora_dim: Optional[int] = None, conv_alpha: Optional[float] = None, module_class: Type[object] = LoRAModule, modules_dim: Optional[Dict[str, int]] = None, modules_alpha: Optional[Dict[str, int]] = None, train_blocks: Optional[str] = None, varbose: Optional[bool] = False, ) -> None: super().__init__() self.multiplier = multiplier self.lora_dim = lora_dim self.alpha = alpha self.conv_lora_dim = conv_lora_dim self.conv_alpha = conv_alpha self.dropout = dropout self.rank_dropout = rank_dropout self.module_dropout = module_dropout self.train_blocks = train_blocks if train_blocks is not None else "all" self.loraplus_lr_ratio = None self.loraplus_unet_lr_ratio = None self.loraplus_text_encoder_lr_ratio = None if modules_dim is not None: logger.info(f"create LoRA network from weights") else: logger.info(f"create LoRA network. base dim (rank): {lora_dim}, alpha: {alpha}") logger.info( f"neuron dropout: p={self.dropout}, rank dropout: p={self.rank_dropout}, module dropout: p={self.module_dropout}" ) if self.conv_lora_dim is not None: logger.info( f"apply LoRA to Conv2d with kernel size (3,3). dim (rank): {self.conv_lora_dim}, alpha: {self.conv_alpha}" ) # create module instances def create_modules( is_flux: bool, text_encoder_idx: Optional[int], root_module: torch.nn.Module, target_replace_modules: List[str] ) -> List[LoRAModule]: prefix = ( self.LORA_PREFIX_FLUX if is_flux else (self.LORA_PREFIX_TEXT_ENCODER_CLIP if text_encoder_idx == 0 else self.LORA_PREFIX_TEXT_ENCODER_T5) ) loras = [] skipped = [] for name, module in root_module.named_modules(): if module.__class__.__name__ in target_replace_modules: for child_name, child_module in module.named_modules(): is_linear = child_module.__class__.__name__ == "Linear" is_conv2d = child_module.__class__.__name__ == "Conv2d" is_conv2d_1x1 = is_conv2d and child_module.kernel_size == (1, 1) if is_linear or is_conv2d: lora_name = prefix + "." + name + "." + child_name lora_name = lora_name.replace(".", "_") dim = None alpha = None if modules_dim is not None: # モジュール指定あり if lora_name in modules_dim: dim = modules_dim[lora_name] alpha = modules_alpha[lora_name] else: # 通常、すべて対象とする if is_linear or is_conv2d_1x1: dim = self.lora_dim alpha = self.alpha elif self.conv_lora_dim is not None: dim = self.conv_lora_dim alpha = self.conv_alpha if dim is None or dim == 0: # skipした情報を出力 if is_linear or is_conv2d_1x1 or (self.conv_lora_dim is not None): skipped.append(lora_name) continue lora = module_class( lora_name, child_module, self.multiplier, dim, alpha, dropout=dropout, rank_dropout=rank_dropout, module_dropout=module_dropout, ) loras.append(lora) return loras, skipped # create LoRA for text encoder # 毎回すべてのモジュールを作るのは無駄なので要検討 self.text_encoder_loras: List[Union[LoRAModule, LoRAInfModule]] = [] skipped_te = [] for i, text_encoder in enumerate(text_encoders): index = i logger.info(f"create LoRA for Text Encoder {index+1}:") text_encoder_loras, skipped = create_modules(False, index, text_encoder, LoRANetwork.TEXT_ENCODER_TARGET_REPLACE_MODULE) self.text_encoder_loras.extend(text_encoder_loras) skipped_te += skipped logger.info(f"create LoRA for Text Encoder: {len(self.text_encoder_loras)} modules.") # create LoRA for U-Net if self.train_blocks == "all": target_replace_modules = LoRANetwork.FLUX_TARGET_REPLACE_MODULE_DOUBLE + LoRANetwork.FLUX_TARGET_REPLACE_MODULE_SINGLE elif self.train_blocks == "single": target_replace_modules = LoRANetwork.FLUX_TARGET_REPLACE_MODULE_SINGLE elif self.train_blocks == "double": target_replace_modules = LoRANetwork.FLUX_TARGET_REPLACE_MODULE_DOUBLE self.unet_loras: List[Union[LoRAModule, LoRAInfModule]] self.unet_loras, skipped_un = create_modules(True, None, unet, target_replace_modules) logger.info(f"create LoRA for FLUX {self.train_blocks} blocks: {len(self.unet_loras)} modules.") skipped = skipped_te + skipped_un if varbose and len(skipped) > 0: logger.warning( f"because dim (rank) is 0, {len(skipped)} LoRA modules are skipped / dim (rank)が0の為、次の{len(skipped)}個のLoRAモジュールはスキップされます:" ) for name in skipped: logger.info(f"\t{name}") # assertion names = set() for lora in self.text_encoder_loras + self.unet_loras: assert lora.lora_name not in names, f"duplicated lora name: {lora.lora_name}" names.add(lora.lora_name) def set_multiplier(self, multiplier): self.multiplier = multiplier for lora in self.text_encoder_loras + self.unet_loras: lora.multiplier = self.multiplier def set_enabled(self, is_enabled): for lora in self.text_encoder_loras + self.unet_loras: lora.enabled = is_enabled def load_weights(self, file): if os.path.splitext(file)[1] == ".safetensors": from safetensors.torch import load_file weights_sd = load_file(file) else: weights_sd = torch.load(file, map_location="cpu") info = self.load_state_dict(weights_sd, False) return info def apply_to(self, text_encoders, flux, apply_text_encoder=True, apply_unet=True): if apply_text_encoder: logger.info(f"enable LoRA for text encoder: {len(self.text_encoder_loras)} modules") else: self.text_encoder_loras = [] if apply_unet: logger.info(f"enable LoRA for U-Net: {len(self.unet_loras)} modules") else: self.unet_loras = [] for lora in self.text_encoder_loras + self.unet_loras: lora.apply_to() self.add_module(lora.lora_name, lora) # マージできるかどうかを返す def is_mergeable(self): return True # TODO refactor to common function with apply_to def merge_to(self, text_encoders, flux, weights_sd, dtype=None, device=None): apply_text_encoder = apply_unet = False for key in weights_sd.keys(): if key.startswith(LoRANetwork.LORA_PREFIX_TEXT_ENCODER_CLIP) or key.startswith(LoRANetwork.LORA_PREFIX_TEXT_ENCODER_T5): apply_text_encoder = True elif key.startswith(LoRANetwork.LORA_PREFIX_FLUX): apply_unet = True if apply_text_encoder: logger.info("enable LoRA for text encoder") else: self.text_encoder_loras = [] if apply_unet: logger.info("enable LoRA for U-Net") else: self.unet_loras = [] for lora in self.text_encoder_loras + self.unet_loras: sd_for_lora = {} for key in weights_sd.keys(): if key.startswith(lora.lora_name): sd_for_lora[key[len(lora.lora_name) + 1 :]] = weights_sd[key] lora.merge_to(sd_for_lora, dtype, device) logger.info(f"weights are merged") def set_loraplus_lr_ratio(self, loraplus_lr_ratio, loraplus_unet_lr_ratio, loraplus_text_encoder_lr_ratio): self.loraplus_lr_ratio = loraplus_lr_ratio self.loraplus_unet_lr_ratio = loraplus_unet_lr_ratio self.loraplus_text_encoder_lr_ratio = loraplus_text_encoder_lr_ratio logger.info(f"LoRA+ UNet LR Ratio: {self.loraplus_unet_lr_ratio or self.loraplus_lr_ratio}") logger.info(f"LoRA+ Text Encoder LR Ratio: {self.loraplus_text_encoder_lr_ratio or self.loraplus_lr_ratio}") # 二つのText Encoderに別々の学習率を設定できるようにするといいかも def prepare_optimizer_params(self, text_encoder_lr, unet_lr, default_lr): # TODO warn if optimizer is not compatible with LoRA+ (but it will cause error so we don't need to check it here?) # if ( # self.loraplus_lr_ratio is not None # or self.loraplus_text_encoder_lr_ratio is not None # or self.loraplus_unet_lr_ratio is not None # ): # assert ( # optimizer_type.lower() != "prodigy" and "dadapt" not in optimizer_type.lower() # ), "LoRA+ and Prodigy/DAdaptation is not supported / LoRA+とProdigy/DAdaptationの組み合わせはサポートされていません" self.requires_grad_(True) all_params = [] lr_descriptions = [] def assemble_params(loras, lr, ratio): param_groups = {"lora": {}, "plus": {}} for lora in loras: for name, param in lora.named_parameters(): if ratio is not None and "lora_up" in name: param_groups["plus"][f"{lora.lora_name}.{name}"] = param else: param_groups["lora"][f"{lora.lora_name}.{name}"] = param params = [] descriptions = [] for key in param_groups.keys(): param_data = {"params": param_groups[key].values()} if len(param_data["params"]) == 0: continue if lr is not None: if key == "plus": param_data["lr"] = lr * ratio else: param_data["lr"] = lr if param_data.get("lr", None) == 0 or param_data.get("lr", None) is None: logger.info("NO LR skipping!") continue params.append(param_data) descriptions.append("plus" if key == "plus" else "") return params, descriptions if self.text_encoder_loras: params, descriptions = assemble_params( self.text_encoder_loras, text_encoder_lr if text_encoder_lr is not None else default_lr, self.loraplus_text_encoder_lr_ratio or self.loraplus_lr_ratio, ) all_params.extend(params) lr_descriptions.extend(["textencoder" + (" " + d if d else "") for d in descriptions]) if self.unet_loras: # if self.block_lr: # is_sdxl = False # for lora in self.unet_loras: # if "input_blocks" in lora.lora_name or "output_blocks" in lora.lora_name: # is_sdxl = True # break # # 学習率のグラフをblockごとにしたいので、blockごとにloraを分類 # block_idx_to_lora = {} # for lora in self.unet_loras: # idx = get_block_index(lora.lora_name, is_sdxl) # if idx not in block_idx_to_lora: # block_idx_to_lora[idx] = [] # block_idx_to_lora[idx].append(lora) # # blockごとにパラメータを設定する # for idx, block_loras in block_idx_to_lora.items(): # params, descriptions = assemble_params( # block_loras, # (unet_lr if unet_lr is not None else default_lr) * self.get_lr_weight(idx), # self.loraplus_unet_lr_ratio or self.loraplus_lr_ratio, # ) # all_params.extend(params) # lr_descriptions.extend([f"unet_block{idx}" + (" " + d if d else "") for d in descriptions]) # else: params, descriptions = assemble_params( self.unet_loras, unet_lr if unet_lr is not None else default_lr, self.loraplus_unet_lr_ratio or self.loraplus_lr_ratio, ) all_params.extend(params) lr_descriptions.extend(["unet" + (" " + d if d else "") for d in descriptions]) return all_params, lr_descriptions def enable_gradient_checkpointing(self): # not supported pass def prepare_grad_etc(self, text_encoder, unet): self.requires_grad_(True) def on_epoch_start(self, text_encoder, unet): self.train() def get_trainable_params(self): return self.parameters() def save_weights(self, file, dtype, metadata): if metadata is not None and len(metadata) == 0: metadata = None state_dict = self.state_dict() if dtype is not None: for key in list(state_dict.keys()): v = state_dict[key] v = v.detach().clone().to("cpu").to(dtype) state_dict[key] = v if os.path.splitext(file)[1] == ".safetensors": from safetensors.torch import save_file from library import train_util # Precalculate model hashes to save time on indexing if metadata is None: metadata = {} model_hash, legacy_hash = train_util.precalculate_safetensors_hashes(state_dict, metadata) metadata["sshs_model_hash"] = model_hash metadata["sshs_legacy_hash"] = legacy_hash save_file(state_dict, file, metadata) else: torch.save(state_dict, file) def backup_weights(self): # 重みのバックアップを行う loras: List[LoRAInfModule] = self.text_encoder_loras + self.unet_loras for lora in loras: org_module = lora.org_module_ref[0] if not hasattr(org_module, "_lora_org_weight"): sd = org_module.state_dict() org_module._lora_org_weight = sd["weight"].detach().clone() org_module._lora_restored = True def restore_weights(self): # 重みのリストアを行う loras: List[LoRAInfModule] = self.text_encoder_loras + self.unet_loras for lora in loras: org_module = lora.org_module_ref[0] if not org_module._lora_restored: sd = org_module.state_dict() sd["weight"] = org_module._lora_org_weight org_module.load_state_dict(sd) org_module._lora_restored = True def pre_calculation(self): # 事前計算を行う loras: List[LoRAInfModule] = self.text_encoder_loras + self.unet_loras for lora in loras: org_module = lora.org_module_ref[0] sd = org_module.state_dict() org_weight = sd["weight"] lora_weight = lora.get_weight().to(org_weight.device, dtype=org_weight.dtype) sd["weight"] = org_weight + lora_weight assert sd["weight"].shape == org_weight.shape org_module.load_state_dict(sd) org_module._lora_restored = False lora.enabled = False def apply_max_norm_regularization(self, max_norm_value, device): downkeys = [] upkeys = [] alphakeys = [] norms = [] keys_scaled = 0 state_dict = self.state_dict() for key in state_dict.keys(): if "lora_down" in key and "weight" in key: downkeys.append(key) upkeys.append(key.replace("lora_down", "lora_up")) alphakeys.append(key.replace("lora_down.weight", "alpha")) for i in range(len(downkeys)): down = state_dict[downkeys[i]].to(device) up = state_dict[upkeys[i]].to(device) alpha = state_dict[alphakeys[i]].to(device) dim = down.shape[0] scale = alpha / dim if up.shape[2:] == (1, 1) and down.shape[2:] == (1, 1): updown = (up.squeeze(2).squeeze(2) @ down.squeeze(2).squeeze(2)).unsqueeze(2).unsqueeze(3) elif up.shape[2:] == (3, 3) or down.shape[2:] == (3, 3): updown = torch.nn.functional.conv2d(down.permute(1, 0, 2, 3), up).permute(1, 0, 2, 3) else: updown = up @ down updown *= scale norm = updown.norm().clamp(min=max_norm_value / 2) desired = torch.clamp(norm, max=max_norm_value) ratio = desired.cpu() / norm.cpu() sqrt_ratio = ratio**0.5 if ratio != 1: keys_scaled += 1 state_dict[upkeys[i]] *= sqrt_ratio state_dict[downkeys[i]] *= sqrt_ratio scalednorm = updown.norm() * ratio norms.append(scalednorm.item()) return keys_scaled, sum(norms) / len(norms), max(norms)