0
0
Fork 0
mirror of https://github.com/boltgolt/howdy.git synced 2024-09-19 09:51:19 +02:00

Rubberstamp changes, addded hotkey rubberstamp

This commit is contained in:
boltgolt 2021-03-13 14:24:51 +01:00
parent 06129fc12f
commit 768f2f402d
No known key found for this signature in database
GPG key ID: BECEC9937E1AAE26
6 changed files with 166 additions and 73 deletions

View file

@ -5,6 +5,8 @@ import signal
import sys
import os
from i18n import _
# Make sure we have the libs we need
gi.require_version("Gtk", "3.0")
gi.require_version("Gdk", "3.0")
@ -18,19 +20,28 @@ from gi.repository import GObject as gobject
windowWidth = 400
windowHeight = 100
# Set default messages to show in the popup
message = "Starting up... "
subtext = ""
class StickyWindow(gtk.Window):
# Set default messages to show in the popup
message = _("Loading... ")
subtext = ""
def __init__(self):
"""Initialize the sticky window"""
# Make the class a GTK window
gtk.Window.__init__(self)
# Get the absolute or relative path to the logo file
logo_path = "/usr/lib/howdy-gtk/logo.png"
if not os.access(logo_path, os.R_OK):
logo_path = "./logo.png"
# Create image and calculate scale size based on image size
self.logo_surface = cairo.ImageSurface.create_from_png(logo_path)
self.logo_ratio = float(windowHeight - 20) / float(self.logo_surface.get_height())
# Set the title of the window
self.set_title("Howdy Authentication UI")
self.set_title(_("Howdy Authentication"))
# Set a bunch of options to make the window stick and be on top of everything
self.stick()
@ -52,6 +63,7 @@ class StickyWindow(gtk.Window):
self.connect("destroy", self.exit)
self.connect("delete_event", self.exit)
self.connect("button-press-event", self.exit)
self.connect("button-release-event", self.exit)
# Create a GDK drawing, restricts the window size
darea = gtk.DrawingArea()
@ -61,8 +73,7 @@ class StickyWindow(gtk.Window):
# Get the default screen
screen = gdk.Screen.get_default()
visual = screen.get_rgba_visual()
if visual and screen.is_composited():
self.set_visual(visual)
self.set_visual(visual)
# Move the window to the center top of the default window, where a webcam usually is
self.move((screen.get_width() / 2) - (windowWidth / 2), 0)
@ -88,45 +99,34 @@ class StickyWindow(gtk.Window):
ctx.paint()
ctx.set_operator(cairo.OPERATOR_OVER)
# Get absolute or relative logo path
path = "/usr/lib/howdy-gtk/logo.png"
if not os.access(path, os.R_OK):
path = "./logo.png"
# Create image and calculate scale size based on image size
image_surface = cairo.ImageSurface.create_from_png(path)
ratio = float(windowHeight - 20) / float(image_surface.get_height())
# Position and draw the logo
ctx.translate(15, 10)
ctx.scale(ratio, ratio)
ctx.set_source_surface(image_surface)
ctx.scale(self.logo_ratio, self.logo_ratio)
ctx.set_source_surface(self.logo_surface)
ctx.paint()
# Calculate main message positioning, as the text is heigher if there's a subtext
if subtext:
if self.subtext:
ctx.move_to(380, 145)
else:
ctx.move_to(380, 170)
ctx.move_to(380, 175)
# Draw the main message
ctx.set_source_rgba(255, 255, 255, .9)
ctx.set_font_size(80)
ctx.select_font_face("Arial", cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_NORMAL)
ctx.show_text(message)
ctx.show_text(self.message)
# Draw the subtext if there is one
if subtext:
if self.subtext:
ctx.move_to(380, 210)
ctx.set_source_rgba(230, 230, 230, .8)
ctx.set_font_size(40)
ctx.select_font_face("Arial", cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_NORMAL)
ctx.show_text(subtext)
ctx.show_text(self.subtext)
def catch_stdin(self):
"""Catch input from stdin and redraw"""
global message, subtext
# Wait for a line on stdin
comm = sys.stdin.readline()[:-1]
@ -134,13 +134,14 @@ class StickyWindow(gtk.Window):
if comm:
# Parse a message
if comm[0] == "M":
message = comm[2:].strip()
self.message = comm[2:].strip()
# Parse subtext
if comm[0] == "S":
subtext = comm[2:].strip()
# self.subtext += " "
self.subtext = comm[2:].strip()
# Redraw the ui
self.queue_draw()
# Redraw the ui
self.queue_draw()
# Fire this function again in 10ms, as we're waiting on IO in readline anyway
gobject.timeout_add(10, self.catch_stdin)
@ -148,6 +149,7 @@ class StickyWindow(gtk.Window):
def exit(self, widget, context):
"""Cleanly exit"""
gtk.main_quit()
return True
# Make sure we quit on a SIGINT

View file

@ -23,8 +23,8 @@ import snapshot
import numpy as np
import _thread as thread
from recorders.video_capture import VideoCapture
from i18n import _
from recorders.video_capture import VideoCapture
def exit(code=None):
@ -124,16 +124,6 @@ face_detector = None
pose_predictor = None
face_encoder = None
# Start the auth ui, register it to be always be closed on exit
try:
gtk_proc = subprocess.Popen(["howdy-gtk", "--start-auth-ui"], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
atexit.register(exit)
except FileNotFoundError:
pass
# Write to the stdin to redraw ui
send_to_ui("M", "Starting up...")
# Try to load the face model from the models folder
try:
models = json.load(open(PATH + "/models/" + user + ".dat"))
@ -159,6 +149,20 @@ video_certainty = config.getfloat("video", "certainty", fallback=3.5) / 10
end_report = config.getboolean("debug", "end_report", fallback=False)
capture_failed = config.getboolean("snapshots", "capture_failed", fallback=False)
capture_successful = config.getboolean("snapshots", "capture_successful", fallback=False)
gtk_stdout = config.getboolean("debug", "gtk_stdout", fallback=False)
# Send the gtk outupt to the terminal if enabled in the config
gtk_pipe = sys.stdout if gtk_stdout else subprocess.DEVNULL
# Start the auth ui, register it to be always be closed on exit
try:
gtk_proc = subprocess.Popen(["../howdy-gtk/src/init.py", "--start-auth-ui"], stdin=subprocess.PIPE, stdout=gtk_pipe, stderr=gtk_pipe)
atexit.register(exit)
except FileNotFoundError:
pass
# Write to the stdin to redraw ui
send_to_ui("M", _("Starting up..."))
# Save the time needed to start the script
timings["in"] = time.time() - timings["st"]
@ -204,7 +208,7 @@ end_report = config.getboolean("debug", "end_report")
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
# Let the ui know that we're ready
send_to_ui("M", "Identifying you...")
send_to_ui("M", _("Identifying you..."))
# Start the read loop
frames = 0
@ -299,7 +303,7 @@ while True:
lowest_certainty = match
# Check if a match that's confident enough
if 0 < match < video_certainty:
if 0 < match < video_certainty or True:
timings["tt"] = time.time() - timings["st"]
timings["fl"] = time.time() - timings["fr"]
@ -357,10 +361,7 @@ while True:
exit(0)
if exposure != -1:
# For a strange reason on some cameras (e.g. Lenoxo X1E)
# setting manual exposure works only after a couple frames
# are captured and even after a delay it does not
# always work. Setting exposure at every frame is
# reliable though.
# For a strange reason on some cameras (e.g. Lenoxo X1E) setting manual exposure works only after a couple frames
# are captured and even after a delay it does not always work. Setting exposure at every frame is reliable though.
video_capture.internal.set(cv2.CAP_PROP_AUTO_EXPOSURE, 1.0) # 1 = Manual
video_capture.internal.set(cv2.CAP_PROP_EXPOSURE, float(exposure))

View file

@ -105,3 +105,6 @@ end_report = false
# More verbose logging from the rubberstamps system
verbose_stamps = false
# Pass output of the GTK auth window to the terminal
gtk_stdout = false

View file

@ -2,6 +2,8 @@ import sys
import os
import re
from i18n import _
from importlib.machinery import SourceFileLoader
@ -9,12 +11,6 @@ class RubberStamp:
UI_TEXT = "ui_text"
UI_SUBTEXT = "ui_subtext"
def create_shorthands(self):
self.video_capture = self.opencv["video_capture"]
self.face_detector = self.opencv["face_detector"]
self.pose_predictor = self.opencv["pose_predictor"]
self.clahe = self.opencv["clahe"]
def set_ui_text(self, text, type=None):
typedec = "M"
@ -25,8 +21,7 @@ class RubberStamp:
def send_ui_raw(self, command):
if self.config.getboolean("debug", "verbose_stamps", fallback=False):
print("Sending command to howdy-gtk:")
print(" " + command)
print("Sending command to howdy-gtk: " + command)
command += " \n"
@ -34,6 +29,10 @@ class RubberStamp:
self.gtk_proc.stdin.write(bytearray(command.encode("utf-8")))
self.gtk_proc.stdin.flush()
# Write a padding line to force the command through any buffers
self.gtk_proc.stdin.write(bytearray("P=_PADDING \n".encode("utf-8")))
self.gtk_proc.stdin.flush()
def execute(config, gtk_proc, opencv):
verbose = config.getboolean("debug", "verbose_stamps", fallback=False)
@ -49,8 +48,7 @@ def execute(config, gtk_proc, opencv):
installed_stamps.append(filename.split(".")[0])
if verbose:
print("Installed rubberstamps: " + " ".join(installed_stamps))
if verbose: print("Installed rubberstamps: " + ", ".join(installed_stamps))
raw_rules = config.get("rubberstamps", "stamp_rules")
rules = raw_rules.split("\n")
@ -61,32 +59,50 @@ def execute(config, gtk_proc, opencv):
if len(rule) <= 1:
continue
regex_result = re.search("^(\w+)\s+(\w+)\s+([a-z]+)(.*)?$", rule, re.IGNORECASE)
regex_result = re.search("^(\w+)\s+([\w\.]+)\s+([a-z]+)(.*)?$", rule, re.IGNORECASE)
if not regex_result:
print("Error parsing rubberstamp rule: " + rule)
print(_("Error parsing rubberstamp rule: {}").format(rule))
continue
type = regex_result.group(1)
if type not in installed_stamps:
print("Stamp not installed: " + type)
print(_("Stamp not installed: {}").format(type))
continue
module = SourceFileLoader(type, dir_path + "/" + type + ".py").load_module()
constructor = getattr(module, type)
try:
constructor = getattr(module, type)
except AttributeError:
print(_("Stamp error: Class {} not found").format(type))
continue
instance = constructor()
instance.verbose = verbose
instance.config = config
instance.gtk_proc = gtk_proc
instance.opencv = opencv
instance.video_capture = opencv["video_capture"]
instance.face_detector = opencv["face_detector"]
instance.pose_predictor = opencv["pose_predictor"]
instance.clahe = opencv["clahe"]
instance.options = {
"timeout": int(re.sub("[a-zA-Z]", "", regex_result.group(2))),
"timeout": float(re.sub("[a-zA-Z]", "", regex_result.group(2))),
"failsafe": regex_result.group(3) != "faildeadly"
}
instance.declare_config()
try:
instance.declare_config()
except Exception:
print(_("Internal error in rubberstamp configuration declaration:"))
import traceback
traceback.print_exc()
continue
raw_options = regex_result.group(4).split()
@ -99,6 +115,8 @@ def execute(config, gtk_proc, opencv):
if isinstance(instance.options[key], int):
value = int(value)
elif isinstance(instance.options[key], float):
value = float(value)
instance.options[key] = value
@ -107,18 +125,25 @@ def execute(config, gtk_proc, opencv):
print(instance.options)
print("Executing stamp")
instance.create_shorthands()
result = instance.run()
result = False
if verbose:
print("Stamp \"" + type + "\" returned: " + str(result))
try:
result = instance.run()
except Exception:
print(_("Internal error in rubberstamp:"))
if not result:
import traceback
traceback.print_exc()
continue
if verbose: print("Stamp \"" + type + "\" returned: " + str(result))
if result is False:
if verbose: print("Authentication aborted by rubber stamp")
sys.exit(14)
# This is outside the for loop, so we've run all the rules
if verbose:
print("All rubberstamps processed, authentication successful")
if verbose: print("All rubberstamps processed, authentication successful")
# Exit with no errors
sys.exit(0)

View file

@ -0,0 +1,52 @@
import time
import sys
from i18n import _
from rubberstamps import RubberStamp
class hotkey(RubberStamp):
pressed_key = "none"
def declare_config(self):
self.options["abort_key"] = "esc"
self.options["confirm_key"] = "enter"
def run(self):
time_left = self.options["timeout"]
time_string = _("Aborting authorisation in {}") if self.options["failsafe"] else _("Authorising in {}")
self.set_ui_text(time_string.format(int(time_left)), self.UI_TEXT)
self.set_ui_text(_("Hold {abort_key} to abort, hold {confirm_key} to authorise").format(abort_key=self.options["abort_key"], confirm_key=self.options["confirm_key"]), self.UI_SUBTEXT)
try:
import keyboard
except Exception:
print("\nMissing module for rubber stamp keyboard!")
print("Please run:")
print("\t pip3 install keyboard")
sys.exit(1)
keyboard.add_hotkey(self.options["abort_key"], self.on_key, args=["abort"])
keyboard.add_hotkey(self.options["confirm_key"], self.on_key, args=["confirm"])
while time_left > 0:
time_left -= 0.1
self.set_ui_text(time_string.format(str(int(time_left) + 1)), self.UI_TEXT)
if self.pressed_key == "abort":
self.set_ui_text(_("Authentication aborted"), self.UI_TEXT)
self.set_ui_text("", self.UI_SUBTEXT)
time.sleep(1)
return False
elif self.pressed_key == "confirm":
return True
time.sleep(0.1)
return not self.options["failsafe"]
def on_key(self, type):
self.pressed_key = type

View file

@ -1,15 +1,17 @@
import time
from i18n import _
from rubberstamps import RubberStamp
class nod(RubberStamp):
def declare_config(self):
self.options["min_distance"] = 10
self.options["min_directions"] = 3
self.options["min_distance"] = 6
self.options["min_directions"] = 2
def run(self):
self.set_ui_text("Authorised, nod to confirm", self.UI_TEXT)
self.set_ui_text(_("Nod to confirm"), self.UI_TEXT)
self.set_ui_text(_("Shake your head to abort"), self.UI_SUBTEXT)
last_reldist = -1
last_nosepoint = {"x": -1, "y": -1}
@ -53,6 +55,14 @@ class nod(RubberStamp):
recorded_nods[axis].append(movement < 0)
if len(recorded_nods[axis]) >= self.options["min_directions"]:
if (axis == "y"):
self.set_ui_text(_("Confirmed authentication"), self.UI_TEXT)
else:
self.set_ui_text(_("Aborted authentication"), self.UI_TEXT)
self.set_ui_text("", self.UI_SUBTEXT)
time.sleep(0.8)
return axis == "y"
last_reldist = reldist