import logging
import os

import cv2
import numpy as np

from util import *  # provides ThreadPool and image_resize


class Predictor(object):
    """Thin wrapper that delegates image classification to a configurable backend."""

    class Backend(object):
        """Interface every prediction backend has to implement."""

        def __init__(self):
            raise NotImplementedError()

        def predict(self, img, top=10):
            raise NotImplementedError()

    class BackendTensorflow(Backend):
        """ResNet50 (ImageNet) backend based on Tensorflow/Keras.

        Optionally scans an enlarged copy of the image in overlapping,
        rotated tiles to pick up small details.
        """

        MODEL_DIMENSIONS = 224  # input size expected by ResNet50

        def __init__(self, top=10, detail=True, detail_factor=4):
            logger = logging.getLogger(__name__)
            logger.debug("Initializing Tensorflow/Keras backend ...")
            from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input, decode_predictions
            from tensorflow.keras.preprocessing import image
            from tensorflow.keras.models import Model
            self.__model = ResNet50(weights="imagenet")
            self.__top = top
            self.__detail = detail
            self.__detail_factor = detail_factor

        def __predict(self, img):
            # Classify a single 224x224 image and return (name, probability) pairs.
            logger = logging.getLogger(__name__)
            logger.debug("Predicting image part ...")
            from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
            array = np.expand_dims(img, axis=0)
            array = preprocess_input(array)
            predictions = self.__model.predict(array)
            classes = decode_predictions(predictions, top=self.__top)
            logger.debug("Predicted raw image classes: {}".format(classes[0]))
            return set([(name, prob) for _, name, prob in classes[0]])

        def __predict_partial(self, tags, img, x, y, rot):
            # Classify one (optionally rotated) tile and merge its results into `tags`.
            logger = logging.getLogger(__name__)
            logger.debug("Predicting detail image at x={}, y={}, rot={}".format(x, y, rot))
            if rot is None:
                tmp = img[x:(x + self.MODEL_DIMENSIONS), y:(y + self.MODEL_DIMENSIONS)]
            else:
                tmp = cv2.rotate(img[x:(x + self.MODEL_DIMENSIONS), y:(y + self.MODEL_DIMENSIONS)], rot)
            tags.update(self.__predict(tmp))

        def predict(self, img):
            logger = logging.getLogger(__name__)
            logger.debug("Predicting raw image ...")
            # Classify the whole image first, scaled down to the model input size.
            ret = self.__predict(cv2.resize(img.copy(), dsize=(self.MODEL_DIMENSIONS, self.MODEL_DIMENSIONS), interpolation=cv2.INTER_AREA))
            if self.__detail:
                logger.debug("Predicting detail image ...")
                # Scan an enlarged copy in tiles that overlap by half the model size,
                # classifying each tile in four orientations on a worker pool.
                pool = ThreadPool(max(1, os.cpu_count() - 2), 10000)
                if img.shape[0] > img.shape[1]:
                    detail = image_resize(img.copy(), height=(self.__detail_factor * self.MODEL_DIMENSIONS))
                else:
                    detail = image_resize(img.copy(), width=(self.__detail_factor * self.MODEL_DIMENSIONS))
                for x in range(0, detail.shape[0], int(self.MODEL_DIMENSIONS / 2)):
                    for y in range(0, detail.shape[1], int(self.MODEL_DIMENSIONS / 2)):
                        pool.add_task(self.__predict_partial, ret, detail, x, y, None)
                        pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_90_CLOCKWISE)
                        pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_180)
                        pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_90_COUNTERCLOCKWISE)
                pool.wait_completion()
            # Keep only the `top` most probable unique class names.
            ret = [tag[0] for tag in sorted(ret, key=lambda tag: tag[1], reverse=True)]
            ret = set(list(dict.fromkeys(ret))[0:self.__top])
            return ret

    class BackendTorch(Backend):
        """ResNet50 (ImageNet) backend based on PyTorch/torchvision."""

        def __init__(self, top=10):
            logger = logging.getLogger(__name__)
            logger.debug("Initializing Torch backend ...")
            import torch
            from torchvision.models import resnet50, ResNet50_Weights
            self.__weights = ResNet50_Weights.DEFAULT
            self.__model = resnet50(weights=self.__weights)
            self.__model.eval()
            self.__preprocess = self.__weights.transforms()
            self.__top = top

        def predict(self, img):
            import torch
            from PIL import Image
            batch = self.__preprocess(Image.fromarray(img)).unsqueeze(0)
            prediction = self.__model(batch).squeeze(0).softmax(0)
            classes = torch.topk(prediction.flatten(), self.__top).indices
            #return set([(self.__weights.meta["categories"][clazz], prediction[clazz].item()) for clazz in classes])
            return set([self.__weights.meta["categories"][clazz] for clazz in classes])

    def __init__(self, backend):
        self.__backend = backend

    def predict(self, img):
        return self.__backend.predict(img)
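

# ---------------------------------------------------------------------------
# Illustrative usage sketch (an assumption, not part of the original module):
# it presumes an image file called "example.jpg" sits next to this script and
# that the backends expect an RGB array, which is why the BGR output of
# cv2.imread() is converted below.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    img = cv2.cvtColor(cv2.imread("example.jpg"), cv2.COLOR_BGR2RGB)

    predictor = Predictor(Predictor.BackendTorch(top=10))
    print(predictor.predict(img))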