import subprocess import logging import cv2 import platform import readline import os import numpy as np from queue import Queue from threading import Thread, Lock def image_resize(image, width = None, height = None, inter = cv2.INTER_AREA): # initialize the dimensions of the image to be resized and # grab the image size dim = None (h, w) = image.shape[:2] # if both the width and height are None, then return the # original image if width is None and height is None: return image # check to see if the width is None if width is None: # calculate the ratio of the height and construct the # dimensions r = height / float(h) dim = (int(w * r), height) # otherwise, the height is None else: # calculate the ratio of the width and construct the # dimensions r = width / float(w) dim = (width, int(h * r)) # resize the image resized = cv2.resize(image, dim, interpolation = inter) # return the resized image return resized def image_embed(img, dimensions): ret = np.zeros((dimensions[0], dimensions[1], 3), np.uint8) ret[0:img.shape[0], 0:img.shape[1]] = img return ret ''' Fetch input prompt with prefilled text. Parameters: prompt: Prompt message. text: Prefilled input. ''' def input_with_prefill(prompt, text): def hook(): readline.insert_text(text) readline.redisplay() readline.set_pre_input_hook(hook) result = input(prompt) readline.set_pre_input_hook() return result ''' Checks if the given string is a valid path. Parameters: string: String to be checked. ''' def dir_path(string): if os.path.isdir(string): return string else: raise NotADirectoryError(string) ''' Opens the given file with the platform default handler. Parameters: file: Path to the file. ''' def open_system(file): if platform.system() == 'Darwin': # macOS subprocess.call(('open', file)) elif platform.system() == 'Windows': # Windows os.startfile(file) else: # linux variants subprocess.call(('xdg-open', file)) class Worker(Thread): def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.lock = Lock() self.start() def run(self): while True: func, args, kargs = self.tasks.get() try: if func.lower() == "terminate": break except: try: with self.lock: func(*args, **kargs) except Exception as exception: print(exception) self.tasks.task_done() class ThreadPool: def __init__(self, num_threads, num_queue=None): if num_queue is None or num_queue < num_threads: num_queue = num_threads self.tasks = Queue(num_queue) self.threads = num_threads for _ in range(num_threads): Worker(self.tasks) # This function can be called to terminate all the worker threads of the queue def terminate(self): self.wait_completion() for _ in range(self.threads): self.add_task("terminate") return None # This function can be called to add new work to the queue def add_task(self, func, *args, **kargs): self.tasks.put((func, args, kargs)) # This function can be called to wait till all the workers are done processing the pending works. If this function is called, the main will not process any new lines unless all the workers are done with the pending works. def wait_completion(self): self.tasks.join() # This function can be called to check if there are any pending/running works in the queue. If there are any works pending, the call will return Boolean True or else it will return Boolean False def is_alive(self): if self.tasks.unfinished_tasks == 0: return False else: return True