mirror of
https://github.com/boltgolt/howdy.git
synced 2024-09-19 09:51:19 +02:00
Added rubberstamp config, autoload
This commit is contained in:
parent
671b33884f
commit
473aa9807e
5 changed files with 109 additions and 27 deletions
|
@ -282,6 +282,8 @@ while True:
|
|||
if capture_successful:
|
||||
make_snapshot("SUCCESSFUL")
|
||||
|
||||
# Run rubberstamps if enabled
|
||||
if config.getboolean("rubberstamps", "enabled", fallback=False):
|
||||
import rubberstamps
|
||||
rubberstamps.execute(config, {
|
||||
"video_capture": video_capture,
|
||||
|
|
|
@ -85,7 +85,20 @@ capture_failed = true
|
|||
# Do the same as the option above but for successful attempts
|
||||
capture_successful = true
|
||||
|
||||
[rubberstamps]
|
||||
# Enable specific extra checks after the user has been recognised
|
||||
enabled = false
|
||||
|
||||
# What type of stamps to run and with what options. The type, timeout and
|
||||
# failure mode are required. One line per stamp. Rule syntax:
|
||||
# stamptype timeout (failsafe | faildeadly) [extra_argument=value]
|
||||
stamp_rules =
|
||||
nod 5s failsafe min_distance=12
|
||||
|
||||
[debug]
|
||||
# Show a short but detailed diagnostic report in console
|
||||
# Enabling this can cause some UI apps to fail, only enable it to debug
|
||||
end_report = false
|
||||
|
||||
# More verbose logging from the rubberstamps system
|
||||
verbose_stamps = false
|
||||
|
|
|
@ -78,6 +78,13 @@ def doAuth(pamh):
|
|||
syslog.closelog()
|
||||
return pamh.PAM_AUTH_ERR
|
||||
|
||||
# Status 14 means a rubberstamp could not be given
|
||||
elif status == 14:
|
||||
pamh.conversation(pamh.Message(pamh.PAM_ERROR_MSG, "Rubberstamp denied"))
|
||||
syslog.syslog(syslog.LOG_INFO, "Failure, rubberstamp did not succeed")
|
||||
syslog.closelog()
|
||||
return pamh.PAM_AUTH_ERR
|
||||
|
||||
# Status 1 is probably a python crash
|
||||
elif status == 1:
|
||||
pamh.conversation(pamh.Message(pamh.PAM_ERROR_MSG, "Howdy encountered error, check stderr"))
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
import sys
|
||||
import os
|
||||
import re
|
||||
|
||||
from importlib.machinery import SourceFileLoader
|
||||
|
||||
|
@ -12,7 +14,9 @@ class RubberStamp:
|
|||
|
||||
|
||||
def execute(config, opencv):
|
||||
verbose = config.getboolean("debug", "verbose_stamps", fallback=False)
|
||||
dir_path = os.path.dirname(os.path.realpath(__file__))
|
||||
installed_stamps = []
|
||||
|
||||
for filename in os.listdir(dir_path):
|
||||
if not os.path.isfile(dir_path + "/" + filename):
|
||||
|
@ -21,15 +25,78 @@ def execute(config, opencv):
|
|||
if filename in ["__init__.py", ".gitignore"]:
|
||||
continue
|
||||
|
||||
class_name = filename.split(".")[0]
|
||||
module = SourceFileLoader(class_name, dir_path + "/" + filename).load_module()
|
||||
constructor = getattr(module, class_name)
|
||||
installed_stamps.append(filename.split(".")[0])
|
||||
|
||||
if verbose:
|
||||
print("Installed rubberstamps: " + " ".join(installed_stamps))
|
||||
|
||||
raw_rules = config.get("rubberstamps", "stamp_rules")
|
||||
rules = raw_rules.split("\n")
|
||||
|
||||
for rule in rules:
|
||||
rule = rule.strip()
|
||||
|
||||
if len(rule) <= 1:
|
||||
continue
|
||||
|
||||
regex_result = re.search("^(\w+)\s+(\w+)\s+([a-z]+)(.*)?$", rule, re.IGNORECASE)
|
||||
|
||||
if not regex_result:
|
||||
print("Error parsing rubberstamp rule: " + rule)
|
||||
continue
|
||||
|
||||
type = regex_result.group(1)
|
||||
|
||||
if type not in installed_stamps:
|
||||
print("Stamp not installed: " + type)
|
||||
continue
|
||||
|
||||
module = SourceFileLoader(type, dir_path + "/" + type + ".py").load_module()
|
||||
constructor = getattr(module, type)
|
||||
|
||||
instance = constructor()
|
||||
instance.config = config
|
||||
instance.opencv = opencv
|
||||
print(regex_result.group(3))
|
||||
|
||||
instance.options = {
|
||||
"timeout": int(re.sub("[a-zA-Z]", "", regex_result.group(2))),
|
||||
"failsafe": regex_result.group(3) != "faildeadly"
|
||||
}
|
||||
|
||||
instance.declare_config()
|
||||
|
||||
raw_options = regex_result.group(4).split()
|
||||
|
||||
for option in raw_options:
|
||||
key, value = option.split("=")
|
||||
|
||||
if key not in instance.options:
|
||||
print("Unknow config option for rubberstamp " + type + ": " + key)
|
||||
continue
|
||||
|
||||
if isinstance(instance.options[key], int):
|
||||
value = int(value)
|
||||
|
||||
instance.options[key] = value
|
||||
|
||||
if verbose:
|
||||
print("Stamp \"" + type + "\" options parsed:")
|
||||
print(instance.options)
|
||||
print("Executing stamp")
|
||||
|
||||
instance.create_shorthands()
|
||||
result = instance.run()
|
||||
|
||||
print(result)
|
||||
if verbose:
|
||||
print("Stamp \"" + type + "\" returned: " + str(result))
|
||||
|
||||
if not result:
|
||||
sys.exit(14)
|
||||
|
||||
# This is outside the for loop, so we've run all the rules
|
||||
if verbose:
|
||||
print("All rubberstamps processed, authentication successful")
|
||||
|
||||
# Exit with no errors
|
||||
sys.exit(0)
|
||||
|
|
|
@ -1,15 +1,13 @@
|
|||
import cv2
|
||||
import time
|
||||
|
||||
from rubberstamps import RubberStamp
|
||||
|
||||
min_distance = 10
|
||||
min_directions = 3
|
||||
failsafe = True
|
||||
timeout = 5
|
||||
|
||||
|
||||
class nod(RubberStamp):
|
||||
def declare_config(self):
|
||||
self.options["min_distance"] = 10
|
||||
self.options["min_directions"] = 3
|
||||
|
||||
def run(self):
|
||||
last_reldist = -1
|
||||
last_nosepoint = {"x": -1, "y": -1}
|
||||
|
@ -18,8 +16,8 @@ class nod(RubberStamp):
|
|||
starttime = time.time()
|
||||
|
||||
while True:
|
||||
if time.time() > starttime + timeout:
|
||||
return not failsafe
|
||||
if time.time() > starttime + self.options["timeout"]:
|
||||
return not self.options["failsafe"]
|
||||
|
||||
ret, frame = self.video_capture.read_frame()
|
||||
|
||||
|
@ -42,23 +40,18 @@ class nod(RubberStamp):
|
|||
last_nosepoint[axis] = nosepoint
|
||||
last_reldist = reldist
|
||||
|
||||
mindist = self.options["min_distance"]
|
||||
movement = (nosepoint - last_nosepoint[axis]) * 100 / avg_reldist
|
||||
|
||||
if movement < -min_distance or movement > min_distance:
|
||||
if movement < -mindist or movement > mindist:
|
||||
if len(recorded_nods[axis]) == 0:
|
||||
recorded_nods[axis].append(movement < 0)
|
||||
|
||||
elif recorded_nods[axis][-1] != (movement < 0):
|
||||
recorded_nods[axis].append(movement < 0)
|
||||
|
||||
if len(recorded_nods[axis]) >= min_directions:
|
||||
if len(recorded_nods[axis]) >= self.options["min_directions"]:
|
||||
return axis == "y"
|
||||
|
||||
last_reldist = reldist
|
||||
last_nosepoint[axis] = nosepoint
|
||||
|
||||
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
|
||||
|
||||
cv2.imshow("Howdy Test", frame)
|
||||
if cv2.waitKey(1) != -1:
|
||||
raise KeyboardInterrupt()
|
||||
|
|
Loading…
Reference in a new issue