aboutsummaryrefslogtreecommitdiff
path: root/extensions-builtin/Lora/lora.py
diff options
context:
space:
mode:
Diffstat (limited to 'extensions-builtin/Lora/lora.py')
-rw-r--r--extensions-builtin/Lora/lora.py537
1 files changed, 0 insertions, 537 deletions
diff --git a/extensions-builtin/Lora/lora.py b/extensions-builtin/Lora/lora.py
deleted file mode 100644
index 9cdff6ed..00000000
--- a/extensions-builtin/Lora/lora.py
+++ /dev/null
@@ -1,537 +0,0 @@
-import os
-import re
-import torch
-from typing import Union
-
-from modules import shared, devices, sd_models, errors, scripts, sd_hijack, hashes, cache
-
-metadata_tags_order = {"ss_sd_model_name": 1, "ss_resolution": 2, "ss_clip_skip": 3, "ss_num_train_images": 10, "ss_tag_frequency": 20}
-
-re_digits = re.compile(r"\d+")
-re_x_proj = re.compile(r"(.*)_([qkv]_proj)$")
-re_compiled = {}
-
-suffix_conversion = {
- "attentions": {},
- "resnets": {
- "conv1": "in_layers_2",
- "conv2": "out_layers_3",
- "time_emb_proj": "emb_layers_1",
- "conv_shortcut": "skip_connection",
- }
-}
-
-
-def convert_diffusers_name_to_compvis(key, is_sd2):
- def match(match_list, regex_text):
- regex = re_compiled.get(regex_text)
- if regex is None:
- regex = re.compile(regex_text)
- re_compiled[regex_text] = regex
-
- r = re.match(regex, key)
- if not r:
- return False
-
- match_list.clear()
- match_list.extend([int(x) if re.match(re_digits, x) else x for x in r.groups()])
- return True
-
- m = []
-
- if match(m, r"lora_unet_down_blocks_(\d+)_(attentions|resnets)_(\d+)_(.+)"):
- suffix = suffix_conversion.get(m[1], {}).get(m[3], m[3])
- return f"diffusion_model_input_blocks_{1 + m[0] * 3 + m[2]}_{1 if m[1] == 'attentions' else 0}_{suffix}"
-
- if match(m, r"lora_unet_mid_block_(attentions|resnets)_(\d+)_(.+)"):
- suffix = suffix_conversion.get(m[0], {}).get(m[2], m[2])
- return f"diffusion_model_middle_block_{1 if m[0] == 'attentions' else m[1] * 2}_{suffix}"
-
- if match(m, r"lora_unet_up_blocks_(\d+)_(attentions|resnets)_(\d+)_(.+)"):
- suffix = suffix_conversion.get(m[1], {}).get(m[3], m[3])
- return f"diffusion_model_output_blocks_{m[0] * 3 + m[2]}_{1 if m[1] == 'attentions' else 0}_{suffix}"
-
- if match(m, r"lora_unet_down_blocks_(\d+)_downsamplers_0_conv"):
- return f"diffusion_model_input_blocks_{3 + m[0] * 3}_0_op"
-
- if match(m, r"lora_unet_up_blocks_(\d+)_upsamplers_0_conv"):
- return f"diffusion_model_output_blocks_{2 + m[0] * 3}_{2 if m[0]>0 else 1}_conv"
-
- if match(m, r"lora_te_text_model_encoder_layers_(\d+)_(.+)"):
- if is_sd2:
- if 'mlp_fc1' in m[1]:
- return f"model_transformer_resblocks_{m[0]}_{m[1].replace('mlp_fc1', 'mlp_c_fc')}"
- elif 'mlp_fc2' in m[1]:
- return f"model_transformer_resblocks_{m[0]}_{m[1].replace('mlp_fc2', 'mlp_c_proj')}"
- else:
- return f"model_transformer_resblocks_{m[0]}_{m[1].replace('self_attn', 'attn')}"
-
- return f"transformer_text_model_encoder_layers_{m[0]}_{m[1]}"
-
- if match(m, r"lora_te2_text_model_encoder_layers_(\d+)_(.+)"):
- if 'mlp_fc1' in m[1]:
- return f"1_model_transformer_resblocks_{m[0]}_{m[1].replace('mlp_fc1', 'mlp_c_fc')}"
- elif 'mlp_fc2' in m[1]:
- return f"1_model_transformer_resblocks_{m[0]}_{m[1].replace('mlp_fc2', 'mlp_c_proj')}"
- else:
- return f"1_model_transformer_resblocks_{m[0]}_{m[1].replace('self_attn', 'attn')}"
-
- return key
-
-
-class LoraOnDisk:
- def __init__(self, name, filename):
- self.name = name
- self.filename = filename
- self.metadata = {}
- self.is_safetensors = os.path.splitext(filename)[1].lower() == ".safetensors"
-
- def read_metadata():
- metadata = sd_models.read_metadata_from_safetensors(filename)
- metadata.pop('ssmd_cover_images', None) # those are cover images, and they are too big to display in UI as text
-
- return metadata
-
- if self.is_safetensors:
- try:
- self.metadata = cache.cached_data_for_file('safetensors-metadata', "lora/" + self.name, filename, read_metadata)
- except Exception as e:
- errors.display(e, f"reading lora {filename}")
-
- if self.metadata:
- m = {}
- for k, v in sorted(self.metadata.items(), key=lambda x: metadata_tags_order.get(x[0], 999)):
- m[k] = v
-
- self.metadata = m
-
- self.alias = self.metadata.get('ss_output_name', self.name)
-
- self.hash = None
- self.shorthash = None
- self.set_hash(
- self.metadata.get('sshs_model_hash') or
- hashes.sha256_from_cache(self.filename, "lora/" + self.name, use_addnet_hash=self.is_safetensors) or
- ''
- )
-
- def set_hash(self, v):
- self.hash = v
- self.shorthash = self.hash[0:12]
-
- if self.shorthash:
- available_lora_hash_lookup[self.shorthash] = self
-
- def read_hash(self):
- if not self.hash:
- self.set_hash(hashes.sha256(self.filename, "lora/" + self.name, use_addnet_hash=self.is_safetensors) or '')
-
- def get_alias(self):
- if shared.opts.lora_preferred_name == "Filename" or self.alias.lower() in forbidden_lora_aliases:
- return self.name
- else:
- return self.alias
-
-
-class LoraModule:
- def __init__(self, name, lora_on_disk: LoraOnDisk):
- self.name = name
- self.lora_on_disk = lora_on_disk
- self.multiplier = 1.0
- self.modules = {}
- self.mtime = None
-
- self.mentioned_name = None
- """the text that was used to add lora to prompt - can be either name or an alias"""
-
-
-class LoraUpDownModule:
- def __init__(self):
- self.up = None
- self.down = None
- self.alpha = None
-
-
-def assign_lora_names_to_compvis_modules(sd_model):
- lora_layer_mapping = {}
-
- if shared.sd_model.is_sdxl:
- for i, embedder in enumerate(shared.sd_model.conditioner.embedders):
- if not hasattr(embedder, 'wrapped'):
- continue
-
- for name, module in embedder.wrapped.named_modules():
- lora_name = f'{i}_{name.replace(".", "_")}'
- lora_layer_mapping[lora_name] = module
- module.lora_layer_name = lora_name
- else:
- for name, module in shared.sd_model.cond_stage_model.wrapped.named_modules():
- lora_name = name.replace(".", "_")
- lora_layer_mapping[lora_name] = module
- module.lora_layer_name = lora_name
-
- for name, module in shared.sd_model.model.named_modules():
- lora_name = name.replace(".", "_")
- lora_layer_mapping[lora_name] = module
- module.lora_layer_name = lora_name
-
- sd_model.lora_layer_mapping = lora_layer_mapping
-
-
-def load_lora(name, lora_on_disk):
- lora = LoraModule(name, lora_on_disk)
- lora.mtime = os.path.getmtime(lora_on_disk.filename)
-
- sd = sd_models.read_state_dict(lora_on_disk.filename)
-
- # this should not be needed but is here as an emergency fix for an unknown error people are experiencing in 1.2.0
- if not hasattr(shared.sd_model, 'lora_layer_mapping'):
- assign_lora_names_to_compvis_modules(shared.sd_model)
-
- keys_failed_to_match = {}
- is_sd2 = 'model_transformer_resblocks' in shared.sd_model.lora_layer_mapping
-
- for key_lora, weight in sd.items():
- key_lora_without_lora_parts, lora_key = key_lora.split(".", 1)
-
- key = convert_diffusers_name_to_compvis(key_lora_without_lora_parts, is_sd2)
- sd_module = shared.sd_model.lora_layer_mapping.get(key, None)
-
- if sd_module is None:
- m = re_x_proj.match(key)
- if m:
- sd_module = shared.sd_model.lora_layer_mapping.get(m.group(1), None)
-
- # SDXL loras seem to already have correct compvis keys, so only need to replace "lora_unet" with "diffusion_model"
- if sd_module is None and "lora_unet" in key_lora_without_lora_parts:
- key = key_lora_without_lora_parts.replace("lora_unet", "diffusion_model")
- sd_module = shared.sd_model.lora_layer_mapping.get(key, None)
- elif sd_module is None and "lora_te1_text_model" in key_lora_without_lora_parts:
- key = key_lora_without_lora_parts.replace("lora_te1_text_model", "0_transformer_text_model")
- sd_module = shared.sd_model.lora_layer_mapping.get(key, None)
-
- if sd_module is None:
- keys_failed_to_match[key_lora] = key
- continue
-
- lora_module = lora.modules.get(key, None)
- if lora_module is None:
- lora_module = LoraUpDownModule()
- lora.modules[key] = lora_module
-
- if lora_key == "alpha":
- lora_module.alpha = weight.item()
- continue
-
- if type(sd_module) == torch.nn.Linear:
- module = torch.nn.Linear(weight.shape[1], weight.shape[0], bias=False)
- elif type(sd_module) == torch.nn.modules.linear.NonDynamicallyQuantizableLinear:
- module = torch.nn.Linear(weight.shape[1], weight.shape[0], bias=False)
- elif type(sd_module) == torch.nn.MultiheadAttention:
- module = torch.nn.Linear(weight.shape[1], weight.shape[0], bias=False)
- elif type(sd_module) == torch.nn.Conv2d and weight.shape[2:] == (1, 1):
- module = torch.nn.Conv2d(weight.shape[1], weight.shape[0], (1, 1), bias=False)
- elif type(sd_module) == torch.nn.Conv2d and weight.shape[2:] == (3, 3):
- module = torch.nn.Conv2d(weight.shape[1], weight.shape[0], (3, 3), bias=False)
- else:
- print(f'Lora layer {key_lora} matched a layer with unsupported type: {type(sd_module).__name__}')
- continue
- raise AssertionError(f"Lora layer {key_lora} matched a layer with unsupported type: {type(sd_module).__name__}")
-
- with torch.no_grad():
- module.weight.copy_(weight)
-
- module.to(device=devices.cpu, dtype=devices.dtype)
-
- if lora_key == "lora_up.weight":
- lora_module.up = module
- elif lora_key == "lora_down.weight":
- lora_module.down = module
- else:
- raise AssertionError(f"Bad Lora layer name: {key_lora} - must end in lora_up.weight, lora_down.weight or alpha")
-
- if keys_failed_to_match:
- print(f"Failed to match keys when loading Lora {lora_on_disk.filename}: {keys_failed_to_match}")
-
- return lora
-
-
-def load_loras(names, multipliers=None):
- already_loaded = {}
-
- for lora in loaded_loras:
- if lora.name in names:
- already_loaded[lora.name] = lora
-
- loaded_loras.clear()
-
- loras_on_disk = [available_lora_aliases.get(name, None) for name in names]
- if any(x is None for x in loras_on_disk):
- list_available_loras()
-
- loras_on_disk = [available_lora_aliases.get(name, None) for name in names]
-
- failed_to_load_loras = []
-
- for i, name in enumerate(names):
- lora = already_loaded.get(name, None)
-
- lora_on_disk = loras_on_disk[i]
-
- if lora_on_disk is not None:
- if lora is None or os.path.getmtime(lora_on_disk.filename) > lora.mtime:
- try:
- lora = load_lora(name, lora_on_disk)
- except Exception as e:
- errors.display(e, f"loading Lora {lora_on_disk.filename}")
- continue
-
- lora.mentioned_name = name
-
- lora_on_disk.read_hash()
-
- if lora is None:
- failed_to_load_loras.append(name)
- print(f"Couldn't find Lora with name {name}")
- continue
-
- lora.multiplier = multipliers[i] if multipliers else 1.0
- loaded_loras.append(lora)
-
- if failed_to_load_loras:
- sd_hijack.model_hijack.comments.append("Failed to find Loras: " + ", ".join(failed_to_load_loras))
-
-
-def lora_calc_updown(lora, module, target):
- with torch.no_grad():
- up = module.up.weight.to(target.device, dtype=target.dtype)
- down = module.down.weight.to(target.device, dtype=target.dtype)
-
- if up.shape[2:] == (1, 1) and down.shape[2:] == (1, 1):
- updown = (up.squeeze(2).squeeze(2) @ down.squeeze(2).squeeze(2)).unsqueeze(2).unsqueeze(3)
- elif up.shape[2:] == (3, 3) or down.shape[2:] == (3, 3):
- updown = torch.nn.functional.conv2d(down.permute(1, 0, 2, 3), up).permute(1, 0, 2, 3)
- else:
- updown = up @ down
-
- updown = updown * lora.multiplier * (module.alpha / module.up.weight.shape[1] if module.alpha else 1.0)
-
- return updown
-
-
-def lora_restore_weights_from_backup(self: Union[torch.nn.Conv2d, torch.nn.Linear, torch.nn.MultiheadAttention]):
- weights_backup = getattr(self, "lora_weights_backup", None)
-
- if weights_backup is None:
- return
-
- if isinstance(self, torch.nn.MultiheadAttention):
- self.in_proj_weight.copy_(weights_backup[0])
- self.out_proj.weight.copy_(weights_backup[1])
- else:
- self.weight.copy_(weights_backup)
-
-
-def lora_apply_weights(self: Union[torch.nn.Conv2d, torch.nn.Linear, torch.nn.MultiheadAttention]):
- """
- Applies the currently selected set of Loras to the weights of torch layer self.
- If weights already have this particular set of loras applied, does nothing.
- If not, restores orginal weights from backup and alters weights according to loras.
- """
-
- lora_layer_name = getattr(self, 'lora_layer_name', None)
- if lora_layer_name is None:
- return
-
- current_names = getattr(self, "lora_current_names", ())
- wanted_names = tuple((x.name, x.multiplier) for x in loaded_loras)
-
- weights_backup = getattr(self, "lora_weights_backup", None)
- if weights_backup is None:
- if isinstance(self, torch.nn.MultiheadAttention):
- weights_backup = (self.in_proj_weight.to(devices.cpu, copy=True), self.out_proj.weight.to(devices.cpu, copy=True))
- else:
- weights_backup = self.weight.to(devices.cpu, copy=True)
-
- self.lora_weights_backup = weights_backup
-
- if current_names != wanted_names:
- lora_restore_weights_from_backup(self)
-
- for lora in loaded_loras:
- module = lora.modules.get(lora_layer_name, None)
- if module is not None and hasattr(self, 'weight'):
- self.weight += lora_calc_updown(lora, module, self.weight)
- continue
-
- module_q = lora.modules.get(lora_layer_name + "_q_proj", None)
- module_k = lora.modules.get(lora_layer_name + "_k_proj", None)
- module_v = lora.modules.get(lora_layer_name + "_v_proj", None)
- module_out = lora.modules.get(lora_layer_name + "_out_proj", None)
-
- if isinstance(self, torch.nn.MultiheadAttention) and module_q and module_k and module_v and module_out:
- updown_q = lora_calc_updown(lora, module_q, self.in_proj_weight)
- updown_k = lora_calc_updown(lora, module_k, self.in_proj_weight)
- updown_v = lora_calc_updown(lora, module_v, self.in_proj_weight)
- updown_qkv = torch.vstack([updown_q, updown_k, updown_v])
-
- self.in_proj_weight += updown_qkv
- self.out_proj.weight += lora_calc_updown(lora, module_out, self.out_proj.weight)
- continue
-
- if module is None:
- continue
-
- print(f'failed to calculate lora weights for layer {lora_layer_name}')
-
- self.lora_current_names = wanted_names
-
-
-def lora_forward(module, input, original_forward):
- """
- Old way of applying Lora by executing operations during layer's forward.
- Stacking many loras this way results in big performance degradation.
- """
-
- if len(loaded_loras) == 0:
- return original_forward(module, input)
-
- input = devices.cond_cast_unet(input)
-
- lora_restore_weights_from_backup(module)
- lora_reset_cached_weight(module)
-
- res = original_forward(module, input)
-
- lora_layer_name = getattr(module, 'lora_layer_name', None)
- for lora in loaded_loras:
- module = lora.modules.get(lora_layer_name, None)
- if module is None:
- continue
-
- module.up.to(device=devices.device)
- module.down.to(device=devices.device)
-
- res = res + module.up(module.down(input)) * lora.multiplier * (module.alpha / module.up.weight.shape[1] if module.alpha else 1.0)
-
- return res
-
-
-def lora_reset_cached_weight(self: Union[torch.nn.Conv2d, torch.nn.Linear]):
- self.lora_current_names = ()
- self.lora_weights_backup = None
-
-
-def lora_Linear_forward(self, input):
- if shared.opts.lora_functional:
- return lora_forward(self, input, torch.nn.Linear_forward_before_lora)
-
- lora_apply_weights(self)
-
- return torch.nn.Linear_forward_before_lora(self, input)
-
-
-def lora_Linear_load_state_dict(self, *args, **kwargs):
- lora_reset_cached_weight(self)
-
- return torch.nn.Linear_load_state_dict_before_lora(self, *args, **kwargs)
-
-
-def lora_Conv2d_forward(self, input):
- if shared.opts.lora_functional:
- return lora_forward(self, input, torch.nn.Conv2d_forward_before_lora)
-
- lora_apply_weights(self)
-
- return torch.nn.Conv2d_forward_before_lora(self, input)
-
-
-def lora_Conv2d_load_state_dict(self, *args, **kwargs):
- lora_reset_cached_weight(self)
-
- return torch.nn.Conv2d_load_state_dict_before_lora(self, *args, **kwargs)
-
-
-def lora_MultiheadAttention_forward(self, *args, **kwargs):
- lora_apply_weights(self)
-
- return torch.nn.MultiheadAttention_forward_before_lora(self, *args, **kwargs)
-
-
-def lora_MultiheadAttention_load_state_dict(self, *args, **kwargs):
- lora_reset_cached_weight(self)
-
- return torch.nn.MultiheadAttention_load_state_dict_before_lora(self, *args, **kwargs)
-
-
-def list_available_loras():
- available_loras.clear()
- available_lora_aliases.clear()
- forbidden_lora_aliases.clear()
- available_lora_hash_lookup.clear()
- forbidden_lora_aliases.update({"none": 1, "Addams": 1})
-
- os.makedirs(shared.cmd_opts.lora_dir, exist_ok=True)
-
- candidates = list(shared.walk_files(shared.cmd_opts.lora_dir, allowed_extensions=[".pt", ".ckpt", ".safetensors"]))
- for filename in candidates:
- if os.path.isdir(filename):
- continue
-
- name = os.path.splitext(os.path.basename(filename))[0]
- try:
- entry = LoraOnDisk(name, filename)
- except OSError: # should catch FileNotFoundError and PermissionError etc.
- errors.report(f"Failed to load LoRA {name} from {filename}", exc_info=True)
- continue
-
- available_loras[name] = entry
-
- if entry.alias in available_lora_aliases:
- forbidden_lora_aliases[entry.alias.lower()] = 1
-
- available_lora_aliases[name] = entry
- available_lora_aliases[entry.alias] = entry
-
-
-re_lora_name = re.compile(r"(.*)\s*\([0-9a-fA-F]+\)")
-
-
-def infotext_pasted(infotext, params):
- if "AddNet Module 1" in [x[1] for x in scripts.scripts_txt2img.infotext_fields]:
- return # if the other extension is active, it will handle those fields, no need to do anything
-
- added = []
-
- for k in params:
- if not k.startswith("AddNet Model "):
- continue
-
- num = k[13:]
-
- if params.get("AddNet Module " + num) != "LoRA":
- continue
-
- name = params.get("AddNet Model " + num)
- if name is None:
- continue
-
- m = re_lora_name.match(name)
- if m:
- name = m.group(1)
-
- multiplier = params.get("AddNet Weight A " + num, "1.0")
-
- added.append(f"<lora:{name}:{multiplier}>")
-
- if added:
- params["Prompt"] += "\n" + "".join(added)
-
-
-available_loras = {}
-available_lora_aliases = {}
-available_lora_hash_lookup = {}
-forbidden_lora_aliases = {}
-loaded_loras = []
-
-list_available_loras()