Commit af573b82 authored by Martin Jeřábek's avatar Martin Jeřábek

testfw: split into submodules

parent c934c7a6
import yaml
from os.path import join, dirname, abspath
import logging
import logging.config
from pathlib import Path
import click
from glob import glob
import os
import re
import sys
from textwrap import dedent
from pprint import pprint
from collections.abc import Iterable
from pathlib import Path
from .log import MyLogRecord
from . import vunit_ifc
from . import test_unit, test_sanity, test_feature
from vunit.ui import VUnit
import re
from os.path import abspath
from .test_common import add_common_sources, add_flags
d = Path(abspath(__file__)).parent
base = d.parent
build = base / 'build'
def setup_logging() -> None:
with Path(d / 'logging.yaml').open('rt', encoding='utf-8') as f:
......@@ -50,103 +46,6 @@ class AliasedGroup(click.Group):
ctx.fail('Too many matches: %s' % ', '.join(sorted(matches)))
#-------------------------------------------------------------------------------
def add_sources(lib, patterns):
for pattern in patterns:
p = join(str(d.parent), pattern)
log.debug('Adding sources matching {}'.format(p))
for f in glob(p, recursive=True):
if f != "tb_wrappers.vhd":
lib.add_source_file(str(f))
def add_common_sources(lib):
return add_sources(lib, ['../src/**/*.vhd', '*.vhd', 'lib/*.vhd'])
def get_common_modelsim_init_files():
modelsim_init_files = '../lib/test_lib.tcl,modelsim_init.tcl'
modelsim_init_files = [str(d/x) for x in modelsim_init_files.split(',')]
return modelsim_init_files
def add_flags(ui, lib, build):
unit_tests = lib.get_test_benches('*_unit_test', allow_empty=True)
for ut in unit_tests:
ut.scan_tests_from_file(str(build / "../unit/vunittb_wrapper.vhd"))
#lib.add_compile_option("ghdl.flags", ["-Wc,-g"])
lib.add_compile_option("ghdl.flags", ["-fprofile-arcs", "-ftest-coverage"])
ui.set_sim_option("ghdl.elab_flags", ["-Wl,-lgcov", "-Wl,--coverage", "-Wl,-no-pie"])
ui.set_sim_option("ghdl.sim_flags", ["--ieee-asserts=disable-at-0"])
modelsim_init_files = get_common_modelsim_init_files()
ui.set_sim_option("modelsim.init_files.after_load", modelsim_init_files)
def create_wrapper(lib, fname):
fname = str(fname)
files = lib.get_source_files()
tests = []
r = re.compile(r'^architecture\s+(\S+)\s+of\s+CAN_test\s+is$')
for file in files:
with open(file.name, 'rt', encoding='utf-8') as f:
for l in f:
m = r.match(l)
if m:
tests.append(m.group(1))
configs = []
tbs = []
for test in tests:
configs.append(dedent("""\
configuration tbconf_{test} of vunittb_wrapper is
for tb
for i_test : CAN_test use entity work.CAN_test({test}); end for;
end for;
end configuration;
-- -----------------------------------------------------------------------------
""".format(test=test)
))
tbs.append(dedent("""\
library work;
use work.CANtestLib.All;
library vunit_lib;
context vunit_lib.vunit_context;
entity tb_{test} is generic (
runner_cfg : string := runner_cfg_default;
iterations : natural := 1;
log_level : log_lvl_type := info_l;
error_beh : err_beh_type := quit;
error_tol : natural := 0;
timeout : string := "0 ms"
); end entity;
architecture tb of tb_{test} is
component vunittb_wrapper is generic (
nested_runner_cfg : string;
iterations : natural;
log_level : log_lvl_type;
error_beh : err_beh_type;
error_tol : natural;
timeout : string
); end component;
for all:vunittb_wrapper use configuration work.tbconf_{test};
begin
tb:vunittb_wrapper generic map(
nested_runner_cfg => runner_cfg,
iterations => iterations,
log_level => log_level,
error_beh => error_beh,
error_tol => error_tol,
timeout => timeout);
end architecture;
-- -----------------------------------------------------------------------------
""".format(test=test)
))
with open(fname, "wt", encoding='utf-8') as f:
for c in configs:
f.write(c)
for t in tbs:
f.write(t)
lib.add_source_file(fname)
@click.group(cls=AliasedGroup)
@click.option('--compile', is_flag=True) #, hidden=True
@click.pass_context
......@@ -160,39 +59,14 @@ def cli(ctx, compile):
def create():
pass
def create_vunit(obj, vunit_args):
# fill vunit arguments
args = []
# hack for vunit_compile TCL command
if obj['compile']:
args += ['--compile']
args += ['--xunit-xml', '../test_unit.xml1'] + list(vunit_args)
ui = VUnit.from_argv(args)
return ui
def vunit_run(ui, build):
try:
vunit_ifc.run(ui)
res = None
except SystemExit as e:
res = e.code
out = build / '../test_unit.xml1'
if out.exists():
with out.open('rt', encoding='utf-8') as f:
c = f.read()
with open('../test_unit.xml', 'wt', encoding='utf-8') as f:
print('<?xml version="1.0" encoding="utf-8"?>', file=f)
print('<?xml-stylesheet href="xunit.xsl" type="text/xsl"?>', file=f)
f.write(c)
out.unlink()
sys.exit(res)
@cli.command()
@click.argument('config', type=click.Path())
@click.argument('vunit_args', nargs=-1)
@click.pass_obj
def test(obj, config, vunit_args):
base = d.parent
build = base / 'build'
config_file = base / config
with config_file.open('rt', encoding='utf-8') as f:
config = yaml.load(f)
......@@ -208,36 +82,19 @@ def test(obj, config, vunit_args):
lib = ui.add_library("lib")
add_common_sources(lib)
# unit tests
tests = []
if run_unit:
add_sources(lib, ['unit/**/*.vhd'])
create_wrapper(lib, build / "tb_wrappers.vhd")
# sanity test
tests.append(test_unit.UnitTests(ui, lib, config['unit'], build, base))
if run_sanity:
add_sources(lib, ['sanity/*.vhd'])
# feature tests
tests.append(test_sanity.SanityTests(ui, lib, config['sanity'], build, base))
if run_feature:
add_sources(lib, ['feature/*.vhd'])
tests.append(test_feature.FeatureTests(ui, lib, config['feature'], build, base))
for t in tests:
t.add_sources()
add_flags(ui, lib, build)
if run_unit:
configure_unit_tests(ui, lib, config['unit'])
if run_sanity:
configure_sanity_tests(ui, lib, config['sanity'])
if run_feature:
configure_feature_tests(ui, lib, config['feature'])
# check for unconfigured unit tests
if run_unit:
unit_tests = lib.get_test_benches('*tb_*_unit_test')
configured = ['tb_{}_unit_test'.format(name) for name in config['unit']['tests'].keys()]
log.debug('Configured unit tests: {}'.format(', '.join(configured)))
unconfigured = [tb for tb in unit_tests if tb.name not in configured]
if len(unconfigured):
log.warn("Unit tests with no configuration found (defaults will be used): {}".format(', '.join(tb.name for tb in unconfigured)))
for t in tests:
t.configure()
# check for unknown tests
all_benches = lib.get_test_benches('*')
......@@ -247,136 +104,32 @@ def test(obj, config, vunit_args):
vunit_run(ui, build)
def create_vunit(obj, vunit_args):
# fill vunit arguments
args = []
# hack for vunit_compile TCL command
if obj['compile']:
args += ['--compile']
args += ['--xunit-xml', '../test_unit.xml1'] + list(vunit_args)
ui = VUnit.from_argv(args)
return ui
def configure_unit_tests(ui, lib, config):
default = config['default']
unit_tests = lib.get_test_benches('*_unit_test')
for name, _cfg in config['tests'].items():
cfg = default.copy()
if _cfg:
cfg.update(_cfg)
tb = lib.get_test_benches('*tb_{}_unit_test'.format(name), allow_empty=True)
if not len(tb):
pprint([x.name for x in unit_tests])
raise RuntimeError('Testbench {}_unit_test does not exist (but specified in config).'.format(name))
assert len(tb) == 1
tb = tb[0]
tb.set_generic('timeout', cfg['timeout'])
tb.set_generic('iterations', cfg['iterations'])
tb.set_generic('log_level', cfg['log_level'] + '_l')
tb.set_generic('error_tol', cfg['error_tolerance'])
# generate & set per-test modelsim tcl file
tcl = build / 'modelsim_init_{}.tcl'.format(name)
with tcl.open('wt', encoding='utf-8') as f:
print(dedent('''\
global TCOMP
set TCOMP tb_{}_unit_test/tb/i_test
'''.format(name)), file=f)
init_files = get_common_modelsim_init_files()
init_files += [str(tcl)]
tb.set_sim_option("modelsim.init_files.after_load", init_files)
if 'wave' in cfg:
path = base / cfg['wave']
if not path.exists():
log.warn('Wave file {} not found'.format(cfg['wave']))
tb.set_sim_option("modelsim.init_file.gui", str(path))
else:
tcl = build / 'modelsim_gui_{}.tcl'.format(name)
with tcl.open('wt', encoding='utf-8') as f:
print(dedent('''\
start_CAN_simulation "dummy"
global IgnoreAddWaveErrors
puts "Automatically adding common waves. Failures are handled gracefully."
set IgnoreAddWaveErrors 1
add_test_status_waves
add_system_waves
set IgnoreAddWaveErrors 0
run_simulation
get_test_results
'''.format(name)), file=f)
tb.set_sim_option("modelsim.init_file.gui", str(tcl))
# --- sanity
def len_to_matrix(topology, bus_len):
l = bus_len
if topology == 'bus':
bm = [[0.0, l[1], l[1]+l[2], l[1]+l[2]+l[3]],
[l[1], 0.0, l[2], l[2]+l[3]],
[l[1]+l[2], l[2], 0.0, l[3]],
[l[1]+l[2]+l[3], l[2]+l[3], l[3], 0.0]]
elif topology == 'star':
bm = [[0.0, l[1]+l[2], l[1]+l[3], l[1]+l[4]],
[l[1]+l[2], 0.0, l[2]+l[3], l[2]+l[4]],
[l[1]+l[3], l[2]+l[3], 0.0, l[3]+l[4]],
[l[1]+l[4], l[2]+l[4], l[3]+l[4], 0.0]]
elif topology == 'tree':
bm = [[0.0, l[1]+l[2], l[1]+l[3]+l[5], l[1]+l[4]+l[5]],
[l[1]+l[2], 0.0, l[2]+l[3]+l[5], l[2]+l[4]+l[5]],
[l[1]+l[3]+l[5], l[2]+l[3]+l[5], 0.0, l[3]+l[4]],
[l[1]+l[4]+l[5], l[2]+l[4]+l[5], l[3]+l[4], 0.0]]
elif topology == 'ring':
raise RuntimeError("Ring topology not implemented.")
# TODO: Ring topology with min functions
elif topology == 'custom':
bm = [[0.0, l[1], l[2], l[3]],
[l[1], 0.0, l[4], l[5]],
[l[2], l[4], 0.0, l[6]],
[l[3], l[6], l[6], 0.0]]
else:
raise ValueError("Invalid bus topology.")
return bm
def dict_merge(up, *lowers):
for lower in lowers:
for k, v in lower.items():
if k not in up:
up[k] = v
def vhdl_serialize(o):
if isinstance(o, Iterable):
ss = []
for x in o:
ss.append(vhdl_serialize(x))
return ''.join(['(', ', '.join(ss), ')'])
else:
return str(o)
def configure_sanity_tests(ui, lib, config):
tb = lib.get_test_benches('*tb_sanity')[0]
default = config['default']
for name, cfg in config['tests'].items():
dict_merge(cfg, default)
bm = len_to_matrix(cfg['topology'], cfg['bus_len_v'])
generics = {
'timeout' : cfg['timeout'],
'iterations' : cfg['iterations'],
'log_level' : cfg['log_level'] + '_l',
'error_tol' : cfg['error_tolerance'],
'topology' : cfg['topology'],
#'bm' : vhdl_serialize(bm),
'bus_len_v' : vhdl_serialize(cfg['bus_len_v']),
'trv_del_v' : vhdl_serialize(cfg['trv_del_v']),
'osc_tol_v' : vhdl_serialize(cfg['osc_tol_v']),
'nw_mean' : vhdl_serialize(cfg['nw_mean']),
'nw_var' : vhdl_serialize(cfg['nw_var']),
'ng_mean' : vhdl_serialize(cfg['ng_mean']),
'ng_var' : vhdl_serialize(cfg['ng_var']),
'timing_config': vhdl_serialize(cfg['timing_config']),
'gauss_iter' : vhdl_serialize(cfg['gauss_iter']),
}
tb.add_config(name, generics=generics)
def configure_feature_tests(ui, lib, config):
pass
def vunit_run(ui, build):
try:
vunit_ifc.run(ui)
res = None
except SystemExit as e:
res = e.code
out = build / '../test_unit.xml1'
if out.exists():
with out.open('rt', encoding='utf-8') as f:
c = f.read()
with open('../test_unit.xml', 'wt', encoding='utf-8') as f:
print('<?xml version="1.0" encoding="utf-8"?>', file=f)
print('<?xml-stylesheet href="xunit.xsl" type="text/xsl"?>', file=f)
f.write(c)
out.unlink()
sys.exit(res)
"""
- vunit configurations
......@@ -397,11 +150,3 @@ def configure_feature_tests(ui, lib, config):
- click._bashcompletion.get_choices -> extend the if to check if the given argument is an instance of XXX
and implement completion method for that instance. Complete test names.
"""
def dump_sim_options(lib):
for tb in lib.get_test_benches('*'):
for cfgs in tb._test_bench.get_configuration_dicts():
for name, cfg in cfgs.items():
print('{}#{}:'.format(tb.name, name))
#pprint(cfg.__dict__)
pprint(cfg.sim_options)
from collections.abc import Iterable
from glob import glob
from os.path import join, dirname, abspath
import logging
from pathlib import Path
__all__ = ['add_sources', 'add_common_sources', 'get_common_modelsim_init_files',
'add_flags', 'dict_merge', 'vhdl_serialize', 'dump_sim_options', 'TestsBase']
d = Path(abspath(__file__)).parent
log = logging.getLogger(__name__)
class TestsBase:
def __init__(self, ui, lib, config, build, base):
self.ui = ui
self.lib = lib
self.config = config
self.build = build
self.base = base
def add_sources(self): raise NotImplementedError()
def configure(self): raise NotImplementedError()
def add_sources(lib, patterns):
for pattern in patterns:
p = join(str(d.parent), pattern)
log.debug('Adding sources matching {}'.format(p))
for f in glob(p, recursive=True):
if f != "tb_wrappers.vhd":
lib.add_source_file(str(f))
def add_common_sources(lib):
return add_sources(lib, ['../src/**/*.vhd', '*.vhd', 'lib/*.vhd'])
def get_common_modelsim_init_files():
modelsim_init_files = '../lib/test_lib.tcl,modelsim_init.tcl'
modelsim_init_files = [str(d/x) for x in modelsim_init_files.split(',')]
return modelsim_init_files
def add_flags(ui, lib, build):
unit_tests = lib.get_test_benches('*_unit_test', allow_empty=True)
for ut in unit_tests:
ut.scan_tests_from_file(str(build / "../unit/vunittb_wrapper.vhd"))
#lib.add_compile_option("ghdl.flags", ["-Wc,-g"])
lib.add_compile_option("ghdl.flags", ["-fprofile-arcs", "-ftest-coverage"])
ui.set_sim_option("ghdl.elab_flags", ["-Wl,-lgcov", "-Wl,--coverage", "-Wl,-no-pie"])
ui.set_sim_option("ghdl.sim_flags", ["--ieee-asserts=disable-at-0"])
modelsim_init_files = get_common_modelsim_init_files()
ui.set_sim_option("modelsim.init_files.after_load", modelsim_init_files)
def dict_merge(up, *lowers):
for lower in lowers:
for k, v in lower.items():
if k not in up:
up[k] = v
def vhdl_serialize(o):
if isinstance(o, Iterable):
ss = []
for x in o:
ss.append(vhdl_serialize(x))
return ''.join(['(', ', '.join(ss), ')'])
else:
return str(o)
def dump_sim_options(lib):
for tb in lib.get_test_benches('*'):
for cfgs in tb._test_bench.get_configuration_dicts():
for name, cfg in cfgs.items():
print('{}#{}:'.format(tb.name, name))
#pprint(cfg.__dict__)
pprint(cfg.sim_options)
import logging
from .test_common import *
log = logging.getLogger(__name__)
class FeatureTests(TestsBase):
def add_sources(self):
add_sources(self.lib, ['feature/**/*.vhd'])
def configure(self):
pass
import logging
from .test_common import *
log = logging.getLogger(__name__)
def len_to_matrix(topology, bus_len):
l = bus_len
if topology == 'bus':
bm = [[0.0, l[1], l[1]+l[2], l[1]+l[2]+l[3]],
[l[1], 0.0, l[2], l[2]+l[3]],
[l[1]+l[2], l[2], 0.0, l[3]],
[l[1]+l[2]+l[3], l[2]+l[3], l[3], 0.0]]
elif topology == 'star':
bm = [[0.0, l[1]+l[2], l[1]+l[3], l[1]+l[4]],
[l[1]+l[2], 0.0, l[2]+l[3], l[2]+l[4]],
[l[1]+l[3], l[2]+l[3], 0.0, l[3]+l[4]],
[l[1]+l[4], l[2]+l[4], l[3]+l[4], 0.0]]
elif topology == 'tree':
bm = [[0.0, l[1]+l[2], l[1]+l[3]+l[5], l[1]+l[4]+l[5]],
[l[1]+l[2], 0.0, l[2]+l[3]+l[5], l[2]+l[4]+l[5]],
[l[1]+l[3]+l[5], l[2]+l[3]+l[5], 0.0, l[3]+l[4]],
[l[1]+l[4]+l[5], l[2]+l[4]+l[5], l[3]+l[4], 0.0]]
elif topology == 'ring':
raise RuntimeError("Ring topology not implemented.")
# TODO: Ring topology with min functions
elif topology == 'custom':
bm = [[0.0, l[1], l[2], l[3]],
[l[1], 0.0, l[4], l[5]],
[l[2], l[4], 0.0, l[6]],
[l[3], l[6], l[6], 0.0]]
else:
raise ValueError("Invalid bus topology.")
return bm
class SanityTests(TestsBase):
def add_sources(self):
add_sources(self.lib, ['sanity/**/*.vhd'])
def configure(self):
# TODO: wave
tb = self.lib.get_test_benches('*tb_sanity')[0]
default = self.config['default']
for name, cfg in self.config['tests'].items():
dict_merge(cfg, default)
bm = len_to_matrix(cfg['topology'], cfg['bus_len_v'])
generics = {
'timeout' : cfg['timeout'],
'iterations' : cfg['iterations'],
'log_level' : cfg['log_level'] + '_l',
'error_tol' : cfg['error_tolerance'],
'topology' : cfg['topology'],
#'bm' : vhdl_serialize(bm),
'bus_len_v' : vhdl_serialize(cfg['bus_len_v']),
'trv_del_v' : vhdl_serialize(cfg['trv_del_v']),
'osc_tol_v' : vhdl_serialize(cfg['osc_tol_v']),
'nw_mean' : vhdl_serialize(cfg['nw_mean']),
'nw_var' : vhdl_serialize(cfg['nw_var']),
'ng_mean' : vhdl_serialize(cfg['ng_mean']),
'ng_var' : vhdl_serialize(cfg['ng_var']),
'timing_config': vhdl_serialize(cfg['timing_config']),
'gauss_iter' : vhdl_serialize(cfg['gauss_iter']),
}
tb.add_config(name, generics=generics)
from textwrap import dedent
import re
import logging
from .test_common import *
log = logging.getLogger(__name__)
class UnitTests(TestsBase):
def add_sources(self):
add_sources(self.lib, ['unit/**/*.vhd'])
create_wrapper(self.lib, self.build / "tb_wrappers.vhd")
def configure(self):
ui, lib, config, build = self.ui, self.lib, self.config, self.build
default = config['default']
unit_tests = lib.get_test_benches('*_unit_test')
for name, cfg in config['tests'].items():
dict_merge(cfg, default)
tb = lib.get_test_benches('*tb_{}_unit_test'.format(name), allow_empty=True)
if not len(tb):
pprint([x.name for x in unit_tests])
raise RuntimeError('Testbench {}_unit_test does not exist (but specified in config).'.format(name))
assert len(tb) == 1
tb = tb[0]
tb.set_generic('timeout', cfg['timeout'])
tb.set_generic('iterations', cfg['iterations'])
tb.set_generic('log_level', cfg['log_level'] + '_l')
tb.set_generic('error_tol', cfg['error_tolerance'])
# generate & set per-test modelsim tcl file
tcl = build / 'modelsim_init_{}.tcl'.format(name)
with tcl.open('wt', encoding='utf-8') as f:
print(dedent('''\
global TCOMP
set TCOMP tb_{}_unit_test/tb/i_test
'''.format(name)), file=f)
init_files = get_common_modelsim_init_files()
init_files += [str(tcl)]
tb.set_sim_option("modelsim.init_files.after_load", init_files)
if 'wave' in cfg:
path = self.base / cfg['wave']
if not path.exists():
log.warn('Wave file {} not found'.format(cfg['wave']))
tb.set_sim_option("modelsim.init_file.gui", str(path))
else:
tcl = build / 'modelsim_gui_{}.tcl'.format(name)
with tcl.open('wt', encoding='utf-8') as f:
print(dedent('''\
start_CAN_simulation "dummy"
global IgnoreAddWaveErrors
puts "Automatically adding common waves. Failures are handled gracefully."
set IgnoreAddWaveErrors 1
add_test_status_waves
add_system_waves
set IgnoreAddWaveErrors 0
run_simulation
get_test_results
'''.format(name)), file=f)
tb.set_sim_option("modelsim.init_file.gui", str(tcl))
self._check_for_unconfigured()
def _check_for_unconfigured(self):
lib, config = self.lib, self.config
# check for unconfigured unit tests
unit_tests = lib.get_test_benches('*tb_*_unit_test')
configured = ['tb_{}_unit_test'.format(name) for name in config['tests'].keys()]
log.debug('Configured unit tests: {}'.format(', '.join(configured)))
unconfigured = [tb for tb in unit_tests if tb.name not in configured]
if len(unconfigured):
log.warn("Unit tests with no configuration found (defaults will be used): {}".format(', '.join(tb.name for tb in unconfigured)))