mirror of
https://github.com/kohya-ss/sd-scripts.git
synced 2026-04-09 06:45:09 +00:00
445 lines
16 KiB
Python
445 lines
16 KiB
Python
# LoRA network module
|
|
# reference:
|
|
# https://github.com/microsoft/LoRA/blob/main/loralib/layers.py
|
|
# https://github.com/cloneofsimo/lora/blob/master/lora_diffusion/lora.py
|
|
|
|
import math
|
|
import os
|
|
from typing import List
|
|
import torch
|
|
from diffusers import UNet2DConditionModel
|
|
|
|
from library import train_util
|
|
|
|
|
|
class ControlLoRAModule(torch.nn.Module):
    """
    LoRA branch attached to a single Linear or Conv2d module.

    Replaces the forward method of the original module instead of replacing the
    original module itself. The low-rank branch is only added to the output while
    the module is flagged as being on the control path (see set_as_control_path);
    otherwise the original forward result is returned unchanged.
    """

    def __init__(self, lora_name, org_module: torch.nn.Module, multiplier=1.0, lora_dim=4, alpha=1):
        """
        if alpha == 0 or None, alpha is rank (no scaling).

        Args:
            lora_name: name stored on the module as `lora_name`.
            org_module: module whose forward will be hooked by apply_to(). A class
                named 'Conv2d' gets a convolutional branch; anything else is read
                through `in_features` / `out_features` like a Linear.
            multiplier: factor applied to the LoRA branch output in forward().
            lora_dim: requested rank. For Conv2d it is clamped to
                min(lora_dim, in_channels, out_channels).
            alpha: scaling numerator; the branch is scaled by alpha / rank.
                May be a number, a tensor, or None.
        """
        super().__init__()
        self.lora_name = lora_name
        self.lora_dim = lora_dim

        if org_module.__class__.__name__ == 'Conv2d':
            in_dim = org_module.in_channels
            out_dim = org_module.out_channels

            # the rank cannot usefully exceed the channel counts
            self.lora_dim = min(self.lora_dim, in_dim, out_dim)
            if self.lora_dim != lora_dim:
                print(f"{lora_name} dim (rank) is changed: {self.lora_dim}")

            kernel_size = org_module.kernel_size
            stride = org_module.stride
            padding = org_module.padding
            # down projection reuses the original kernel/stride/padding; up projection is 1x1
            self.lora_down = torch.nn.Conv2d(in_dim, self.lora_dim, kernel_size, stride, padding, bias=False)
            self.lora_up = torch.nn.Conv2d(self.lora_dim, out_dim, (1, 1), (1, 1), bias=False)
        else:
            in_dim = org_module.in_features
            out_dim = org_module.out_features
            self.lora_down = torch.nn.Linear(in_dim, self.lora_dim, bias=False)
            self.lora_up = torch.nn.Linear(self.lora_dim, out_dim, bias=False)

        # isinstance (rather than an exact type comparison) also covers Tensor subclasses
        if isinstance(alpha, torch.Tensor):
            # without casting, bf16 causes error; .cpu() so tensors on an accelerator convert too
            alpha = alpha.detach().cpu().float().numpy()
        alpha = self.lora_dim if alpha is None or alpha == 0 else alpha
        self.scale = alpha / self.lora_dim
        self.register_buffer('alpha', torch.tensor(alpha))  # can be treated as a constant

        # same as microsoft's
        torch.nn.init.kaiming_uniform_(self.lora_down.weight, a=math.sqrt(5))
        torch.nn.init.zeros_(self.lora_up.weight)

        self.multiplier = multiplier
        self.org_module = org_module  # remove in applying

        # forward() reads this flag; default to the plain (non-control) path so that a
        # forward pass before set_as_control_path() does not raise AttributeError
        self.is_control_path = False

    def apply_to(self):
        """Hook the original module: keep its forward as `org_forward` and install ours."""
        self.org_forward = self.org_module.forward
        self.org_module.forward = self.forward
        # drop the reference so the original module is no longer an attribute of this module
        del self.org_module

    def set_as_control_path(self, control_path):
        """Select whether forward() adds the LoRA branch (True) or only calls the original forward (False)."""
        self.is_control_path = control_path

    def forward(self, x):
        """
        Return the original forward output, plus the scaled LoRA branch when on the control path.

        Requires apply_to() to have been called, since it relies on `org_forward`.
        """
        if not self.is_control_path:
            return self.org_forward(x)
        return self.org_forward(x) + self.lora_up(self.lora_down(x)) * self.multiplier * self.scale
|
|
|
|
|
|
class ControlLoRANetwork(torch.nn.Module):
    """
    Set of ControlLoRAModule hooks for a U-Net plus a small control model.

    One ControlLoRAModule is created for every module of the U-Net whose class is
    named "Linear" or "Conv2d". The control model holds the hint encoder
    (`input_hint_block`), one 1x1 convolution per down-path residual (`zero_convs`)
    and one for the mid block (`middle_block_out`).

    call_unet() runs the U-Net twice: once with the LoRA branches enabled to produce
    the control residuals, and once with them disabled, adding those residuals.
    """

    # UNET_TARGET_REPLACE_MODULE = ["Transformer2DModel", "Attention"]
    # TEXT_ENCODER_TARGET_REPLACE_MODULE = ["CLIPAttention", "CLIPMLP"]
    LORA_PREFIX_UNET = 'lora_unet'
    LORA_PREFIX_TEXT_ENCODER = 'lora_te'

    def __init__(self, unet, weights_sd, multiplier=1.0, lora_dim=4, alpha=1) -> None:
        """
        Args:
            unet: U-Net whose Linear/Conv2d modules get a LoRA branch.
            weights_sd: optional state dict. When given, rank and alpha of each
                module are read from it and modules without a
                "<name>.lora_down.weight" entry are skipped. When None, every
                module uses `lora_dim` and `alpha`.
            multiplier: passed to every ControlLoRAModule.
            lora_dim: rank used when `weights_sd` is None.
            alpha: alpha used when `weights_sd` is None.
        """
        super().__init__()
        self.multiplier = multiplier
        self.lora_dim = lora_dim
        self.alpha = alpha

        # create module instances
        def create_modules(prefix, root_module: torch.nn.Module) -> List[ControlLoRAModule]:
            loras = []
            for name, module in root_module.named_modules():
                class_name = module.__class__.__name__
                if class_name == "Linear" or class_name == "Conv2d":
                    # state-dict style name: dots are not allowed in submodule names
                    lora_name = (prefix + '.' + name).replace('.', '_')

                    if weights_sd is None:
                        dim, alpha = self.lora_dim, self.alpha
                    else:
                        down_weight = weights_sd.get(lora_name + ".lora_down.weight", None)
                        if down_weight is None:
                            continue
                        # rank is the output size of the down projection
                        dim = down_weight.size()[0]
                        alpha = weights_sd.get(lora_name + ".alpha", dim)

                    lora = ControlLoRAModule(lora_name, module, self.multiplier, dim, alpha)
                    loras.append(lora)
            return loras

        self.unet_loras = create_modules(ControlLoRANetwork.LORA_PREFIX_UNET, unet)
        print(f"create LoRA for U-Net: {len(self.unet_loras)} modules.")

        # This network never creates text encoder LoRAs, but prepare_optimizer_params()
        # reads this attribute; define it so that method does not raise AttributeError.
        self.text_encoder_loras = []

        # make control model
        self.control_model = torch.nn.Module()

        # one 1x1 conv per residual collected in unet_forward (conv_in output + down block residuals)
        dims = [320, 320, 320, 320, 640, 640, 640, 1280, 1280, 1280, 1280, 1280]
        zero_convs = torch.nn.ModuleList()
        for dim in dims:
            sub_list = torch.nn.ModuleList([torch.nn.Conv2d(dim, dim, 1)])
            zero_convs.append(sub_list)
        self.control_model.add_module("zero_convs", zero_convs)

        middle_block_out = torch.nn.Conv2d(1280, 1280, 1)
        self.control_model.add_module("middle_block_out", torch.nn.ModuleList([middle_block_out]))

        # hint encoder: 3x3 convs from 3 input channels up to 320, SiLU between all but after the last
        dims = [16, 16, 32, 32, 96, 96, 256, 320]
        strides = [1, 1, 2, 1, 2, 1, 2, 1]
        prev_dim = 3
        input_hint_block = torch.nn.Sequential()
        for i, (dim, stride) in enumerate(zip(dims, strides)):
            input_hint_block.append(torch.nn.Conv2d(prev_dim, dim, 3, stride, 1))
            if i < len(dims) - 1:
                input_hint_block.append(torch.nn.SiLU())
            prev_dim = dim
        self.control_model.add_module("input_hint_block", input_hint_block)

    def apply_to(self):
        """Hook every LoRA module into its original module and register it as a submodule."""
        for lora in self.unet_loras:
            lora.apply_to()
            self.add_module(lora.lora_name, lora)

    def call_unet(self, unet, hint, sample, timestep, encoder_hidden_states):
        """
        Run the control pass followed by the regular U-Net pass.

        Args:
            unet: the U-Net the LoRA modules were applied to.
            hint: conditioning input for `input_hint_block`; cast to the dtype and
                device of `sample`.
            sample, timestep, encoder_hidden_states: forwarded to unet_forward().

        Returns:
            The result of the second unet_forward() call, i.e. a one-element
            tuple `(sample,)`.
        """
        # control path
        hint = hint.to(sample.dtype).to(sample.device)
        guided_hint = self.control_model.input_hint_block(hint)

        for lora_module in self.unet_loras:
            lora_module.set_as_control_path(True)

        outs = self.unet_forward(unet, guided_hint, None, sample, timestep, encoder_hidden_states)

        # U-Net
        for lora_module in self.unet_loras:
            lora_module.set_as_control_path(False)

        sample = self.unet_forward(unet, None, outs, sample, timestep, encoder_hidden_states)

        return sample

    def unet_forward(self, unet: UNet2DConditionModel, guided_hint, ctrl_outs, sample, timestep, encoder_hidden_states):
        """
        Forward pass through `unet`, adapted from UNet2DConditionModel's forward.

        Two modes, selected by `ctrl_outs`:

        * `ctrl_outs is None` (control pass): `guided_hint`, if given, is added after
          conv_in; the conv_in output and every down block residual are passed through
          the matching zero conv, the mid block output through `middle_block_out`, and
          the list of these tensors is returned. The up blocks are not run.
        * `ctrl_outs` is a list (regular pass): the last entry is popped and added to
          the mid block output, the remaining entries are consumed from the end and
          added to the residuals fed to each up block. Returns `(sample,)`.

        Note that the passed `ctrl_outs` list is modified (its last element is popped).
        """
        # copy from UNet2DConditionModel
        default_overall_up_factor = 2**unet.num_upsamplers

        forward_upsample_size = False
        upsample_size = None

        if any(s % default_overall_up_factor != 0 for s in sample.shape[-2:]):
            print("Forward upsample size to force interpolation output size.")
            forward_upsample_size = True

        # 0. center input if necessary
        if unet.config.center_input_sample:
            sample = 2 * sample - 1.0

        # 1. time
        timesteps = timestep
        if not torch.is_tensor(timesteps):
            # TODO: this requires sync between CPU and GPU. So try to pass timesteps as tensors if you can
            # This would be a good case for the `match` statement (Python 3.10+)
            is_mps = sample.device.type == "mps"
            if isinstance(timestep, float):
                dtype = torch.float32 if is_mps else torch.float64
            else:
                dtype = torch.int32 if is_mps else torch.int64
            timesteps = torch.tensor([timesteps], dtype=dtype, device=sample.device)
        elif len(timesteps.shape) == 0:
            timesteps = timesteps[None].to(sample.device)

        # broadcast to batch dimension in a way that's compatible with ONNX/Core ML
        timesteps = timesteps.expand(sample.shape[0])

        t_emb = unet.time_proj(timesteps)

        # timesteps does not contain any weights and will always return f32 tensors
        # but time_embedding might actually be running in fp16. so we need to cast here.
        # there might be better ways to encapsulate this.
        t_emb = t_emb.to(dtype=unet.dtype)
        emb = unet.time_embedding(t_emb)

        if ctrl_outs is None:
            outs = []  # control path

        # 2. pre-process
        sample = unet.conv_in(sample)
        if guided_hint is not None:
            sample += guided_hint
        if ctrl_outs is None:
            outs.append(self.control_model.zero_convs[0][0](sample))

        # 3. down
        zc_idx = 1  # index 0 was used for the conv_in output above
        down_block_res_samples = (sample,)
        for downsample_block in unet.down_blocks:
            if hasattr(downsample_block, "has_cross_attention") and downsample_block.has_cross_attention:
                sample, res_samples = downsample_block(
                    hidden_states=sample,
                    temb=emb,
                    encoder_hidden_states=encoder_hidden_states,
                )
            else:
                sample, res_samples = downsample_block(hidden_states=sample, temb=emb)
            if ctrl_outs is None:
                for rs in res_samples:
                    outs.append(self.control_model.zero_convs[zc_idx][0](rs))
                    zc_idx += 1

            down_block_res_samples += res_samples

        # 4. mid
        sample = unet.mid_block(sample, emb, encoder_hidden_states=encoder_hidden_states)
        if ctrl_outs is None:
            # control pass ends here: the up blocks are only needed for the regular pass
            outs.append(self.control_model.middle_block_out[0](sample))
            return outs
        if ctrl_outs is not None:
            sample += ctrl_outs.pop()

        # 5. up
        for i, upsample_block in enumerate(unet.up_blocks):
            is_final_block = i == len(unet.up_blocks) - 1

            res_samples = down_block_res_samples[-len(upsample_block.resnets):]
            down_block_res_samples = down_block_res_samples[: -len(upsample_block.resnets)]

            if ctrl_outs is not None and len(ctrl_outs) > 0:
                # take as many control residuals from the end as there are skip connections
                res_samples = list(res_samples)
                apply_ctrl_outs = ctrl_outs[-len(res_samples):]
                ctrl_outs = ctrl_outs[:-len(res_samples)]
                for j in range(len(res_samples)):
                    res_samples[j] = res_samples[j] + apply_ctrl_outs[j]
                res_samples = tuple(res_samples)

            # if we have not reached the final block and need to forward the
            # upsample size, we do it here
            if not is_final_block and forward_upsample_size:
                upsample_size = down_block_res_samples[-1].shape[2:]

            if hasattr(upsample_block, "has_cross_attention") and upsample_block.has_cross_attention:
                sample = upsample_block(
                    hidden_states=sample,
                    temb=emb,
                    res_hidden_states_tuple=res_samples,
                    encoder_hidden_states=encoder_hidden_states,
                    upsample_size=upsample_size,
                )
            else:
                sample = upsample_block(
                    hidden_states=sample, temb=emb, res_hidden_states_tuple=res_samples, upsample_size=upsample_size
                )

        # 6. post-process
        sample = unet.conv_norm_out(sample)
        sample = unet.conv_act(sample)
        sample = unet.conv_out(sample)

        return (sample,)

    def enable_gradient_checkpointing(self):
        """No-op: gradient checkpointing is not supported by this network."""
        # not supported
        pass

    def prepare_optimizer_params(self, text_encoder_lr, unet_lr):
        """
        Build optimizer parameter groups for the LoRA modules.

        Marks all parameters of this network as trainable and returns a list of
        dicts with a 'params' entry, plus an 'lr' entry when the corresponding
        learning rate is not None. A group is only emitted for a non-empty LoRA list.
        """
        # NOTE(review): only LoRA module parameters are collected here; the parameters of
        # `control_model` are not part of any group. get_trainable_params() returns all
        # parameters instead - confirm which one the training script is meant to use.
        def enumerate_params(loras):
            params = []
            for lora in loras:
                params.extend(lora.parameters())
            return params

        self.requires_grad_(True)
        all_params = []

        if self.text_encoder_loras:
            param_data = {'params': enumerate_params(self.text_encoder_loras)}
            if text_encoder_lr is not None:
                param_data['lr'] = text_encoder_lr
            all_params.append(param_data)

        if self.unet_loras:
            param_data = {'params': enumerate_params(self.unet_loras)}
            if unet_lr is not None:
                param_data['lr'] = unet_lr
            all_params.append(param_data)

        return all_params

    def prepare_grad_etc(self, text_encoder, unet):
        """Mark every parameter of this network as requiring gradients; the arguments are unused."""
        self.requires_grad_(True)

    def on_epoch_start(self, text_encoder, unet):
        """Put this network into training mode; the arguments are unused."""
        self.train()

    def get_trainable_params(self):
        """Return all parameters of this network (registered LoRA modules and the control model)."""
        return self.parameters()

    def save_weights(self, file, dtype, metadata):
        """
        Save this network's state dict to `file`.

        Args:
            file: output path. A '.safetensors' extension selects the safetensors
                format; anything else is written with torch.save.
            dtype: when not None, every tensor is detached, copied to CPU and cast
                to this dtype before saving.
            metadata: optional dict stored in the safetensors file (ignored by
                torch.save). For safetensors, model hashes are added to it in place.
        """
        if metadata is not None and len(metadata) == 0:
            metadata = None

        state_dict = self.state_dict()

        if dtype is not None:
            for key in list(state_dict.keys()):
                v = state_dict[key]
                v = v.detach().clone().to("cpu").to(dtype)
                state_dict[key] = v

        if os.path.splitext(file)[1] == '.safetensors':
            from safetensors.torch import save_file

            # Precalculate model hashes to save time on indexing
            if metadata is None:
                metadata = {}
            model_hash, legacy_hash = train_util.precalculate_safetensors_hashes(state_dict, metadata)
            metadata["sshs_model_hash"] = model_hash
            metadata["sshs_legacy_hash"] = legacy_hash

            save_file(state_dict, file, metadata)
        else:
            torch.save(state_dict, file)
|