163 lines
4.8 KiB
Python
163 lines
4.8 KiB
Python
|
#!/usr/bin/python3
|
||
|
|
||
|
# Build-tree and source-tree roots used throughout this script.
# NOTE(review): the '@NAME@' values look like template placeholders that are
# presumably substituted when the script is generated — confirm.
BUILDDIR = '@BUILDDIR@'
SRCDIR = '@SRCDIR@'
|
||
|
|
||
|
import os
|
||
|
import sys
|
||
|
import signal
|
||
|
# Directory that is put on LD_LIBRARY_PATH (and GI_TYPELIB_PATH) below.
library_path = BUILDDIR + '/libfprint/'

# Relaunch ourselves with a changed environment so
# that we're loading the development version of libfprint.
# The check doubles as the loop guard: after the re-exec LD_LIBRARY_PATH
# contains library_path, so this branch is skipped the second time.
if library_path not in os.environ.get('LD_LIBRARY_PATH', ''):
    os.environ['LD_LIBRARY_PATH'] = library_path
    os.environ['GI_TYPELIB_PATH'] = f'{BUILDDIR}/libfprint/'
    os.environ['FP_DEVICE_EMULATION'] = '1'
    try:
        # On success execv() replaces this process and never returns.
        os.execv(sys.argv[0], sys.argv)
    except (OSError, ValueError) as e:
        # Report why the re-exec failed instead of discarding the reason.
        print(f'Could not run script with new library path: {e}')
        sys.exit(1)
|
||
|
|
||
|
import gi
|
||
|
gi.require_version('FPrint', '2.0')
|
||
|
from gi.repository import FPrint
|
||
|
|
||
|
gi.require_version('GUsb', '1.0')
|
||
|
from gi.repository import GUsb
|
||
|
|
||
|
import re
|
||
|
import shutil
|
||
|
import subprocess
|
||
|
import tempfile
|
||
|
import time
|
||
|
|
||
|
def print_usage():
    """Print the command-line usage summary, one line per print call, to stdout."""
    usage_lines = (
        f'Usage: {sys.argv[0]} driver [test-variant-name]',
        'A test variant name is optional, and must be all lower case letters, or dashes, with no spaces',
        f'The captured data will be stored in {SRCDIR}/tests/[driver name]-[test variant name]',
    )
    for usage_line in usage_lines:
        print(usage_line)
|
||
|
|
||
|
# Exactly one driver name is required and at most one variant name may follow.
# Checking the lower bound too avoids an IndexError on sys.argv[1] when the
# script is started without arguments.
if not 2 <= len(sys.argv) <= 3:
    print_usage()
    sys.exit(1)

driver_name = sys.argv[1]
# Exported through the environment for the libfprint code loaded later.
# NOTE(review): presumably restricts enumeration to this driver — confirm.
os.environ['FP_DRIVERS_WHITELIST'] = driver_name

test_variant = None
if len(sys.argv) == 3:
    valid_re = re.compile(r'[a-z-]*')
    test_variant = sys.argv[2]
    # fullmatch() is required here: match() with a '*' pattern succeeds on
    # the empty prefix of any string and therefore never rejects anything.
    if (not valid_re.fullmatch(test_variant) or
            test_variant.startswith('-') or
            test_variant.endswith('-')):
        print(f'Invalid variant name {test_variant}\n')
        print_usage()
        sys.exit(1)
|
||
|
|
||
|
# The script refuses to continue unless its effective user id is 0 (root).
effective_uid = os.geteuid()
if effective_uid != 0:
    print(f'{sys.argv[0]} is expected to be run as root')
    sys.exit(1)

# Resolve the tshark executable on PATH; the resolved path is used later to
# start the USB capture.
tshark = shutil.which('tshark')
if tshark is None:
    print("The 'tshark' WireShark command-line tool must be installed to capture USB traffic")
    sys.exit(1)
|
||
|
|
||
|
# Enumerate the fingerprint readers known to libfprint.
ctx = FPrint.Context()
ctx.enumerate()
devices = ctx.get_devices()

# The capture only makes sense with exactly one reader present.
device_count = len(devices)
if device_count == 0:
    print('Could not find a supported fingerprint reader')
    sys.exit(1)
if device_count > 1:
    print('Capture requires a single supported fingerprint reader to be plugged in')
    sys.exit(1)

# The test case is named after the driver, with the optional variant appended.
if test_variant:
    test_name = driver_name + '-' + test_variant
else:
    test_name = driver_name

# Bus number and device address identify the reader's USB device node.
usb_device = devices[0].get_property('fpi-usb-device')
bus_num = usb_device.get_bus()
device_num = usb_device.get_address()

print(f'### Detected USB device /dev/bus/usb/{bus_num:03d}/{device_num:03d}')
|
||
|
|
||
|
# Make directory (reused if it already exists)
test_dir = SRCDIR + '/tests/' + test_name
os.makedirs(test_dir, mode=0o775, exist_ok=True)

# Capture device info: umockdev-record writes its description of the USB
# device to stdout, which is redirected into <test_dir>/device.
args = ['umockdev-record', f'/dev/bus/usb/{bus_num:03d}/{device_num:03d}']
# The context managers guarantee the output file is flushed and closed once
# the recorder has finished.
with open(test_dir + '/device', 'w') as device_out:
    with subprocess.Popen(args, stdout=device_out) as process:
        process.wait()

# A failed recording leaves an unusable 'device' file, so stop here rather
# than going on to capture traffic for a broken test case.
if process.returncode != 0:
    print(f'umockdev-record failed with exit status {process.returncode}')
    sys.exit(1)
|
||
|
|
||
|
# Run capture
# https://osqa-ask.wireshark.org/questions/53919/how-can-i-precisely-specify-a-usb-device-to-capture-with-tshark/

print(f'### Starting USB capture on usbmon{bus_num}')

# The whole bus is captured into a temporary file and filtered down to the
# reader's traffic afterwards.
# NOTE(review): this is a fixed, predictable file name inside the shared temp
# directory and the script runs as root — consider a private directory.
unfiltered_cap_path = os.path.join(tempfile.gettempdir(), 'capture-unfiltered.pcapng')

# os.fork() raises OSError on failure, so no check of the return value is
# needed beyond telling parent and child apart.
capture_pid = os.fork()
if capture_pid == 0:
    # Child: become leader of a new process group (so the parent can signal
    # the whole group later) and turn into tshark.
    try:
        os.setpgrp()
        args = ['tshark', '-q', '-i', f'usbmon{bus_num}', '-w', unfiltered_cap_path]
        os.execv(tshark, args)
    except OSError as e:
        print(f'Failed to start tshark: {e}', file=sys.stderr, flush=True)
    finally:
        # Only reached when execv() did not replace the process. The child
        # must never fall through into the parent's part of the script.
        os._exit(127)

# Wait 1 sec to settle (we can assume setpgrp happened)
time.sleep(1)

# If the capture process is already gone it failed to start; stop before
# asking the user to present a finger for a capture that records nothing.
exited_pid, _ = os.waitpid(capture_pid, os.WNOHANG)
if exited_pid != 0:
    print('tshark exited unexpectedly, the USB capture could not be started')
    sys.exit(1)

print('### Capturing fingerprint, please swipe or press your finger on the reader')
with subprocess.Popen(['python3', SRCDIR + '/tests/capture.py', test_dir + '/capture.png']) as capture_process:
    capture_process.wait()
    if capture_process.returncode != 0:
        print('Failed to capture fingerprint')
        # Take down tshark and anything it spawned before bailing out.
        os.killpg(capture_pid, signal.SIGKILL)
        sys.exit(1)
|
||
|
|
||
|
def t_waitpid(pid, timeout):
    """Poll for a child's exit, giving up after *timeout* seconds.

    Calls ``os.waitpid(pid, os.WNOHANG)`` immediately and then every 0.1
    seconds until it reports an exited child or the deadline has passed.

    Args:
        pid: Passed unchanged to ``os.waitpid``.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The last ``(pid, status)`` tuple returned by ``os.waitpid``; its
        first element is 0 when no child exited before the deadline.

    Raises:
        ChildProcessError: Propagated from ``os.waitpid`` when there is no
            matching child to wait for.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    r = os.waitpid(pid, os.WNOHANG)
    while r[0] == 0 and time.monotonic() < deadline:
        time.sleep(0.1)
        r = os.waitpid(pid, os.WNOHANG)

    return r
|
||
|
|
||
|
# Ask the capture process to terminate, then give it two seconds to exit
# before resorting to SIGKILL.
os.kill(capture_pid, signal.SIGTERM)
try:
    reaped_pid, _ = t_waitpid(capture_pid, 2)
    if reaped_pid == 0:
        # Nothing exited within the timeout
        os.kill(capture_pid, signal.SIGKILL)
except ChildProcessError:
    pass

# Keep reaping children from the capture's process group (negative pid).
# The loop only ends when waitpid reports that no such children remain.
try:
    while True:
        reaped_pid, _ = t_waitpid(-capture_pid, timeout=2)
        if reaped_pid == 0:
            # Children still exist but none exited in time: kill the group
            os.killpg(capture_pid, signal.SIGKILL)
except ChildProcessError:
    pass
|
||
|
|
||
|
# Filter the capture: keep only the packets of the detected reader (matched
# by bus id and device address) and store them in the test directory.
print(f'\n### Saving USB capture as test case {test_name}')
args = ['tshark', '-r', unfiltered_cap_path, '-Y', f'usb.bus_id == {bus_num} and usb.device_address == {device_num}',
        '-w', test_dir + '/capture.pcapng']
with subprocess.Popen(args, stderr=subprocess.DEVNULL) as filter_process:
    filter_process.wait()

if filter_process.returncode != 0:
    # stderr is discarded above, so surface the failure here. The unfiltered
    # capture is kept so the filtering step can be inspected or repeated.
    print(f'Warning: tshark exited with status {filter_process.returncode} while filtering; '
          f'the unfiltered capture was kept at {unfiltered_cap_path}')
else:
    # The unfiltered file holds the traffic of every device on the bus and
    # is no longer needed once the filtered capture has been written.
    try:
        os.remove(unfiltered_cap_path)
    except OSError:
        pass

print(f"\nDone! Don't forget to add {test_name} to tests/meson.build")
|