| | import glob
|
| | import os
|
| | import re
|
| | import torch
|
| | from typing import Union
|
| |
|
| | from modules import shared, devices, sd_models, errors, scripts
|
| |
|
# Sort rank for metadata keys; LoraOnDisk orders its metadata by this rank
# and gives keys that are not listed here the rank 999.
metadata_tags_order = {
    "ss_sd_model_name": 1,
    "ss_resolution": 2,
    "ss_clip_skip": 3,
    "ss_num_train_images": 10,
    "ss_tag_frequency": 20,
}

# Matches a run of decimal digits.
re_digits = re.compile(r"\d+")

# Splits "<layer>_<q|k|v>_proj" into the layer name and the projection suffix.
re_x_proj = re.compile(r"(.*)_([qkv]_proj)$")

# Cache of compiled patterns, keyed by pattern text
# (filled by convert_diffusers_name_to_compvis).
re_compiled = {}

# Per block kind, the replacement for a layer-name suffix; suffixes without
# an entry are used unchanged by convert_diffusers_name_to_compvis.
suffix_conversion = {
    "attentions": {},
    "resnets": {
        "conv1": "in_layers_2",
        "conv2": "out_layers_3",
        "time_emb_proj": "emb_layers_1",
        "conv_shortcut": "skip_connection",
    },
}
|
| |
|
| |
|
def convert_diffusers_name_to_compvis(key, is_sd2):
    """Translate a diffusers-style Lora layer name into the name used to look
    the layer up in ``lora_layer_mapping``.

    Args:
        key: layer part of a Lora state-dict key (the text before the first
            "."), e.g. ``lora_unet_down_blocks_0_attentions_1_proj_in``.
        is_sd2: when true, text-encoder layers are mapped to
            ``model_transformer_resblocks_*`` names instead of
            ``transformer_text_model_encoder_layers_*``.

    Returns:
        The converted name, or ``key`` unchanged when no pattern matches.
    """

    def match(match_list, regex_text):
        """Match ``key`` against ``regex_text``; on success replace the
        contents of ``match_list`` with the captured groups (all-digit groups
        converted to int) and return True, otherwise return False."""
        regex = re_compiled.get(regex_text)
        if regex is None:
            regex = re.compile(regex_text)
            re_compiled[regex_text] = regex

        r = regex.match(key)
        if not r:
            return False

        match_list.clear()
        # fullmatch rather than match: only groups made up entirely of digits
        # are block indices. A prefix test would pass a suffix such as
        # "0_proj" to int() (ValueError) or silently turn "1_2" into 12.
        match_list.extend(int(x) if re_digits.fullmatch(x) else x for x in r.groups())
        return True

    m = []

    if match(m, r"lora_unet_down_blocks_(\d+)_(attentions|resnets)_(\d+)_(.+)"):
        suffix = suffix_conversion.get(m[1], {}).get(m[3], m[3])
        return f"diffusion_model_input_blocks_{1 + m[0] * 3 + m[2]}_{1 if m[1] == 'attentions' else 0}_{suffix}"

    if match(m, r"lora_unet_mid_block_(attentions|resnets)_(\d+)_(.+)"):
        suffix = suffix_conversion.get(m[0], {}).get(m[2], m[2])
        return f"diffusion_model_middle_block_{1 if m[0] == 'attentions' else m[1] * 2}_{suffix}"

    if match(m, r"lora_unet_up_blocks_(\d+)_(attentions|resnets)_(\d+)_(.+)"):
        suffix = suffix_conversion.get(m[1], {}).get(m[3], m[3])
        return f"diffusion_model_output_blocks_{m[0] * 3 + m[2]}_{1 if m[1] == 'attentions' else 0}_{suffix}"

    if match(m, r"lora_unet_down_blocks_(\d+)_downsamplers_0_conv"):
        return f"diffusion_model_input_blocks_{3 + m[0] * 3}_0_op"

    if match(m, r"lora_unet_up_blocks_(\d+)_upsamplers_0_conv"):
        return f"diffusion_model_output_blocks_{2 + m[0] * 3}_{2 if m[0]>0 else 1}_conv"

    if match(m, r"lora_te_text_model_encoder_layers_(\d+)_(.+)"):
        if is_sd2:
            if 'mlp_fc1' in m[1]:
                return f"model_transformer_resblocks_{m[0]}_{m[1].replace('mlp_fc1', 'mlp_c_fc')}"
            elif 'mlp_fc2' in m[1]:
                return f"model_transformer_resblocks_{m[0]}_{m[1].replace('mlp_fc2', 'mlp_c_proj')}"
            else:
                return f"model_transformer_resblocks_{m[0]}_{m[1].replace('self_attn', 'attn')}"

        return f"transformer_text_model_encoder_layers_{m[0]}_{m[1]}"

    return key
|
| |
|
| |
|
class LoraOnDisk:
    """A Lora file found on disk, together with the metadata read from it."""

    def __init__(self, name, filename):
        """Record ``name``/``filename`` and, for ``.safetensors`` files, read
        the embedded metadata.

        A failure while reading metadata is reported through
        ``errors.display`` and leaves the metadata empty.
        """
        self.name = name
        self.filename = filename
        self.metadata = {}

        extension = os.path.splitext(filename)[1]
        if extension.lower() == ".safetensors":
            try:
                self.metadata = sd_models.read_metadata_from_safetensors(filename)
            except Exception as e:
                errors.display(e, f"reading lora {filename}")

        if self.metadata:
            # Rebuild the dict so that keys listed in metadata_tags_order come
            # first, in rank order; unlisted keys share rank 999 and keep
            # their relative order (sorted() is stable).
            ordered_items = sorted(self.metadata.items(), key=lambda item: metadata_tags_order.get(item[0], 999))
            self.metadata = dict(ordered_items)

        # The cover images entry is taken out of the metadata and kept on its
        # own attribute.
        self.ssmd_cover_images = self.metadata.pop('ssmd_cover_images', None)
        # The alias falls back to the file-derived name when the metadata has
        # no 'ss_output_name'.
        self.alias = self.metadata.get('ss_output_name', self.name)
|
| |
|
| |
|
class LoraModule:
    """A loaded Lora: its name, strength multiplier and per-layer modules."""

    def __init__(self, name):
        """Create an empty Lora called ``name`` with multiplier 1.0."""
        self.name = name
        # Maps a layer name to its LoraUpDownModule (filled by load_lora).
        self.modules = {}
        # Modification time of the source file (set by load_lora).
        self.mtime = None
        self.multiplier = 1.0
|
| |
|
| |
|
class LoraUpDownModule:
    """Holder for one layer's Lora parts: the up module, the down module and
    the alpha value. All three start as None and are filled in by load_lora."""

    def __init__(self):
        self.up = self.down = self.alpha = None
|
| |
|
| |
|
def assign_lora_names_to_compvis_modules(sd_model):
    """Give every submodule of ``sd_model`` a Lora layer name and build the
    name -> module lookup table.

    The name is the module's dotted ``named_modules()`` path with "." replaced
    by "_". It is stored on each module as ``lora_layer_name`` and the whole
    table is stored on the model as ``sd_model.lora_layer_mapping``.

    Modules are collected from ``sd_model.cond_stage_model.wrapped`` first and
    ``sd_model.model`` second, so on a name collision the latter wins.

    Args:
        sd_model: the model to annotate. The model passed in is the one that
            is walked; the global ``shared.sd_model`` is not consulted, so
            the modules and the mapping always belong to the same model.
    """
    lora_layer_mapping = {}

    for root in (sd_model.cond_stage_model.wrapped, sd_model.model):
        for name, module in root.named_modules():
            lora_name = name.replace(".", "_")
            lora_layer_mapping[lora_name] = module
            module.lora_layer_name = lora_name

    sd_model.lora_layer_mapping = lora_layer_mapping
|
| |
|
| |
|
def load_lora(name, filename):
    """Read a Lora file and build a ``LoraModule`` from it.

    Each state-dict key is split at its first "." into a layer name and a
    Lora part ("lora_up.weight", "lora_down.weight" or "alpha"). The layer
    name is converted with ``convert_diffusers_name_to_compvis`` and looked up
    in ``shared.sd_model.lora_layer_mapping`` (built on first use).

    Keys that match no layer are collected and printed once at the end.
    Keys that match a layer of an unsupported type are printed and skipped.

    Args:
        name: name to give the resulting ``LoraModule``.
        filename: path of the Lora file.

    Returns:
        The populated ``LoraModule``; its ``mtime`` is the file's
        modification time.

    Raises:
        AssertionError: a key matched a supported layer but its Lora part is
            not "lora_up.weight", "lora_down.weight" or "alpha".
        ValueError: a key contains no "." (from the tuple unpacking).
    """
    lora = LoraModule(name)
    lora.mtime = os.path.getmtime(filename)

    sd = sd_models.read_state_dict(filename)

    if not hasattr(shared.sd_model, 'lora_layer_mapping'):
        assign_lora_names_to_compvis_modules(shared.sd_model)

    layer_mapping = shared.sd_model.lora_layer_mapping

    keys_failed_to_match = {}
    is_sd2 = 'model_transformer_resblocks' in layer_mapping

    # Target layer types whose Lora weights are held in a bias-free Linear.
    # Exact type comparison is kept on purpose, so subclasses that are not
    # listed are still reported as unsupported.
    linear_like_types = (
        torch.nn.Linear,
        torch.nn.modules.linear.NonDynamicallyQuantizableLinear,
        torch.nn.MultiheadAttention,
    )

    for key_diffusers, weight in sd.items():
        key_diffusers_without_lora_parts, lora_key = key_diffusers.split(".", 1)
        key = convert_diffusers_name_to_compvis(key_diffusers_without_lora_parts, is_sd2)

        sd_module = layer_mapping.get(key, None)

        if sd_module is None:
            # "<layer>_q_proj" / "_k_proj" / "_v_proj" keys may belong to a
            # layer registered under "<layer>" only; retry with the prefix.
            m = re_x_proj.match(key)
            if m:
                sd_module = layer_mapping.get(m.group(1), None)

        if sd_module is None:
            keys_failed_to_match[key_diffusers] = key
            continue

        lora_module = lora.modules.get(key, None)
        if lora_module is None:
            lora_module = LoraUpDownModule()
            lora.modules[key] = lora_module

        if lora_key == "alpha":
            lora_module.alpha = weight.item()
            continue

        kernel_size = tuple(weight.shape[2:])

        if type(sd_module) in linear_like_types:
            module = torch.nn.Linear(weight.shape[1], weight.shape[0], bias=False)
        elif type(sd_module) == torch.nn.Conv2d and kernel_size in ((1, 1), (3, 3)):
            module = torch.nn.Conv2d(weight.shape[1], weight.shape[0], kernel_size, bias=False)
        else:
            print(f'Lora layer {key_diffusers} matched a layer with unsupported type: {type(sd_module).__name__}')
            continue

        with torch.no_grad():
            module.weight.copy_(weight)

        module.to(device=devices.cpu, dtype=devices.dtype)

        if lora_key == "lora_up.weight":
            lora_module.up = module
        elif lora_key == "lora_down.weight":
            lora_module.down = module
        else:
            # Raised explicitly instead of `assert False` so the check is not
            # stripped when Python runs with -O; the exception type is the same.
            raise AssertionError(f'Bad Lora layer name: {key_diffusers} - must end in lora_up.weight, lora_down.weight or alpha')

    if keys_failed_to_match:
        print(f"Failed to match keys when loading Lora {filename}: {keys_failed_to_match}")

    return lora
|
| |
|
| |
|
def load_loras(names, multipliers=None):
    """Rebuild ``loaded_loras`` so that it holds the Loras listed in ``names``.

    A Lora that is already loaded is reused unless its file on disk has a
    newer modification time. If any name is unknown, the list of available
    Loras is refreshed once before giving up on it. Names that still cannot
    be resolved are reported with a printed message; load failures are
    reported through ``errors.display``. Both are skipped.

    Args:
        names: Lora names or aliases to load.
        multipliers: optional sequence, parallel to ``names``, of strengths;
            1.0 is used for every Lora when it is missing or empty.
    """
    reusable = {lora.name: lora for lora in loaded_loras if lora.name in names}

    loaded_loras.clear()

    loras_on_disk = [available_lora_aliases.get(name, None) for name in names]
    if any(entry is None for entry in loras_on_disk):
        list_available_loras()

        loras_on_disk = [available_lora_aliases.get(name, None) for name in names]

    for i, (name, lora_on_disk) in enumerate(zip(names, loras_on_disk)):
        lora = reusable.get(name, None)

        needs_load = lora_on_disk is not None and (
            lora is None or os.path.getmtime(lora_on_disk.filename) > lora.mtime
        )
        if needs_load:
            try:
                lora = load_lora(name, lora_on_disk.filename)
            except Exception as e:
                errors.display(e, f"loading Lora {lora_on_disk.filename}")
                continue

        if lora is None:
            print(f"Couldn't find Lora with name {name}")
            continue

        if multipliers:
            lora.multiplier = multipliers[i]
        else:
            lora.multiplier = 1.0

        loaded_loras.append(lora)
|
| |
|
| |
|
def lora_calc_updown(lora, module, target):
    """Compute the weight delta one Lora contributes to one layer.

    Args:
        lora: object providing ``multiplier``.
        module: object providing ``up`` and ``down`` (each with a ``weight``)
            and ``alpha``.
        target: tensor whose device and dtype the computation is done in.

    Returns:
        The combined up/down weight multiplied by ``lora.multiplier`` and by
        ``alpha / up.weight.shape[1]`` (or by 1.0 when alpha is falsy).
    """
    with torch.no_grad():
        up = module.up.weight.to(target.device, dtype=target.dtype)
        down = module.down.weight.to(target.device, dtype=target.dtype)

        up_kernel = up.shape[2:]
        down_kernel = down.shape[2:]

        if up_kernel == (1, 1) and down_kernel == (1, 1):
            # Both are 1x1 convolutions: multiply as plain matrices and put
            # the two trailing dimensions back.
            up_matrix = up.squeeze(2).squeeze(2)
            down_matrix = down.squeeze(2).squeeze(2)
            updown = (up_matrix @ down_matrix).unsqueeze(2).unsqueeze(3)
        elif up_kernel == (3, 3) or down_kernel == (3, 3):
            # At least one 3x3 kernel: compose the two through a convolution.
            composed = torch.nn.functional.conv2d(down.permute(1, 0, 2, 3), up)
            updown = composed.permute(1, 0, 2, 3)
        else:
            updown = up @ down

        if module.alpha:
            scale = module.alpha / module.up.weight.shape[1]
        else:
            scale = 1.0

        updown = updown * lora.multiplier * scale

        return updown
|
| |
|
| |
|
def lora_restore_weights_from_backup(self: Union[torch.nn.Conv2d, torch.nn.Linear, torch.nn.MultiheadAttention]):
    """Copy the weights saved in ``self.lora_weights_backup`` back into the
    layer. Does nothing when the attribute is missing or None.

    For ``MultiheadAttention`` the backup is indexed as
    ``(in_proj_weight, out_proj.weight)``; for other layers it is copied into
    ``self.weight``.
    """
    backup = getattr(self, "lora_weights_backup", None)
    if backup is None:
        return

    if not isinstance(self, torch.nn.MultiheadAttention):
        self.weight.copy_(backup)
        return

    self.in_proj_weight.copy_(backup[0])
    self.out_proj.weight.copy_(backup[1])
|
| |
|
| |
|
def lora_apply_weights(self: Union[torch.nn.Conv2d, torch.nn.Linear, torch.nn.MultiheadAttention]):
    """
    Bake the currently loaded Loras into the weights of torch layer self.

    Layers without a lora_layer_name are left alone. The first call on a layer
    stores a CPU copy of its weights as lora_weights_backup. When the
    (name, multiplier) set recorded on the layer already equals the loaded
    set, nothing else happens; otherwise the weights are restored from the
    backup, every loaded Lora's delta is added, and the new set is recorded
    as lora_current_names.
    """

    lora_layer_name = getattr(self, 'lora_layer_name', None)
    if lora_layer_name is None:
        return

    applied_names = getattr(self, "lora_current_names", ())
    wanted_names = tuple((lora.name, lora.multiplier) for lora in loaded_loras)

    is_attention = isinstance(self, torch.nn.MultiheadAttention)

    if getattr(self, "lora_weights_backup", None) is None:
        if is_attention:
            self.lora_weights_backup = (
                self.in_proj_weight.to(devices.cpu, copy=True),
                self.out_proj.weight.to(devices.cpu, copy=True),
            )
        else:
            self.lora_weights_backup = self.weight.to(devices.cpu, copy=True)

    if applied_names == wanted_names:
        return

    lora_restore_weights_from_backup(self)

    for lora in loaded_loras:
        direct = lora.modules.get(lora_layer_name, None)
        if direct is not None and hasattr(self, 'weight'):
            self.weight += lora_calc_updown(lora, direct, self.weight)
            continue

        if is_attention:
            # Attention layers take four separate Lora modules; they are only
            # applied when all four are present.
            parts = [
                lora.modules.get(lora_layer_name + suffix, None)
                for suffix in ("_q_proj", "_k_proj", "_v_proj", "_out_proj")
            ]
            if all(parts):
                part_q, part_k, part_v, part_out = parts
                updown_q = lora_calc_updown(lora, part_q, self.in_proj_weight)
                updown_k = lora_calc_updown(lora, part_k, self.in_proj_weight)
                updown_v = lora_calc_updown(lora, part_v, self.in_proj_weight)

                # in_proj_weight receives q, k and v stacked in that order.
                self.in_proj_weight += torch.vstack([updown_q, updown_k, updown_v])
                self.out_proj.weight += lora_calc_updown(lora, part_out, self.out_proj.weight)
                continue

        if direct is None:
            # This Lora has nothing for this layer.
            continue

        print(f'failed to calculate lora weights for layer {lora_layer_name}')

    setattr(self, "lora_current_names", wanted_names)
|
| |
|
| |
|
def lora_forward(module, input, original_forward):
    """
    Legacy path: apply Loras as extra operations during the layer's forward
    instead of changing its weights.
    Stacking many loras this way results in big performance degradation.

    With no Loras loaded this is a plain call to original_forward. Otherwise
    the layer's weights are restored from backup and its cached Lora state is
    reset before the original forward runs, and each loaded Lora's up(down(input))
    term is added to the result.
    """

    if not loaded_loras:
        return original_forward(module, input)

    input = devices.cond_cast_unet(input)

    lora_restore_weights_from_backup(module)
    lora_reset_cached_weight(module)

    res = original_forward(module, input)

    lora_layer_name = getattr(module, 'lora_layer_name', None)
    for lora in loaded_loras:
        lora_part = lora.modules.get(lora_layer_name, None)
        if lora_part is None:
            continue

        lora_part.up.to(device=devices.device)
        lora_part.down.to(device=devices.device)

        if lora_part.alpha:
            scale = lora_part.alpha / lora_part.up.weight.shape[1]
        else:
            scale = 1.0

        res = res + lora_part.up(lora_part.down(input)) * lora.multiplier * scale

    return res
|
| |
|
| |
|
def lora_reset_cached_weight(self: Union[torch.nn.Conv2d, torch.nn.Linear]):
    """Forget the layer's Lora state: clear the record of applied Loras and
    drop the weight backup."""
    self.lora_current_names = ()
    self.lora_weights_backup = None
|
| |
|
| |
|
def lora_Linear_forward(self, input):
    """Forward replacement for Linear layers: apply Loras functionally when
    ``shared.opts.lora_functional`` is set, otherwise bake them into the
    weights and call the saved original forward."""
    original_forward = torch.nn.Linear_forward_before_lora

    if not shared.opts.lora_functional:
        lora_apply_weights(self)
        return original_forward(self, input)

    return lora_forward(self, input, original_forward)
|
| |
|
| |
|
def lora_Linear_load_state_dict(self, *args, **kwargs):
    """load_state_dict replacement for Linear layers: reset the layer's Lora
    state first, then delegate to the saved original with the same arguments."""
    lora_reset_cached_weight(self)

    result = torch.nn.Linear_load_state_dict_before_lora(self, *args, **kwargs)
    return result
|
| |
|
| |
|
def lora_Conv2d_forward(self, input):
    """Forward replacement for Conv2d layers: apply Loras functionally when
    ``shared.opts.lora_functional`` is set, otherwise bake them into the
    weights and call the saved original forward."""
    original_forward = torch.nn.Conv2d_forward_before_lora

    if not shared.opts.lora_functional:
        lora_apply_weights(self)
        return original_forward(self, input)

    return lora_forward(self, input, original_forward)
|
| |
|
| |
|
def lora_Conv2d_load_state_dict(self, *args, **kwargs):
    """load_state_dict replacement for Conv2d layers: reset the layer's Lora
    state first, then delegate to the saved original with the same arguments."""
    lora_reset_cached_weight(self)

    result = torch.nn.Conv2d_load_state_dict_before_lora(self, *args, **kwargs)
    return result
|
| |
|
| |
|
def lora_MultiheadAttention_forward(self, *args, **kwargs):
    """Forward replacement for MultiheadAttention layers: bake the loaded
    Loras into the weights, then call the saved original forward with the
    same arguments."""
    lora_apply_weights(self)

    result = torch.nn.MultiheadAttention_forward_before_lora(self, *args, **kwargs)
    return result
|
| |
|
| |
|
def lora_MultiheadAttention_load_state_dict(self, *args, **kwargs):
    """load_state_dict replacement for MultiheadAttention layers: reset the
    layer's Lora state first, then delegate to the saved original with the
    same arguments."""
    lora_reset_cached_weight(self)

    result = torch.nn.MultiheadAttention_load_state_dict_before_lora(self, *args, **kwargs)
    return result
|
| |
|
| |
|
def list_available_loras():
    """Rescan the Lora directory and rebuild the module-level registries.

    ``available_loras`` maps a file's base name (without extension) to its
    ``LoraOnDisk`` entry. ``available_lora_aliases`` maps both that name and
    the entry's alias to the entry. ``forbidden_lora_aliases`` always holds
    "none", plus the lower-cased alias of any entry whose alias was already
    registered when it was scanned.

    The directory is created when it does not exist.
    """
    available_loras.clear()
    available_lora_aliases.clear()
    forbidden_lora_aliases.clear()
    forbidden_lora_aliases["none"] = 1

    lora_dir = shared.cmd_opts.lora_dir
    os.makedirs(lora_dir, exist_ok=True)

    found_files = shared.walk_files(lora_dir, allowed_extensions=[".pt", ".ckpt", ".safetensors"])
    for filename in sorted(found_files, key=str.lower):
        if os.path.isdir(filename):
            continue

        name, _ = os.path.splitext(os.path.basename(filename))
        entry = LoraOnDisk(name, filename)

        available_loras[name] = entry

        # An alias that is already registered is ambiguous; remember it
        # (lower-cased) as forbidden before this entry overwrites it.
        if entry.alias in available_lora_aliases:
            forbidden_lora_aliases[entry.alias.lower()] = 1

        for lookup_key in (name, entry.alias):
            available_lora_aliases[lookup_key] = entry
|
| |
|
| |
|
# Matches "<name>(<hex digits>)", optionally with whitespace before the
# parenthesis; group 1 is the name part. infotext_pasted uses it to drop the
# parenthesised hex suffix from "AddNet Model N" values.
re_lora_name = re.compile(r"(.*)\s*\([0-9a-fA-F]+\)")
|
| |
|
| |
|
def infotext_pasted(infotext, params):
    """Turn "AddNet" Lora entries of pasted generation parameters into
    ``<lora:name:multiplier>`` tags appended to ``params["Prompt"]``.

    Nothing is done when "AddNet Module 1" is among the txt2img infotext
    fields. Only "AddNet Model N" entries whose matching "AddNet Module N"
    is "LoRA" are converted; the multiplier comes from "AddNet Weight A N"
    and defaults to "1.0".

    Args:
        infotext: the pasted text (not used here).
        params: parsed parameters; modified in place.
    """
    registered_fields = [x[1] for x in scripts.scripts_txt2img.infotext_fields]
    if "AddNet Module 1" in registered_fields:
        return

    model_prefix = "AddNet Model "
    tags = []

    for k, v in params.items():
        if not k.startswith(model_prefix):
            continue

        num = k[len(model_prefix):]

        if params.get("AddNet Module " + num) != "LoRA":
            continue

        # k is exactly "AddNet Model " + num, so v is that entry's value.
        name = v
        if name is None:
            continue

        m = re_lora_name.match(name)
        if m:
            name = m.group(1)

        multiplier = params.get("AddNet Weight A " + num, "1.0")

        tags.append(f"<lora:{name}:{multiplier}>")

    if tags:
        params["Prompt"] += "\n" + "".join(tags)
|
| |
|
# Lora files found on disk: file base name (without extension) -> LoraOnDisk.
available_loras = {}
# Lookup by either file base name or alias -> LoraOnDisk.
available_lora_aliases = {}
# Lower-cased aliases recorded as forbidden by list_available_loras():
# "none", plus aliases that were seen more than once.
forbidden_lora_aliases = {}
# LoraModule objects currently selected; rebuilt by load_loras().
loaded_loras = []

# Fill the registries once when the module is imported. This must stay after
# the four containers above, which list_available_loras() clears and fills.
list_available_loras()
|
| |
|