diff --git a/src/compare.py b/src/compare.py index e603a4e..da90195 100644 --- a/src/compare.py +++ b/src/compare.py @@ -303,7 +303,7 @@ while True: lowest_certainty = match # Check if a match that's confident enough - if 0 < match < video_certainty or True: + if 0 < match < video_certainty: timings["tt"] = time.time() - timings["st"] timings["fl"] = time.time() - timings["fr"] diff --git a/src/rubberstamps/__init__.py b/src/rubberstamps/__init__.py index 890e627..1c85297 100644 --- a/src/rubberstamps/__init__.py +++ b/src/rubberstamps/__init__.py @@ -8,10 +8,13 @@ from importlib.machinery import SourceFileLoader class RubberStamp: + """Howdy rubber stamp""" + UI_TEXT = "ui_text" UI_SUBTEXT = "ui_subtext" def set_ui_text(self, text, type=None): + """Convert an ui string to input howdy-gtk understands""" typedec = "M" if type == self.UI_SUBTEXT: @@ -20,12 +23,16 @@ class RubberStamp: return self.send_ui_raw(typedec + "=" + text) def send_ui_raw(self, command): + """Write raw command to howdy-gtk stdin""" if self.config.getboolean("debug", "verbose_stamps", fallback=False): print("Sending command to howdy-gtk: " + command) + # Add a newline because the ui reads per line command += " \n" + # If we're connected to the ui if self.gtk_proc: + # Send the command as bytes self.gtk_proc.stdin.write(bytearray(command.encode("utf-8"))) self.gtk_proc.stdin.flush() @@ -39,62 +46,77 @@ def execute(config, gtk_proc, opencv): dir_path = os.path.dirname(os.path.realpath(__file__)) installed_stamps = [] + # Go through each file in the rubberstamp folder for filename in os.listdir(dir_path): + # Remove non-readable file or directories if not os.path.isfile(dir_path + "/" + filename): continue + # Remove meta files if filename in ["__init__.py", ".gitignore"]: continue + # Add the found file to the list of enabled rubberstamps installed_stamps.append(filename.split(".")[0]) if verbose: print("Installed rubberstamps: " + ", ".join(installed_stamps)) + # Get the rules defined in the config raw_rules = config.get("rubberstamps", "stamp_rules") rules = raw_rules.split("\n") + # Go through the rules one by one for rule in rules: rule = rule.strip() if len(rule) <= 1: continue + # Parse the rule with regex regex_result = re.search("^(\w+)\s+([\w\.]+)\s+([a-z]+)(.*)?$", rule, re.IGNORECASE) + # Error out if the regex did not match (invalid line) if not regex_result: print(_("Error parsing rubberstamp rule: {}").format(rule)) continue type = regex_result.group(1) + # Error out if the stamp name in the rule is not a file if type not in installed_stamps: print(_("Stamp not installed: {}").format(type)) continue + # Load the module from file module = SourceFileLoader(type, dir_path + "/" + type + ".py").load_module() + # Try to get the class with the same name try: constructor = getattr(module, type) except AttributeError: print(_("Stamp error: Class {} not found").format(type)) continue + # Init the class and set common values instance = constructor() instance.verbose = verbose instance.config = config instance.gtk_proc = gtk_proc instance.opencv = opencv + # Set some opensv shorthands instance.video_capture = opencv["video_capture"] instance.face_detector = opencv["face_detector"] instance.pose_predictor = opencv["pose_predictor"] instance.clahe = opencv["clahe"] + # Parse and set the 2 required options for all rubberstamps instance.options = { "timeout": float(re.sub("[a-zA-Z]", "", regex_result.group(2))), "failsafe": regex_result.group(3) != "faildeadly" } + # Try to get the class do declare its other config variables try: instance.declare_config() except Exception: @@ -104,15 +126,20 @@ def execute(config, gtk_proc, opencv): traceback.print_exc() continue + # Split the optional arguments at the end of the rule by spaces raw_options = regex_result.group(4).split() + # For each of those aoptional arguments for option in raw_options: + # Get the key to the left, and the value to the right of the equal sign key, value = option.split("=") + # Error out if a key has been set that was not declared by the module before if key not in instance.options: print("Unknow config option for rubberstamp " + type + ": " + key) continue + # Convert the argument string to an int or float if the declared option has that type if isinstance(instance.options[key], int): value = int(value) elif isinstance(instance.options[key], float): @@ -125,8 +152,10 @@ def execute(config, gtk_proc, opencv): print(instance.options) print("Executing stamp") + # Make the stamp fail by default result = False + # Run the stamp code try: result = instance.run() except Exception: @@ -138,6 +167,7 @@ def execute(config, gtk_proc, opencv): if verbose: print("Stamp \"" + type + "\" returned: " + str(result)) + # Abort authentication if the stamp returned false if result is False: if verbose: print("Authentication aborted by rubber stamp") sys.exit(14) diff --git a/src/rubberstamps/hotkey.py b/src/rubberstamps/hotkey.py index f4d3d34..309aff9 100644 --- a/src/rubberstamps/hotkey.py +++ b/src/rubberstamps/hotkey.py @@ -2,6 +2,8 @@ import time import sys from i18n import _ + +# Import the root rubberstamp class from rubberstamps import RubberStamp @@ -9,16 +11,20 @@ class hotkey(RubberStamp): pressed_key = "none" def declare_config(self): + """Set the default values for the optional arguments""" self.options["abort_key"] = "esc" self.options["confirm_key"] = "enter" def run(self): + """Wait for the user to press a hotkey""" time_left = self.options["timeout"] time_string = _("Aborting authorisation in {}") if self.options["failsafe"] else _("Authorising in {}") + # Set the ui to default strings self.set_ui_text(time_string.format(int(time_left)), self.UI_TEXT) - self.set_ui_text(_("Hold {abort_key} to abort, hold {confirm_key} to authorise").format(abort_key=self.options["abort_key"], confirm_key=self.options["confirm_key"]), self.UI_SUBTEXT) + self.set_ui_text(_("Press {abort_key} to abort, {confirm_key} to authorise").format(abort_key=self.options["abort_key"], confirm_key=self.options["confirm_key"]), self.UI_SUBTEXT) + # Try to import the keyboard module and tell the user to install the module if that fails try: import keyboard except Exception: @@ -27,26 +33,37 @@ class hotkey(RubberStamp): print("\t pip3 install keyboard") sys.exit(1) + # Register hotkeys with the kernel keyboard.add_hotkey(self.options["abort_key"], self.on_key, args=["abort"]) keyboard.add_hotkey(self.options["confirm_key"], self.on_key, args=["confirm"]) + # While we have not hit our timeout yet while time_left > 0: + # Remove 0.1 seconds from the timer, as that's how long we sleep time_left -= 0.1 + # Update the ui with the new time self.set_ui_text(time_string.format(str(int(time_left) + 1)), self.UI_TEXT) + # If the abort key was pressed while the loop was sleeping if self.pressed_key == "abort": + # Set the ui to confirm the abort self.set_ui_text(_("Authentication aborted"), self.UI_TEXT) self.set_ui_text("", self.UI_SUBTEXT) + # Exit time.sleep(1) return False + # If confirm has pressed, return that auth can continue elif self.pressed_key == "confirm": return True + # If no key has been pressed, wait for a bit and check again time.sleep(0.1) + # When our timeout hits, either abort or continue based on failsafe of faildeadly return not self.options["failsafe"] def on_key(self, type): + """Called when the user presses a key""" self.pressed_key = type diff --git a/src/rubberstamps/nod.py b/src/rubberstamps/nod.py index 21e48e8..ac7ac6f 100644 --- a/src/rubberstamps/nod.py +++ b/src/rubberstamps/nod.py @@ -1,69 +1,98 @@ import time from i18n import _ + +# Import the root rubberstamp class from rubberstamps import RubberStamp class nod(RubberStamp): def declare_config(self): + """Set the default values for the optional arguments""" self.options["min_distance"] = 6 self.options["min_directions"] = 2 def run(self): + """Track a users nose to see if they nod yes or no""" self.set_ui_text(_("Nod to confirm"), self.UI_TEXT) self.set_ui_text(_("Shake your head to abort"), self.UI_SUBTEXT) + # Stores relative distance between the 2 eyes in the last frame + # Used to calculate the distance of the nose traveled in relation to face size in the frame last_reldist = -1 + # Last point the nose was at last_nosepoint = {"x": -1, "y": -1} + # Contans booleans recording successful nods and their directions recorded_nods = {"x": [], "y": []} starttime = time.time() - while True: - if time.time() > starttime + self.options["timeout"]: - return not self.options["failsafe"] - + # Keep running the loop while we have not hit timeout yet + while time.time() < starttime + self.options["timeout"]: + # Read a frame from the camera ret, frame = self.video_capture.read_frame() + # Apply CLAHE to get a better picture frame = self.clahe.apply(frame) + # Detect all faces in the frame face_locations = self.face_detector(frame, 1) + # Only continue if exacty 1 face is visible in the frame if len(face_locations) != 1: continue + # Get the position of the eyes and tip of the nose face_landmarks = self.pose_predictor(frame, face_locations[0]) + # Calculate the relative distance between the 2 eyes reldist = face_landmarks.part(0).x - face_landmarks.part(2).x + # Avarage this out with the distance found in the last frame to smooth it out avg_reldist = (last_reldist + reldist) / 2 + # Calulate horizontal movement (shaking head) and vertical movement (nodding) for axis in ["x", "y"]: + # Get the location of the nose on the active axis nosepoint = getattr(face_landmarks.part(4), axis) + # If this is the first frame set the previous values to the current ones if last_nosepoint[axis] == -1: last_nosepoint[axis] = nosepoint last_reldist = reldist mindist = self.options["min_distance"] - movement = (nosepoint - last_nosepoint[axis]) * 100 / avg_reldist + # Get the relative movement by taking the distance traveled and deviding it by eye distance + movement = (nosepoint - last_nosepoint[axis]) * 100 / max(avg_reldist, 1) + # If the movement is over the minimal distance threshold if movement < -mindist or movement > mindist: + # If this is the first recorded nod, add it to the array if len(recorded_nods[axis]) == 0: recorded_nods[axis].append(movement < 0) + # Otherwise, only add this nod if the previous nod with in the other direction elif recorded_nods[axis][-1] != (movement < 0): recorded_nods[axis].append(movement < 0) + # Check if we have nodded enough on this axis if len(recorded_nods[axis]) >= self.options["min_directions"]: + # If nodded yes, show confirmation in ui if (axis == "y"): self.set_ui_text(_("Confirmed authentication"), self.UI_TEXT) + # If shaken no, show abort message else: self.set_ui_text(_("Aborted authentication"), self.UI_TEXT) + # Remove subtext self.set_ui_text("", self.UI_SUBTEXT) + # Return true for nodding yes and false for shaking no time.sleep(0.8) return axis == "y" + # Save the relative distance and the nosepoint for next loop last_reldist = reldist last_nosepoint[axis] = nosepoint + + # We've fallen out of the loop, so timeout has been hit + return not self.options["failsafe"]