aboutsummaryrefslogtreecommitdiff
path: root/predictor.py
blob: 8a886a7dd2de4caaeaf2aebe0cee88eb6a0f674a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import logging
import os
import cv2
import numpy as np
from util import *

class Predictor(object):

    class Backend(object):

        def __init__(self):
            raise NotImplementedError()

        def predict(self, img, top=10):
            raise NotImplementedError()

    class BackendTensorflow(Backend):

        MODEL_DIMENSIONS = 224

        def __init__(self, top=10, detail=True, detail_factor=4):
            logger = logging.getLogger(__name__)
            logger.debug("Initializing Tensorflow/Keras backend ...")
            from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input, decode_predictions
            from tensorflow.keras.preprocessing import image
            from tensorflow.keras.models import Model
            self.__model = ResNet50(weights="imagenet")
            self.__top = top
            self.__detail = detail
            self.__detail_factor = detail_factor

        def __predict(self, img):
            logger = logging.getLogger(__name__)
            logger.debug("Predicting image part ...")
            from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
            array = np.expand_dims(img, axis=0)
            array = preprocess_input(array)
            predictions = self.__model.predict(array)
            classes = decode_predictions(predictions, top=self.__top)
            logger.debug("Predicted raw image classes: {}".format(classes[0]))
            return set([(name, prob) for _, name, prob in classes[0]])

        def __predict_partial(self, tags, img, x, y, rot):
            logger = logging.getLogger(__name__)
            logger.debug("Predicting detail image at x={}, y={}, rot={}".format(x, y, rot))
            if rot is None:
                tmp = img[x:(x+self.MODEL_DIMENSIONS), y:(y+self.MODEL_DIMENSIONS)]
            else:
                tmp = cv2.rotate(img[x:(x+self.MODEL_DIMENSIONS), y:(y+self.MODEL_DIMENSIONS)], rot)
            tags.update(self.__predict(tmp))

        def predict(self, img):
            logger = logging.getLogger(__name__)
            logger.debug("Predicting raw image ...")
            ret = self.__predict(cv2.resize(img.copy(), dsize=(self.MODEL_DIMENSIONS, self.MODEL_DIMENSIONS), interpolation=cv2.INTER_AREA))

            if self.__detail:
                logger.debug("Predicting detail image ...")
                tmp = set()
                pool = ThreadPool(max(1, os.cpu_count() - 2), 10000)
                if img.shape[0] > img.shape[1]:
                    detail = image_resize(img.copy(), height=(self.__detail_factor * self.MODEL_DIMENSIONS))
                else:
                    detail = image_resize(img.copy(), width=(self.__detail_factor * self.MODEL_DIMENSIONS))
                for x in range(0, detail.shape[0], int(self.MODEL_DIMENSIONS/2)):
                    for y in range(0, detail.shape[1], int(self.MODEL_DIMENSIONS/2)):
                        pool.add_task(self.__predict_partial, ret, detail, x, y, None)
                        pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_90_CLOCKWISE)
                        pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_180)
                        pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_90_COUNTERCLOCKWISE)
                pool.wait_completion()

            ret = [tag[0] for tag in sorted(ret, key=lambda tag: tag[1], reverse=True)]
            ret = set(list(dict.fromkeys(ret))[0:self.__top])
            return ret

    class BackendTorch(Backend):

        def __init__(self, top=10):
            logger = logging.getLogger(__name__)
            logger.debug("Initializing Torch backend ...")
            import torch
            from torchvision.models import resnet50, ResNet50_Weights
            self.__weights = ResNet50_Weights.DEFAULT
            self.__model = resnet50(weights=self.__weights)
            self.__model.eval()
            self.__preprocess = self.__weights.transforms()
            self.__top = top

        def predict(self, img):
            import torch
            from PIL import Image
            batch = self.__preprocess(Image.fromarray(img)).unsqueeze(0)
            prediction = self.__model(batch).squeeze(0).softmax(0)
            classes = torch.topk(prediction.flatten(), self.__top).indices
            #return set([(weights.meta["categories"][clazz], prediction[clazz].item()) for clazz in classes])
            return set([self.__weights.meta["categories"][clazz] for clazz in classes])

    def __init__(self, backend):
        self.__backend = backend

    def predict(self, img):
        return self.__backend.predict(img)