aboutsummaryrefslogtreecommitdiff
path: root/util.py
blob: 9fca80c745d7280bc56559218e3cdd3cad66ded8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import subprocess
import logging
import cv2
import platform
import readline
import os
import numpy as np
from queue import Queue
from threading import Thread, Lock

def image_resize(image, width = None, height = None, inter = cv2.INTER_AREA):
    # initialize the dimensions of the image to be resized and
    # grab the image size
    dim = None
    (h, w) = image.shape[:2]

    # if both the width and height are None, then return the
    # original image
    if width is None and height is None:
        return image

    # check to see if the width is None
    if width is None:
        # calculate the ratio of the height and construct the
        # dimensions
        r = height / float(h)
        dim = (int(w * r), height)

    # otherwise, the height is None
    else:
        # calculate the ratio of the width and construct the
        # dimensions
        r = width / float(w)
        dim = (width, int(h * r))

    # resize the image
    resized = cv2.resize(image, dim, interpolation = inter)

    # return the resized image
    return resized

def image_embed(img, dimensions):
    ret = np.zeros((dimensions[0], dimensions[1], 3), np.uint8)
    ret[0:img.shape[0], 0:img.shape[1]] = img
    return ret

'''
Fetch input prompt with prefilled text.

Parameters:
prompt: Prompt message.
text: Prefilled input.
'''
def input_with_prefill(prompt, text):
    def hook():
        readline.insert_text(text)
        readline.redisplay()
    readline.set_pre_input_hook(hook)
    result = input(prompt)
    readline.set_pre_input_hook()
    return result

'''
Checks if the given string is a valid path.

Parameters:
string: String to be checked.
'''
def dir_path(string):
    if os.path.isdir(string):
        return string
    else:
        raise NotADirectoryError(string)

'''
Opens the given file with the platform default handler.

Parameters:
file: Path to the file.
'''
def open_system(file):
    if platform.system() == 'Darwin':       # macOS
        subprocess.call(('open', file))
    elif platform.system() == 'Windows':    # Windows
        os.startfile(file)
    else:                                   # linux variants
        subprocess.call(('xdg-open', file))

class Worker(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.lock = Lock()
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                if func.lower() == "terminate":
                    break
            except:                
                try: 
                    with self.lock:
                        func(*args, **kargs)
                except Exception as exception: 
                    print(exception)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads, num_queue=None):
        if num_queue is None or num_queue < num_threads:
            num_queue = num_threads
        self.tasks = Queue(num_queue)
        self.threads = num_threads
        for _ in range(num_threads): Worker(self.tasks)
    
    # This function can be called to terminate all the worker threads of the queue
    def terminate(self):
        self.wait_completion()
        for _ in range(self.threads): self.add_task("terminate")
        return None

    # This function can be called to add new work to the queue
    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    # This function can be called to wait till all the workers are done processing the pending works. If this function is called, the main will not process any new lines unless all the workers are done with the pending works.
    def wait_completion(self):
        self.tasks.join()
    
    # This function can be called to check if there are any pending/running works in the queue. If there are any works pending, the call will return Boolean True or else it will return Boolean False    
    def is_alive(self):
        if self.tasks.unfinished_tasks == 0:
            return False
        else:
            return True