mirror of
https://github.com/boltgolt/howdy.git
synced 2024-09-19 09:51:19 +02:00
Added comments
This commit is contained in:
parent
768f2f402d
commit
f463434309
4 changed files with 83 additions and 7 deletions
|
@ -303,7 +303,7 @@ while True:
|
||||||
lowest_certainty = match
|
lowest_certainty = match
|
||||||
|
|
||||||
# Check if a match that's confident enough
|
# Check if a match that's confident enough
|
||||||
if 0 < match < video_certainty or True:
|
if 0 < match < video_certainty:
|
||||||
timings["tt"] = time.time() - timings["st"]
|
timings["tt"] = time.time() - timings["st"]
|
||||||
timings["fl"] = time.time() - timings["fr"]
|
timings["fl"] = time.time() - timings["fr"]
|
||||||
|
|
||||||
|
|
|
@ -8,10 +8,13 @@ from importlib.machinery import SourceFileLoader
|
||||||
|
|
||||||
|
|
||||||
class RubberStamp:
|
class RubberStamp:
|
||||||
|
"""Howdy rubber stamp"""
|
||||||
|
|
||||||
UI_TEXT = "ui_text"
|
UI_TEXT = "ui_text"
|
||||||
UI_SUBTEXT = "ui_subtext"
|
UI_SUBTEXT = "ui_subtext"
|
||||||
|
|
||||||
def set_ui_text(self, text, type=None):
|
def set_ui_text(self, text, type=None):
|
||||||
|
"""Convert an ui string to input howdy-gtk understands"""
|
||||||
typedec = "M"
|
typedec = "M"
|
||||||
|
|
||||||
if type == self.UI_SUBTEXT:
|
if type == self.UI_SUBTEXT:
|
||||||
|
@ -20,12 +23,16 @@ class RubberStamp:
|
||||||
return self.send_ui_raw(typedec + "=" + text)
|
return self.send_ui_raw(typedec + "=" + text)
|
||||||
|
|
||||||
def send_ui_raw(self, command):
|
def send_ui_raw(self, command):
|
||||||
|
"""Write raw command to howdy-gtk stdin"""
|
||||||
if self.config.getboolean("debug", "verbose_stamps", fallback=False):
|
if self.config.getboolean("debug", "verbose_stamps", fallback=False):
|
||||||
print("Sending command to howdy-gtk: " + command)
|
print("Sending command to howdy-gtk: " + command)
|
||||||
|
|
||||||
|
# Add a newline because the ui reads per line
|
||||||
command += " \n"
|
command += " \n"
|
||||||
|
|
||||||
|
# If we're connected to the ui
|
||||||
if self.gtk_proc:
|
if self.gtk_proc:
|
||||||
|
# Send the command as bytes
|
||||||
self.gtk_proc.stdin.write(bytearray(command.encode("utf-8")))
|
self.gtk_proc.stdin.write(bytearray(command.encode("utf-8")))
|
||||||
self.gtk_proc.stdin.flush()
|
self.gtk_proc.stdin.flush()
|
||||||
|
|
||||||
|
@ -39,62 +46,77 @@ def execute(config, gtk_proc, opencv):
|
||||||
dir_path = os.path.dirname(os.path.realpath(__file__))
|
dir_path = os.path.dirname(os.path.realpath(__file__))
|
||||||
installed_stamps = []
|
installed_stamps = []
|
||||||
|
|
||||||
|
# Go through each file in the rubberstamp folder
|
||||||
for filename in os.listdir(dir_path):
|
for filename in os.listdir(dir_path):
|
||||||
|
# Remove non-readable file or directories
|
||||||
if not os.path.isfile(dir_path + "/" + filename):
|
if not os.path.isfile(dir_path + "/" + filename):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Remove meta files
|
||||||
if filename in ["__init__.py", ".gitignore"]:
|
if filename in ["__init__.py", ".gitignore"]:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Add the found file to the list of enabled rubberstamps
|
||||||
installed_stamps.append(filename.split(".")[0])
|
installed_stamps.append(filename.split(".")[0])
|
||||||
|
|
||||||
if verbose: print("Installed rubberstamps: " + ", ".join(installed_stamps))
|
if verbose: print("Installed rubberstamps: " + ", ".join(installed_stamps))
|
||||||
|
|
||||||
|
# Get the rules defined in the config
|
||||||
raw_rules = config.get("rubberstamps", "stamp_rules")
|
raw_rules = config.get("rubberstamps", "stamp_rules")
|
||||||
rules = raw_rules.split("\n")
|
rules = raw_rules.split("\n")
|
||||||
|
|
||||||
|
# Go through the rules one by one
|
||||||
for rule in rules:
|
for rule in rules:
|
||||||
rule = rule.strip()
|
rule = rule.strip()
|
||||||
|
|
||||||
if len(rule) <= 1:
|
if len(rule) <= 1:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Parse the rule with regex
|
||||||
regex_result = re.search("^(\w+)\s+([\w\.]+)\s+([a-z]+)(.*)?$", rule, re.IGNORECASE)
|
regex_result = re.search("^(\w+)\s+([\w\.]+)\s+([a-z]+)(.*)?$", rule, re.IGNORECASE)
|
||||||
|
|
||||||
|
# Error out if the regex did not match (invalid line)
|
||||||
if not regex_result:
|
if not regex_result:
|
||||||
print(_("Error parsing rubberstamp rule: {}").format(rule))
|
print(_("Error parsing rubberstamp rule: {}").format(rule))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
type = regex_result.group(1)
|
type = regex_result.group(1)
|
||||||
|
|
||||||
|
# Error out if the stamp name in the rule is not a file
|
||||||
if type not in installed_stamps:
|
if type not in installed_stamps:
|
||||||
print(_("Stamp not installed: {}").format(type))
|
print(_("Stamp not installed: {}").format(type))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Load the module from file
|
||||||
module = SourceFileLoader(type, dir_path + "/" + type + ".py").load_module()
|
module = SourceFileLoader(type, dir_path + "/" + type + ".py").load_module()
|
||||||
|
|
||||||
|
# Try to get the class with the same name
|
||||||
try:
|
try:
|
||||||
constructor = getattr(module, type)
|
constructor = getattr(module, type)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
print(_("Stamp error: Class {} not found").format(type))
|
print(_("Stamp error: Class {} not found").format(type))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Init the class and set common values
|
||||||
instance = constructor()
|
instance = constructor()
|
||||||
instance.verbose = verbose
|
instance.verbose = verbose
|
||||||
instance.config = config
|
instance.config = config
|
||||||
instance.gtk_proc = gtk_proc
|
instance.gtk_proc = gtk_proc
|
||||||
instance.opencv = opencv
|
instance.opencv = opencv
|
||||||
|
|
||||||
|
# Set some opensv shorthands
|
||||||
instance.video_capture = opencv["video_capture"]
|
instance.video_capture = opencv["video_capture"]
|
||||||
instance.face_detector = opencv["face_detector"]
|
instance.face_detector = opencv["face_detector"]
|
||||||
instance.pose_predictor = opencv["pose_predictor"]
|
instance.pose_predictor = opencv["pose_predictor"]
|
||||||
instance.clahe = opencv["clahe"]
|
instance.clahe = opencv["clahe"]
|
||||||
|
|
||||||
|
# Parse and set the 2 required options for all rubberstamps
|
||||||
instance.options = {
|
instance.options = {
|
||||||
"timeout": float(re.sub("[a-zA-Z]", "", regex_result.group(2))),
|
"timeout": float(re.sub("[a-zA-Z]", "", regex_result.group(2))),
|
||||||
"failsafe": regex_result.group(3) != "faildeadly"
|
"failsafe": regex_result.group(3) != "faildeadly"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Try to get the class do declare its other config variables
|
||||||
try:
|
try:
|
||||||
instance.declare_config()
|
instance.declare_config()
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -104,15 +126,20 @@ def execute(config, gtk_proc, opencv):
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Split the optional arguments at the end of the rule by spaces
|
||||||
raw_options = regex_result.group(4).split()
|
raw_options = regex_result.group(4).split()
|
||||||
|
|
||||||
|
# For each of those aoptional arguments
|
||||||
for option in raw_options:
|
for option in raw_options:
|
||||||
|
# Get the key to the left, and the value to the right of the equal sign
|
||||||
key, value = option.split("=")
|
key, value = option.split("=")
|
||||||
|
|
||||||
|
# Error out if a key has been set that was not declared by the module before
|
||||||
if key not in instance.options:
|
if key not in instance.options:
|
||||||
print("Unknow config option for rubberstamp " + type + ": " + key)
|
print("Unknow config option for rubberstamp " + type + ": " + key)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Convert the argument string to an int or float if the declared option has that type
|
||||||
if isinstance(instance.options[key], int):
|
if isinstance(instance.options[key], int):
|
||||||
value = int(value)
|
value = int(value)
|
||||||
elif isinstance(instance.options[key], float):
|
elif isinstance(instance.options[key], float):
|
||||||
|
@ -125,8 +152,10 @@ def execute(config, gtk_proc, opencv):
|
||||||
print(instance.options)
|
print(instance.options)
|
||||||
print("Executing stamp")
|
print("Executing stamp")
|
||||||
|
|
||||||
|
# Make the stamp fail by default
|
||||||
result = False
|
result = False
|
||||||
|
|
||||||
|
# Run the stamp code
|
||||||
try:
|
try:
|
||||||
result = instance.run()
|
result = instance.run()
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -138,6 +167,7 @@ def execute(config, gtk_proc, opencv):
|
||||||
|
|
||||||
if verbose: print("Stamp \"" + type + "\" returned: " + str(result))
|
if verbose: print("Stamp \"" + type + "\" returned: " + str(result))
|
||||||
|
|
||||||
|
# Abort authentication if the stamp returned false
|
||||||
if result is False:
|
if result is False:
|
||||||
if verbose: print("Authentication aborted by rubber stamp")
|
if verbose: print("Authentication aborted by rubber stamp")
|
||||||
sys.exit(14)
|
sys.exit(14)
|
||||||
|
|
|
@ -2,6 +2,8 @@ import time
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from i18n import _
|
from i18n import _
|
||||||
|
|
||||||
|
# Import the root rubberstamp class
|
||||||
from rubberstamps import RubberStamp
|
from rubberstamps import RubberStamp
|
||||||
|
|
||||||
|
|
||||||
|
@ -9,16 +11,20 @@ class hotkey(RubberStamp):
|
||||||
pressed_key = "none"
|
pressed_key = "none"
|
||||||
|
|
||||||
def declare_config(self):
|
def declare_config(self):
|
||||||
|
"""Set the default values for the optional arguments"""
|
||||||
self.options["abort_key"] = "esc"
|
self.options["abort_key"] = "esc"
|
||||||
self.options["confirm_key"] = "enter"
|
self.options["confirm_key"] = "enter"
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
"""Wait for the user to press a hotkey"""
|
||||||
time_left = self.options["timeout"]
|
time_left = self.options["timeout"]
|
||||||
time_string = _("Aborting authorisation in {}") if self.options["failsafe"] else _("Authorising in {}")
|
time_string = _("Aborting authorisation in {}") if self.options["failsafe"] else _("Authorising in {}")
|
||||||
|
|
||||||
|
# Set the ui to default strings
|
||||||
self.set_ui_text(time_string.format(int(time_left)), self.UI_TEXT)
|
self.set_ui_text(time_string.format(int(time_left)), self.UI_TEXT)
|
||||||
self.set_ui_text(_("Hold {abort_key} to abort, hold {confirm_key} to authorise").format(abort_key=self.options["abort_key"], confirm_key=self.options["confirm_key"]), self.UI_SUBTEXT)
|
self.set_ui_text(_("Press {abort_key} to abort, {confirm_key} to authorise").format(abort_key=self.options["abort_key"], confirm_key=self.options["confirm_key"]), self.UI_SUBTEXT)
|
||||||
|
|
||||||
|
# Try to import the keyboard module and tell the user to install the module if that fails
|
||||||
try:
|
try:
|
||||||
import keyboard
|
import keyboard
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -27,26 +33,37 @@ class hotkey(RubberStamp):
|
||||||
print("\t pip3 install keyboard")
|
print("\t pip3 install keyboard")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Register hotkeys with the kernel
|
||||||
keyboard.add_hotkey(self.options["abort_key"], self.on_key, args=["abort"])
|
keyboard.add_hotkey(self.options["abort_key"], self.on_key, args=["abort"])
|
||||||
keyboard.add_hotkey(self.options["confirm_key"], self.on_key, args=["confirm"])
|
keyboard.add_hotkey(self.options["confirm_key"], self.on_key, args=["confirm"])
|
||||||
|
|
||||||
|
# While we have not hit our timeout yet
|
||||||
while time_left > 0:
|
while time_left > 0:
|
||||||
|
# Remove 0.1 seconds from the timer, as that's how long we sleep
|
||||||
time_left -= 0.1
|
time_left -= 0.1
|
||||||
|
# Update the ui with the new time
|
||||||
self.set_ui_text(time_string.format(str(int(time_left) + 1)), self.UI_TEXT)
|
self.set_ui_text(time_string.format(str(int(time_left) + 1)), self.UI_TEXT)
|
||||||
|
|
||||||
|
# If the abort key was pressed while the loop was sleeping
|
||||||
if self.pressed_key == "abort":
|
if self.pressed_key == "abort":
|
||||||
|
# Set the ui to confirm the abort
|
||||||
self.set_ui_text(_("Authentication aborted"), self.UI_TEXT)
|
self.set_ui_text(_("Authentication aborted"), self.UI_TEXT)
|
||||||
self.set_ui_text("", self.UI_SUBTEXT)
|
self.set_ui_text("", self.UI_SUBTEXT)
|
||||||
|
|
||||||
|
# Exit
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# If confirm has pressed, return that auth can continue
|
||||||
elif self.pressed_key == "confirm":
|
elif self.pressed_key == "confirm":
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
# If no key has been pressed, wait for a bit and check again
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
# When our timeout hits, either abort or continue based on failsafe of faildeadly
|
||||||
return not self.options["failsafe"]
|
return not self.options["failsafe"]
|
||||||
|
|
||||||
def on_key(self, type):
|
def on_key(self, type):
|
||||||
|
"""Called when the user presses a key"""
|
||||||
self.pressed_key = type
|
self.pressed_key = type
|
||||||
|
|
|
@ -1,69 +1,98 @@
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from i18n import _
|
from i18n import _
|
||||||
|
|
||||||
|
# Import the root rubberstamp class
|
||||||
from rubberstamps import RubberStamp
|
from rubberstamps import RubberStamp
|
||||||
|
|
||||||
|
|
||||||
class nod(RubberStamp):
|
class nod(RubberStamp):
|
||||||
def declare_config(self):
|
def declare_config(self):
|
||||||
|
"""Set the default values for the optional arguments"""
|
||||||
self.options["min_distance"] = 6
|
self.options["min_distance"] = 6
|
||||||
self.options["min_directions"] = 2
|
self.options["min_directions"] = 2
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
"""Track a users nose to see if they nod yes or no"""
|
||||||
self.set_ui_text(_("Nod to confirm"), self.UI_TEXT)
|
self.set_ui_text(_("Nod to confirm"), self.UI_TEXT)
|
||||||
self.set_ui_text(_("Shake your head to abort"), self.UI_SUBTEXT)
|
self.set_ui_text(_("Shake your head to abort"), self.UI_SUBTEXT)
|
||||||
|
|
||||||
|
# Stores relative distance between the 2 eyes in the last frame
|
||||||
|
# Used to calculate the distance of the nose traveled in relation to face size in the frame
|
||||||
last_reldist = -1
|
last_reldist = -1
|
||||||
|
# Last point the nose was at
|
||||||
last_nosepoint = {"x": -1, "y": -1}
|
last_nosepoint = {"x": -1, "y": -1}
|
||||||
|
# Contans booleans recording successful nods and their directions
|
||||||
recorded_nods = {"x": [], "y": []}
|
recorded_nods = {"x": [], "y": []}
|
||||||
|
|
||||||
starttime = time.time()
|
starttime = time.time()
|
||||||
|
|
||||||
while True:
|
# Keep running the loop while we have not hit timeout yet
|
||||||
if time.time() > starttime + self.options["timeout"]:
|
while time.time() < starttime + self.options["timeout"]:
|
||||||
return not self.options["failsafe"]
|
# Read a frame from the camera
|
||||||
|
|
||||||
ret, frame = self.video_capture.read_frame()
|
ret, frame = self.video_capture.read_frame()
|
||||||
|
|
||||||
|
# Apply CLAHE to get a better picture
|
||||||
frame = self.clahe.apply(frame)
|
frame = self.clahe.apply(frame)
|
||||||
|
|
||||||
|
# Detect all faces in the frame
|
||||||
face_locations = self.face_detector(frame, 1)
|
face_locations = self.face_detector(frame, 1)
|
||||||
|
|
||||||
|
# Only continue if exacty 1 face is visible in the frame
|
||||||
if len(face_locations) != 1:
|
if len(face_locations) != 1:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Get the position of the eyes and tip of the nose
|
||||||
face_landmarks = self.pose_predictor(frame, face_locations[0])
|
face_landmarks = self.pose_predictor(frame, face_locations[0])
|
||||||
|
|
||||||
|
# Calculate the relative distance between the 2 eyes
|
||||||
reldist = face_landmarks.part(0).x - face_landmarks.part(2).x
|
reldist = face_landmarks.part(0).x - face_landmarks.part(2).x
|
||||||
|
# Avarage this out with the distance found in the last frame to smooth it out
|
||||||
avg_reldist = (last_reldist + reldist) / 2
|
avg_reldist = (last_reldist + reldist) / 2
|
||||||
|
|
||||||
|
# Calulate horizontal movement (shaking head) and vertical movement (nodding)
|
||||||
for axis in ["x", "y"]:
|
for axis in ["x", "y"]:
|
||||||
|
# Get the location of the nose on the active axis
|
||||||
nosepoint = getattr(face_landmarks.part(4), axis)
|
nosepoint = getattr(face_landmarks.part(4), axis)
|
||||||
|
|
||||||
|
# If this is the first frame set the previous values to the current ones
|
||||||
if last_nosepoint[axis] == -1:
|
if last_nosepoint[axis] == -1:
|
||||||
last_nosepoint[axis] = nosepoint
|
last_nosepoint[axis] = nosepoint
|
||||||
last_reldist = reldist
|
last_reldist = reldist
|
||||||
|
|
||||||
mindist = self.options["min_distance"]
|
mindist = self.options["min_distance"]
|
||||||
movement = (nosepoint - last_nosepoint[axis]) * 100 / avg_reldist
|
# Get the relative movement by taking the distance traveled and deviding it by eye distance
|
||||||
|
movement = (nosepoint - last_nosepoint[axis]) * 100 / max(avg_reldist, 1)
|
||||||
|
|
||||||
|
# If the movement is over the minimal distance threshold
|
||||||
if movement < -mindist or movement > mindist:
|
if movement < -mindist or movement > mindist:
|
||||||
|
# If this is the first recorded nod, add it to the array
|
||||||
if len(recorded_nods[axis]) == 0:
|
if len(recorded_nods[axis]) == 0:
|
||||||
recorded_nods[axis].append(movement < 0)
|
recorded_nods[axis].append(movement < 0)
|
||||||
|
|
||||||
|
# Otherwise, only add this nod if the previous nod with in the other direction
|
||||||
elif recorded_nods[axis][-1] != (movement < 0):
|
elif recorded_nods[axis][-1] != (movement < 0):
|
||||||
recorded_nods[axis].append(movement < 0)
|
recorded_nods[axis].append(movement < 0)
|
||||||
|
|
||||||
|
# Check if we have nodded enough on this axis
|
||||||
if len(recorded_nods[axis]) >= self.options["min_directions"]:
|
if len(recorded_nods[axis]) >= self.options["min_directions"]:
|
||||||
|
# If nodded yes, show confirmation in ui
|
||||||
if (axis == "y"):
|
if (axis == "y"):
|
||||||
self.set_ui_text(_("Confirmed authentication"), self.UI_TEXT)
|
self.set_ui_text(_("Confirmed authentication"), self.UI_TEXT)
|
||||||
|
# If shaken no, show abort message
|
||||||
else:
|
else:
|
||||||
self.set_ui_text(_("Aborted authentication"), self.UI_TEXT)
|
self.set_ui_text(_("Aborted authentication"), self.UI_TEXT)
|
||||||
|
|
||||||
|
# Remove subtext
|
||||||
self.set_ui_text("", self.UI_SUBTEXT)
|
self.set_ui_text("", self.UI_SUBTEXT)
|
||||||
|
|
||||||
|
# Return true for nodding yes and false for shaking no
|
||||||
time.sleep(0.8)
|
time.sleep(0.8)
|
||||||
return axis == "y"
|
return axis == "y"
|
||||||
|
|
||||||
|
# Save the relative distance and the nosepoint for next loop
|
||||||
last_reldist = reldist
|
last_reldist = reldist
|
||||||
last_nosepoint[axis] = nosepoint
|
last_nosepoint[axis] = nosepoint
|
||||||
|
|
||||||
|
# We've fallen out of the loop, so timeout has been hit
|
||||||
|
return not self.options["failsafe"]
|
||||||
|
|
Loading…
Reference in a new issue