# path: root/modules/deepbooru.py
# blob: ebdba5e08ded01ebebcb2cc8b3421e718ff8aab0 (plain)
import os.path
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time


def get_deepbooru_tags(pil_image, threshold=0.5):
    """
    Tag a single image and return the resulting tag string.

    This method is for running only one image at a time for simple use (the img2img interrogate).
    It starts a deepbooru process, queues the image, polls the shared return dictionary until the
    -1 placeholder has been replaced, and then stops the process again.

    Args:
        pil_image: the image to put on the deepbooru process queue.
        threshold: value forwarded to create_deepbooru_process().

    Returns:
        Whatever the deepbooru process stored under the "value" key of the shared return dictionary.

    Raises:
        RuntimeError: if the deepbooru process exits before storing a result.
    """
    from modules import shared  # prevents circular reference
    create_deepbooru_process(threshold)
    try:
        shared.deepbooru_process_return["value"] = -1
        shared.deepbooru_process_queue.put(pil_image)
        while shared.deepbooru_process_return["value"] == -1:
            # A process that died without answering will never replace the placeholder;
            # fail loudly instead of polling forever.
            if not shared.deepbooru_process.is_alive():
                raise RuntimeError("deepbooru process exited before returning any tags")
            time.sleep(0.2)
        # The result has to be read here: release_process() resets
        # shared.deepbooru_process_return to None.
        return shared.deepbooru_process_return["value"]
    finally:
        # Always stop the process, also when something above raised.
        release_process()


def deepbooru_process(queue, deepbooru_process_return, threshold):
    """
    Body of the deepbooru process: load the model once, then tag every image taken from the queue.

    Each result is written to deepbooru_process_return["value"].  The loop ends as soon as the
    item taken from the queue compares equal to "QUIT".
    """
    model, tags = get_deepbooru_tags_model()
    while True:
        item = queue.get()  # blocks until something is queued
        if item == "QUIT":
            return
        deepbooru_process_return["value"] = get_deepbooru_tags_from_model(model, tags, item, threshold)


def create_deepbooru_process(threshold=0.5):
    """
    Start the deepbooru process and publish its communication objects on modules.shared.

    Images are handed to the process through a manager queue, so several images can be tagged
    one after another without reloading the model or spawning a new process.  Results come back
    through a manager dictionary: its "value" entry starts out as -1, and whoever queues an image
    is expected to wait until that entry has been replaced by the tags.
    """
    from modules import shared  # prevents circular reference

    manager = shared.deepbooru_process_manager = multiprocessing.Manager()
    image_queue = shared.deepbooru_process_queue = manager.Queue()
    result_holder = shared.deepbooru_process_return = manager.dict()
    result_holder["value"] = -1  # placeholder meaning "no tags yet"

    worker = shared.deepbooru_process = multiprocessing.Process(
        target=deepbooru_process,
        args=(image_queue, result_holder, threshold),
    )
    worker.start()


def release_process():
    """
    Shut the deepbooru process down so the memory it uses is returned.

    Queues "QUIT", waits for the process to finish, then resets the four deepbooru attributes on
    modules.shared to None.
    """
    from modules import shared  # prevents circular reference

    shared.deepbooru_process_queue.put("QUIT")
    shared.deepbooru_process.join()

    for attribute in (
        "deepbooru_process_queue",
        "deepbooru_process",
        "deepbooru_process_return",
        "deepbooru_process_manager",
    ):
        setattr(shared, attribute, None)

def get_deepbooru_tags_model():
    """
    Load the DeepDanbooru project from the ``models/deepbooru`` folder, fetching it first if needed.

    The folder is resolved relative to this file (``../models/deepbooru``).  When it contains no
    ``project.json``, the release archive is downloaded into it, extracted there and then deleted.

    Returns:
        tuple: ``(model, tags)`` as returned by ``dd.project.load_model_from_project`` and
        ``dd.project.load_tags_from_project``.
    """
    import deepdanbooru as dd

    this_folder = os.path.dirname(__file__)
    model_path = os.path.abspath(os.path.join(this_folder, '..', 'models', 'deepbooru'))
    if not os.path.exists(os.path.join(model_path, 'project.json')):
        # there is no point importing these every time
        import zipfile
        from basicsr.utils.download_util import load_file_from_url

        archive_name = "deepdanbooru-v3-20211112-sgd-e28.zip"
        archive_path = os.path.join(model_path, archive_name)
        load_file_from_url(
            r"https://github.com/KichangKim/DeepDanbooru/releases/download/v3-20211112-sgd-e28/" + archive_name,
            model_path)
        try:
            with zipfile.ZipFile(archive_path, "r") as zip_ref:
                zip_ref.extractall(model_path)
        finally:
            # Delete the archive even when extraction fails, so a bad archive is not
            # left behind in the model folder.
            os.remove(archive_path)

    tags = dd.project.load_tags_from_project(model_path)
    model = dd.project.load_model_from_project(
        model_path, compile_model=True
    )
    return model, tags


def get_deepbooru_tags_from_model(model, tags, pil_image, threshold=0.5):
    """
    Run the model on one image and return the tags whose score reaches the threshold.

    The image is resized to the model's input size (aspect ratio preserved), padded by
    ``dd.image.transform_and_pad_image``, scaled by 1/255 and passed to ``model.predict`` as a
    batch of one.  The selected tags are also printed with their scores, highest score first.

    Args:
        model: object exposing ``input_shape`` and ``predict``.
        tags: tag names; ``tags[i]`` is paired with element ``i`` of the prediction.
        pil_image: PIL image to tag; it is converted to RGB before use.
        threshold: minimum score (inclusive) for a tag to be reported.

    Returns:
        str: the selected tags in ``tags`` order, joined with ", ", with "_" and ":" replaced by
        spaces.  Tags starting with "rating:" are never included.
    """
    import deepdanbooru as dd
    import tensorflow as tf
    import numpy as np

    width = model.input_shape[2]
    height = model.input_shape[1]

    # Normalise the mode first: RGBA / L / P images would not yield a three-channel array.
    # NOTE(review): assumes the model takes three-channel input — confirm against the project.
    image = np.array(pil_image.convert("RGB"))
    image = tf.image.resize(
        image,
        size=(height, width),
        method=tf.image.ResizeMethod.AREA,
        preserve_aspect_ratio=True,
    )
    image = image.numpy()  # EagerTensor to np.array
    image = dd.image.transform_and_pad_image(image, width, height)
    image = image / 255.0
    image_shape = image.shape
    image = image.reshape((1, image_shape[0], image_shape[1], image_shape[2]))

    y = model.predict(image)[0]

    # (score, tag) pairs in ``tags`` order, without rating pseudo-tags.
    selected = [
        (y[i], tag)
        for i, tag in enumerate(tags)
        if y[i] >= threshold and not tag.startswith("rating:")
    ]

    # Sort on the numeric score; sorting the formatted strings would mis-order values
    # such as "1e-05" and "0.9".
    by_score = sorted(selected, key=lambda pair: pair[0], reverse=True)
    print('\n'.join(f'{score} {tag}' for score, tag in by_score))

    return ', '.join(tag for _, tag in selected).replace('_', ' ').replace(':', ' ')