aboutsummaryrefslogtreecommitdiff
path: root/modules/deepbooru.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/deepbooru.py')
-rw-r--r--modules/deepbooru.py259
1 files changed, 93 insertions, 166 deletions
diff --git a/modules/deepbooru.py b/modules/deepbooru.py
index 8914662d..122fce7f 100644
--- a/modules/deepbooru.py
+++ b/modules/deepbooru.py
@@ -1,172 +1,99 @@
-import os.path
-from concurrent.futures import ProcessPoolExecutor
-import multiprocessing
-import time
+import os
import re
+import torch
+from PIL import Image
+import numpy as np
+
+from modules import modelloader, paths, deepbooru_model, devices, images, shared
+
re_special = re.compile(r'([\\()])')
-def get_deepbooru_tags(pil_image):
- """
- This method is for running only one image at a time for simple use. Used to the img2img interrogate.
- """
- from modules import shared # prevents circular reference
-
- try:
- create_deepbooru_process(shared.opts.interrogate_deepbooru_score_threshold, create_deepbooru_opts())
- return get_tags_from_process(pil_image)
- finally:
- release_process()
-
-
-OPT_INCLUDE_RANKS = "include_ranks"
-def create_deepbooru_opts():
- from modules import shared
-
- return {
- "use_spaces": shared.opts.deepbooru_use_spaces,
- "use_escape": shared.opts.deepbooru_escape,
- "alpha_sort": shared.opts.deepbooru_sort_alpha,
- OPT_INCLUDE_RANKS: shared.opts.interrogate_return_ranks,
- }
-
-
-def deepbooru_process(queue, deepbooru_process_return, threshold, deepbooru_opts):
- model, tags = get_deepbooru_tags_model()
- while True: # while process is running, keep monitoring queue for new image
- pil_image = queue.get()
- if pil_image == "QUIT":
- break
- else:
- deepbooru_process_return["value"] = get_deepbooru_tags_from_model(model, tags, pil_image, threshold, deepbooru_opts)
-
-
-def create_deepbooru_process(threshold, deepbooru_opts):
- """
- Creates deepbooru process. A queue is created to send images into the process. This enables multiple images
- to be processed in a row without reloading the model or creating a new process. To return the data, a shared
- dictionary is created to hold the tags created. To wait for tags to be returned, a value of -1 is assigned
- to the dictionary and the method adding the image to the queue should wait for this value to be updated with
- the tags.
- """
- from modules import shared # prevents circular reference
- shared.deepbooru_process_manager = multiprocessing.Manager()
- shared.deepbooru_process_queue = shared.deepbooru_process_manager.Queue()
- shared.deepbooru_process_return = shared.deepbooru_process_manager.dict()
- shared.deepbooru_process_return["value"] = -1
- shared.deepbooru_process = multiprocessing.Process(target=deepbooru_process, args=(shared.deepbooru_process_queue, shared.deepbooru_process_return, threshold, deepbooru_opts))
- shared.deepbooru_process.start()
-
-
-def get_tags_from_process(image):
- from modules import shared
-
- shared.deepbooru_process_return["value"] = -1
- shared.deepbooru_process_queue.put(image)
- while shared.deepbooru_process_return["value"] == -1:
- time.sleep(0.2)
- caption = shared.deepbooru_process_return["value"]
- shared.deepbooru_process_return["value"] = -1
-
- return caption
-
-
-def release_process():
- """
- Stops the deepbooru process to return used memory
- """
- from modules import shared # prevents circular reference
- shared.deepbooru_process_queue.put("QUIT")
- shared.deepbooru_process.join()
- shared.deepbooru_process_queue = None
- shared.deepbooru_process = None
- shared.deepbooru_process_return = None
- shared.deepbooru_process_manager = None
-
-def get_deepbooru_tags_model():
- import deepdanbooru as dd
- import tensorflow as tf
- import numpy as np
- this_folder = os.path.dirname(__file__)
- model_path = os.path.abspath(os.path.join(this_folder, '..', 'models', 'deepbooru'))
- if not os.path.exists(os.path.join(model_path, 'project.json')):
- # there is no point importing these every time
- import zipfile
- from basicsr.utils.download_util import load_file_from_url
- load_file_from_url(
- r"https://github.com/KichangKim/DeepDanbooru/releases/download/v3-20211112-sgd-e28/deepdanbooru-v3-20211112-sgd-e28.zip",
- model_path)
- with zipfile.ZipFile(os.path.join(model_path, "deepdanbooru-v3-20211112-sgd-e28.zip"), "r") as zip_ref:
- zip_ref.extractall(model_path)
- os.remove(os.path.join(model_path, "deepdanbooru-v3-20211112-sgd-e28.zip"))
-
- tags = dd.project.load_tags_from_project(model_path)
- model = dd.project.load_model_from_project(
- model_path, compile_model=False
- )
- return model, tags
-
-
-def get_deepbooru_tags_from_model(model, tags, pil_image, threshold, deepbooru_opts):
- import deepdanbooru as dd
- import tensorflow as tf
- import numpy as np
-
- alpha_sort = deepbooru_opts['alpha_sort']
- use_spaces = deepbooru_opts['use_spaces']
- use_escape = deepbooru_opts['use_escape']
- include_ranks = deepbooru_opts['include_ranks']
-
- width = model.input_shape[2]
- height = model.input_shape[1]
- image = np.array(pil_image)
- image = tf.image.resize(
- image,
- size=(height, width),
- method=tf.image.ResizeMethod.AREA,
- preserve_aspect_ratio=True,
- )
- image = image.numpy() # EagerTensor to np.array
- image = dd.image.transform_and_pad_image(image, width, height)
- image = image / 255.0
- image_shape = image.shape
- image = image.reshape((1, image_shape[0], image_shape[1], image_shape[2]))
-
- y = model.predict(image)[0]
-
- result_dict = {}
-
- for i, tag in enumerate(tags):
- result_dict[tag] = y[i]
-
- unsorted_tags_in_theshold = []
- result_tags_print = []
- for tag in tags:
- if result_dict[tag] >= threshold:
+
+class DeepDanbooru:
+ def __init__(self):
+ self.model = None
+
+ def load(self):
+ if self.model is not None:
+ return
+
+ files = modelloader.load_models(
+ model_path=os.path.join(paths.models_path, "torch_deepdanbooru"),
+ model_url='https://github.com/AUTOMATIC1111/TorchDeepDanbooru/releases/download/v1/model-resnet_custom_v3.pt',
+ ext_filter=[".pt"],
+ download_name='model-resnet_custom_v3.pt',
+ )
+
+ self.model = deepbooru_model.DeepDanbooruModel()
+ self.model.load_state_dict(torch.load(files[0], map_location="cpu"))
+
+ self.model.eval()
+ self.model.to(devices.cpu, devices.dtype)
+
+ def start(self):
+ self.load()
+ self.model.to(devices.device)
+
+ def stop(self):
+ if not shared.opts.interrogate_keep_models_in_memory:
+ self.model.to(devices.cpu)
+ devices.torch_gc()
+
+ def tag(self, pil_image):
+ self.start()
+ res = self.tag_multi(pil_image)
+ self.stop()
+
+ return res
+
+ def tag_multi(self, pil_image, force_disable_ranks=False):
+ threshold = shared.opts.interrogate_deepbooru_score_threshold
+ use_spaces = shared.opts.deepbooru_use_spaces
+ use_escape = shared.opts.deepbooru_escape
+ alpha_sort = shared.opts.deepbooru_sort_alpha
+ include_ranks = shared.opts.interrogate_return_ranks and not force_disable_ranks
+
+ pic = images.resize_image(2, pil_image.convert("RGB"), 512, 512)
+ a = np.expand_dims(np.array(pic, dtype=np.float32), 0) / 255
+
+ with torch.no_grad(), devices.autocast():
+ x = torch.from_numpy(a).to(devices.device)
+ y = self.model(x)[0].detach().cpu().numpy()
+
+ probability_dict = {}
+
+ for tag, probability in zip(self.model.tags, y):
+ if probability < threshold:
+ continue
+
if tag.startswith("rating:"):
continue
- unsorted_tags_in_theshold.append((result_dict[tag], tag))
- result_tags_print.append(f'{result_dict[tag]} {tag}')
-
- # sort tags
- result_tags_out = []
- sort_ndx = 0
- if alpha_sort:
- sort_ndx = 1
-
- # sort by reverse by likelihood and normal for alpha, and format tag text as requested
- unsorted_tags_in_theshold.sort(key=lambda y: y[sort_ndx], reverse=(not alpha_sort))
- for weight, tag in unsorted_tags_in_theshold:
- tag_outformat = tag
- if use_spaces:
- tag_outformat = tag_outformat.replace('_', ' ')
- if use_escape:
- tag_outformat = re.sub(re_special, r'\\\1', tag_outformat)
- if include_ranks:
- tag_outformat = f"({tag_outformat}:{weight:.3f})"
-
- result_tags_out.append(tag_outformat)
-
- print('\n'.join(sorted(result_tags_print, reverse=True)))
-
- return ', '.join(result_tags_out)
+
+ probability_dict[tag] = probability
+
+ if alpha_sort:
+ tags = sorted(probability_dict)
+ else:
+ tags = [tag for tag, _ in sorted(probability_dict.items(), key=lambda x: -x[1])]
+
+ res = []
+
+ filtertags = set([x.strip().replace(' ', '_') for x in shared.opts.deepbooru_filter_tags.split(",")])
+
+ for tag in [x for x in tags if x not in filtertags]:
+ probability = probability_dict[tag]
+ tag_outformat = tag
+ if use_spaces:
+ tag_outformat = tag_outformat.replace('_', ' ')
+ if use_escape:
+ tag_outformat = re.sub(re_special, r'\\\1', tag_outformat)
+ if include_ranks:
+ tag_outformat = f"({tag_outformat}:{probability:.3f})"
+
+ res.append(tag_outformat)
+
+ return ", ".join(res)
+
+
+model = DeepDanbooru()