Commit b847c15e authored by Martin Jeřábek's avatar Martin Jeřábek

testfw: hardening

parent a22352aa
...@@ -15,7 +15,7 @@ func_cov_dir = d.parent / "build/functional_coverage" ...@@ -15,7 +15,7 @@ func_cov_dir = d.parent / "build/functional_coverage"
def setup_logging() -> None: def setup_logging() -> None:
with Path(d / 'logging.yaml').open('rt', encoding='utf-8') as f: with Path(d / 'logging.yaml').open('rt', encoding='utf-8') as f:
cfg = yaml.load(f) cfg = yaml.safe_load(f)
logging.setLogRecordFactory(MyLogRecord) logging.setLogRecordFactory(MyLogRecord)
logging.config.dictConfig(cfg) logging.config.dictConfig(cfg)
global log global log
...@@ -106,7 +106,7 @@ def test(obj, config, strict, create_ghws, vunit_args): ...@@ -106,7 +106,7 @@ def test(obj, config, strict, create_ghws, vunit_args):
config_file = base / config config_file = base / config
out_basename = os.path.splitext(config)[0] out_basename = os.path.splitext(config)[0]
with config_file.open('rt', encoding='utf-8') as f: with config_file.open('rt', encoding='utf-8') as f:
config = yaml.load(f) config = yaml.safe_load(f)
build.mkdir(exist_ok=True) build.mkdir(exist_ok=True)
os.chdir(str(build)) os.chdir(str(build))
......
...@@ -30,7 +30,8 @@ class UnitTests(TestsBase): ...@@ -30,7 +30,8 @@ class UnitTests(TestsBase):
pprint([x.name for x in unit_tests]) pprint([x.name for x in unit_tests])
raise RuntimeError('Testbench {}_unit_test does not exist' raise RuntimeError('Testbench {}_unit_test does not exist'
+ ' (but specified in config).'.format(name)) + ' (but specified in config).'.format(name))
assert len(tb) == 1 elif len(tb) != 1:
raise RuntimeError('Multiple tests matching "{}"'.format(name))
tb = tb[0] tb = tb[0]
tb.set_generic('timeout', cfg['timeout']) tb.set_generic('timeout', cfg['timeout'])
tb.set_generic('iterations', cfg['iterations']) tb.set_generic('iterations', cfg['iterations'])
......
...@@ -4,19 +4,22 @@ import signal ...@@ -4,19 +4,22 @@ import signal
__all__ = ['run'] __all__ = ['run']
def get_children_pids(parent_pid): def get_children_pids(parent_pid):
cmd = subprocess.run("ps -o pid --ppid {} --noheaders".format(parent_pid), shell=True, stdout=subprocess.PIPE, check=False) cmd = ['ps', '-o', 'pid', '--ppid', parent_pid, '--noheaders']
out = cmd.stdout.decode('ascii') res = subprocess.run(cmd, stdout=subprocess.PIPE, check=False)
out = res.stdout.decode('ascii')
return [int(pid_str) for pid_str in out.split() if int(pid_str) != parent_pid] return [int(pid_str) for pid_str in out.split() if int(pid_str) != parent_pid]
def recursive_kill(pid, sig=signal.SIGTERM): def recursive_kill(pid, sig=signal.SIGTERM):
children = get_children_pids(pid) children = get_children_pids(pid)
for child in children: for child in children:
recursive_kill(child) recursive_kill(child)
try: try:
os.kill(child, sig) os.kill(child, sig)
except ProcessLookupError as e: except ProcessLookupError:
pass pass
# ghdl creates a new process group for itself and fails to kill its child when it receives a signal :( # ghdl creates a new process group for itself and fails to kill its child when it receives a signal :(
def sighandler(signo, frame): def sighandler(signo, frame):
signal.signal(signo, signal.SIG_DFL) signal.signal(signo, signal.SIG_DFL)
...@@ -24,6 +27,7 @@ def sighandler(signo, frame): ...@@ -24,6 +27,7 @@ def sighandler(signo, frame):
# restore the handler, because vunit swallows the resulting exception # restore the handler, because vunit swallows the resulting exception
#signal.signal(signo, sighandler) #signal.signal(signo, sighandler)
def run(ui): def run(ui):
signal.signal(signal.SIGTERM, sighandler) signal.signal(signal.SIGTERM, sighandler)
signal.signal(signal.SIGINT, sighandler) signal.signal(signal.SIGINT, sighandler)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment