mtest: annotate and fix bugs found by annotating

This started out with a bug report of mtest trying to add bytes + str,
which I though "Oh, mypy can help!" and turned into an entire day of
awful code traversal and trying to figure out why attributes were
changing type. Hopefully this makes everything cleaner and easier to
follow.
pull/5395/head
Dylan Baker 5 years ago
parent 2cc70c631b
commit c571b0b185
  1. 201
      mesonbuild/mtest.py

@ -34,6 +34,7 @@ import subprocess
import sys
import tempfile
import time
import typing
from . import build
from . import environment
@ -41,6 +42,9 @@ from . import mlog
from .dependencies import ExternalProgram
from .mesonlib import substring_is_in_list, MesonException
if typing.TYPE_CHECKING:
from .backend.backends import TestSerialisation
# GNU autotools interprets a return code of 77 from tests it executes to
# mean that the test should be skipped.
GNU_SKIP_RETURNCODE = 77
@ -49,15 +53,15 @@ GNU_SKIP_RETURNCODE = 77
# mean that the test failed even before testing what it is supposed to test.
GNU_ERROR_RETURNCODE = 99
def is_windows():
def is_windows() -> bool:
platname = platform.system().lower()
return platname == 'windows' or 'mingw' in platname
def is_cygwin():
def is_cygwin() -> bool:
platname = platform.system().lower()
return 'cygwin' in platname
def determine_worker_count():
def determine_worker_count() -> int:
varname = 'MESON_TESTTHREADS'
if varname in os.environ:
try:
@ -74,7 +78,7 @@ def determine_worker_count():
num_workers = 1
return num_workers
def add_arguments(parser):
def add_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument('--repeat', default=1, dest='repeat', type=int,
help='Number of times to run the tests.')
parser.add_argument('--no-rebuild', default=False, action='store_true',
@ -117,7 +121,7 @@ def add_arguments(parser):
help='Optional list of tests to run')
def returncode_to_status(retcode):
def returncode_to_status(retcode: int) -> str:
# Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related
# functions here because the status returned by subprocess is munged. It
# returns a negative value if the process was killed by a signal rather than
@ -142,7 +146,7 @@ def returncode_to_status(retcode):
signame = 'SIGinvalid'
return '(exit status %d or signal %d %s)' % (retcode, signum, signame)
def env_tuple_to_str(env):
def env_tuple_to_str(env: typing.Iterable[typing.Tuple[str, str]]) -> str:
return ''.join(["%s='%s' " % (k, v) for k, v in env])
@ -162,7 +166,7 @@ class TestResult(enum.Enum):
ERROR = 'ERROR'
class TAPParser(object):
class TAPParser:
Plan = namedtuple('Plan', ['count', 'late', 'skipped', 'explanation'])
Bailout = namedtuple('Bailout', ['message'])
Test = namedtuple('Test', ['number', 'name', 'result', 'explanation'])
@ -181,10 +185,11 @@ class TAPParser(object):
_RE_YAML_START = re.compile(r'(\s+)---.*')
_RE_YAML_END = re.compile(r'\s+\.\.\.\s*')
def __init__(self, io):
def __init__(self, io: typing.Iterator[str]):
self.io = io
def parse_test(self, ok, num, name, directive, explanation):
def parse_test(self, ok: bool, num: int, name: str, directive: typing.Optional[str], explanation: typing.Optional[str]) -> \
typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error'], None, None]:
name = name.strip()
explanation = explanation.strip() if explanation else None
if directive is not None:
@ -201,14 +206,14 @@ class TAPParser(object):
yield self.Test(num, name, TestResult.OK if ok else TestResult.FAIL, explanation)
def parse(self):
def parse(self) -> typing.Generator[typing.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout'], None, None]:
found_late_test = False
bailed_out = False
plan = None
lineno = 0
num_tests = 0
yaml_lineno = None
yaml_indent = None
yaml_indent = ''
state = self._MAIN
version = 12
while True:
@ -235,7 +240,7 @@ class TAPParser(object):
continue
if line.startswith(yaml_indent):
continue
yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,))
yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno))
state = self._MAIN
assert state == self._MAIN
@ -297,7 +302,7 @@ class TAPParser(object):
yield self.Error('unexpected input at line %d' % (lineno,))
if state == self._YAML:
yield self.Error('YAML block not terminated (started on line %d)' % (yaml_lineno,))
yield self.Error('YAML block not terminated (started on line {})'.format(yaml_lineno))
if not bailed_out and plan and num_tests != plan.count:
if num_tests < plan.count:
@ -307,8 +312,12 @@ class TAPParser(object):
class TestRun:
@staticmethod
def make_exitcode(test, returncode, duration, stdo, stde, cmd):
@classmethod
def make_exitcode(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str],
returncode: int, duration: float, stdo: typing.Optional[str],
stde: typing.Optional[str],
cmd: typing.Optional[typing.List[str]]) -> 'TestRun':
if returncode == GNU_SKIP_RETURNCODE:
res = TestResult.SKIP
elif returncode == GNU_ERROR_RETURNCODE:
@ -317,9 +326,12 @@ class TestRun:
res = TestResult.EXPECTEDFAIL if bool(returncode) else TestResult.UNEXPECTEDPASS
else:
res = TestResult.FAIL if bool(returncode) else TestResult.OK
return TestRun(test, res, returncode, duration, stdo, stde, cmd)
return cls(test, test_env, res, returncode, duration, stdo, stde, cmd)
def make_tap(test, returncode, duration, stdo, stde, cmd):
@classmethod
def make_tap(cls, test: 'TestSerialisation', test_env: typing.Dict[str, str],
returncode: int, duration: float, stdo: str, stde: str,
cmd: typing.Optional[typing.List[str]]) -> 'TestRun':
res = None
num_tests = 0
failed = False
@ -352,9 +364,12 @@ class TestRun:
else:
res = TestResult.FAIL if failed else TestResult.OK
return TestRun(test, res, returncode, duration, stdo, stde, cmd)
return cls(test, test_env, res, returncode, duration, stdo, stde, cmd)
def __init__(self, test, res, returncode, duration, stdo, stde, cmd):
def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str],
res: TestResult, returncode: int, duration: float,
stdo: typing.Optional[str], stde: typing.Optional[str],
cmd: typing.Optional[typing.List[str]]):
assert isinstance(res, TestResult)
self.res = res
self.returncode = returncode
@ -362,10 +377,10 @@ class TestRun:
self.stdo = stdo
self.stde = stde
self.cmd = cmd
self.env = test.env
self.env = test_env
self.should_fail = test.should_fail
def get_log(self):
def get_log(self) -> str:
res = '--- command ---\n'
if self.cmd is None:
res += 'NONE\n'
@ -385,7 +400,7 @@ class TestRun:
res += '-------\n\n'
return res
def decode(stream):
def decode(stream: typing.Union[None, bytes]) -> str:
if stream is None:
return ''
try:
@ -393,51 +408,50 @@ def decode(stream):
except UnicodeDecodeError:
return stream.decode('iso-8859-1', errors='ignore')
def write_json_log(jsonlogfile, test_name, result):
def write_json_log(jsonlogfile: typing.TextIO, test_name: str, result: TestRun) -> None:
jresult = {'name': test_name,
'stdout': result.stdo,
'result': result.res.value,
'duration': result.duration,
'returncode': result.returncode,
'command': result.cmd}
if isinstance(result.env, dict):
jresult['env'] = result.env
else:
jresult['env'] = result.env.get_env(os.environ)
'env': result.env,
'command': result.cmd} # type: typing.Dict[str, typing.Any]
if result.stde:
jresult['stderr'] = result.stde
jsonlogfile.write(json.dumps(jresult) + '\n')
def run_with_mono(fname):
def run_with_mono(fname: str) -> bool:
if fname.endswith('.exe') and not (is_windows() or is_cygwin()):
return True
return False
def load_benchmarks(build_dir):
def load_benchmarks(build_dir: str) -> typing.List['TestSerialisation']:
datafile = os.path.join(build_dir, 'meson-private', 'meson_benchmark_setup.dat')
if not os.path.isfile(datafile):
raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir))
with open(datafile, 'rb') as f:
obj = pickle.load(f)
obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f))
return obj
def load_tests(build_dir):
def load_tests(build_dir: str) -> typing.List['TestSerialisation']:
datafile = os.path.join(build_dir, 'meson-private', 'meson_test_setup.dat')
if not os.path.isfile(datafile):
raise TestException('Directory ${!r} does not seem to be a Meson build directory.'.format(build_dir))
with open(datafile, 'rb') as f:
obj = pickle.load(f)
obj = typing.cast(typing.List['TestSerialisation'], pickle.load(f))
return obj
class SingleTestRunner:
def __init__(self, test, env, options):
def __init__(self, test: 'TestSerialisation', test_env: typing.Dict[str, str],
env: typing.Dict[str, str], options: argparse.Namespace):
self.test = test
self.test_env = test_env
self.env = env
self.options = options
def _get_cmd(self):
def _get_cmd(self) -> typing.Optional[typing.List[str]]:
if self.test.fname[0].endswith('.jar'):
return ['java', '-jar'] + self.test.fname
elif not self.test.is_cross_built and run_with_mono(self.test.fname[0]):
@ -457,19 +471,18 @@ class SingleTestRunner:
else:
return self.test.fname
def run(self):
def run(self) -> TestRun:
cmd = self._get_cmd()
if cmd is None:
skip_stdout = 'Not run because can not execute cross compiled binaries.'
return TestRun(test=self.test, res=TestResult.SKIP, returncode=GNU_SKIP_RETURNCODE,
duration=0.0, stdo=skip_stdout, stde=None, cmd=None)
return TestRun(self.test, self.test_env, TestResult.SKIP, GNU_SKIP_RETURNCODE, 0.0, skip_stdout, None, None)
else:
wrap = TestHarness.get_wrapper(self.options)
if self.options.gdb:
self.test.timeout = None
return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
def _run_cmd(self, cmd):
def _run_cmd(self, cmd: typing.List[str]) -> TestRun:
starttime = time.time()
if len(self.test.extra_paths) > 0:
@ -506,7 +519,7 @@ class SingleTestRunner:
# Make the meson executable ignore SIGINT while gdb is running.
signal.signal(signal.SIGINT, signal.SIG_IGN)
def preexec_fn():
def preexec_fn() -> None:
if self.options.gdb:
# Restore the SIGINT handler for the child process to
# ensure it can handle it.
@ -535,7 +548,7 @@ class SingleTestRunner:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
if self.options.verbose:
print('%s time out (After %d seconds)' % (self.test.name, timeout))
print('{} time out (After {} seconds)'.format(self.test.name, timeout))
timed_out = True
except KeyboardInterrupt:
mlog.warning('CTRL-C detected while running %s' % (self.test.name))
@ -572,9 +585,9 @@ class SingleTestRunner:
try:
p.communicate(timeout=1)
except subprocess.TimeoutExpired:
additional_error = b'Test process could not be killed.'
additional_error = 'Test process could not be killed.'
except ValueError:
additional_error = b'Could not read output. Maybe the process has redirected its stdout/stderr?'
additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?'
endtime = time.time()
duration = endtime - starttime
if additional_error is None:
@ -592,20 +605,20 @@ class SingleTestRunner:
stdo = ""
stde = additional_error
if timed_out:
return TestRun(self.test, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd)
return TestRun(self.test, self.test_env, TestResult.TIMEOUT, p.returncode, duration, stdo, stde, cmd)
else:
if self.test.protocol == 'exitcode':
return TestRun.make_exitcode(self.test, p.returncode, duration, stdo, stde, cmd)
return TestRun.make_exitcode(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd)
else:
if self.options.verbose:
print(stdo, end='')
return TestRun.make_tap(self.test, p.returncode, duration, stdo, stde, cmd)
return TestRun.make_tap(self.test, self.test_env, p.returncode, duration, stdo, stde, cmd)
class TestHarness:
def __init__(self, options):
def __init__(self, options: argparse.Namespace):
self.options = options
self.collected_logs = []
self.collected_logs = [] # type: typing.List[str]
self.fail_count = 0
self.expectedfail_count = 0
self.unexpectedpass_count = 0
@ -614,23 +627,26 @@ class TestHarness:
self.timeout_count = 0
self.is_run = False
self.tests = None
self.suites = None
self.logfilename = None
self.logfile = None
self.jsonlogfile = None
self.logfilename = None # type: typing.Optional[str]
self.logfile = None # type: typing.Optional[typing.TextIO]
self.jsonlogfile = None # type: typing.Optional[typing.TextIO]
if self.options.benchmark:
self.tests = load_benchmarks(options.wd)
else:
self.tests = load_tests(options.wd)
self.load_suites()
ss = set()
for t in self.tests:
for s in t.suite:
ss.add(s)
self.suites = list(ss)
def __del__(self):
def __del__(self) -> None:
if self.logfile:
self.logfile.close()
if self.jsonlogfile:
self.jsonlogfile.close()
def merge_suite_options(self, options, test):
def merge_suite_options(self, options: argparse.Namespace, test: 'TestSerialisation') -> typing.Dict[str, str]:
if ':' in options.setup:
if options.setup not in self.build_data.test_setups:
sys.exit("Unknown test setup '%s'." % options.setup)
@ -654,7 +670,7 @@ class TestHarness:
options.wrapper = current.exe_wrapper
return current.env.get_env(os.environ.copy())
def get_test_runner(self, test):
def get_test_runner(self, test: 'TestSerialisation') -> SingleTestRunner:
options = deepcopy(self.options)
if not options.setup:
options.setup = self.build_data.test_setup_default_name
@ -662,12 +678,11 @@ class TestHarness:
env = self.merge_suite_options(options, test)
else:
env = os.environ.copy()
if isinstance(test.env, build.EnvironmentVariables):
test.env = test.env.get_env(env)
env.update(test.env)
return SingleTestRunner(test, env, options)
test_env = test.env.get_env(env)
env.update(test_env)
return SingleTestRunner(test, test_env, env, options)
def process_test_result(self, result):
def process_test_result(self, result: TestRun) -> None:
if result.res is TestResult.TIMEOUT:
self.timeout_count += 1
elif result.res is TestResult.SKIP:
@ -683,7 +698,8 @@ class TestHarness:
else:
sys.exit('Unknown test result encountered: {}'.format(result.res))
def print_stats(self, numlen, tests, name, result, i):
def print_stats(self, numlen: int, tests: typing.List['TestSerialisation'],
name: str, result: TestRun, i: int) -> None:
startpad = ' ' * (numlen - len('%d' % (i + 1)))
num = '%s%d/%d' % (startpad, i + 1, len(tests))
padding1 = ' ' * (38 - len(name))
@ -718,7 +734,7 @@ class TestHarness:
if self.jsonlogfile:
write_json_log(self.jsonlogfile, name, result)
def print_summary(self):
def print_summary(self) -> None:
msg = '''
Ok: %4d
Expected Fail: %4d
@ -732,7 +748,7 @@ Timeout: %4d
if self.logfile:
self.logfile.write(msg)
def print_collected_logs(self):
def print_collected_logs(self) -> None:
if len(self.collected_logs) > 0:
if len(self.collected_logs) > 10:
print('\nThe output from 10 first failed tests:\n')
@ -751,10 +767,10 @@ Timeout: %4d
line = line.encode('ascii', errors='replace').decode()
print(line)
def total_failure_count(self):
def total_failure_count(self) -> int:
return self.fail_count + self.unexpectedpass_count + self.timeout_count
def doit(self):
def doit(self) -> int:
if self.is_run:
raise RuntimeError('Test harness object can only be used once.')
self.is_run = True
@ -765,14 +781,16 @@ Timeout: %4d
return self.total_failure_count()
@staticmethod
def split_suite_string(suite):
def split_suite_string(suite: str) -> typing.Tuple[str, str]:
if ':' in suite:
return suite.split(':', 1)
# mypy can't figure out that str.split(n, 1) will return a list of
# length 2, so we have to help it.
return typing.cast(typing.Tuple[str, str], tuple(suite.split(':', 1)))
else:
return suite, ""
@staticmethod
def test_in_suites(test, suites):
def test_in_suites(test: 'TestSerialisation', suites: typing.List[str]) -> bool:
for suite in suites:
(prj_match, st_match) = TestHarness.split_suite_string(suite)
for prjst in test.suite:
@ -803,18 +821,11 @@ Timeout: %4d
return True
return False
def test_suitable(self, test):
def test_suitable(self, test: 'TestSerialisation') -> bool:
return (not self.options.include_suites or TestHarness.test_in_suites(test, self.options.include_suites)) \
and not TestHarness.test_in_suites(test, self.options.exclude_suites)
def load_suites(self):
ss = set()
for t in self.tests:
for s in t.suite:
ss.add(s)
self.suites = list(ss)
def get_tests(self):
def get_tests(self) -> typing.List['TestSerialisation']:
if not self.tests:
print('No tests defined.')
return []
@ -834,14 +845,11 @@ Timeout: %4d
print('No suitable tests defined.')
return []
for test in tests:
test.rebuilt = False
return tests
def open_log_files(self):
def open_log_files(self) -> None:
if not self.options.logbase or self.options.verbose:
return None, None, None, None
return
namebase = None
logfile_base = os.path.join(self.options.wd, 'meson-logs', self.options.logbase)
@ -865,8 +873,8 @@ Timeout: %4d
self.logfile.write('Inherited environment: {}\n\n'.format(inherit_env))
@staticmethod
def get_wrapper(options):
wrap = []
def get_wrapper(options: argparse.Namespace) -> typing.List[str]:
wrap = [] # type: typing.List[str]
if options.gdb:
wrap = ['gdb', '--quiet', '--nh']
if options.repeat > 1:
@ -875,10 +883,9 @@ Timeout: %4d
wrap += ['--args']
if options.wrapper:
wrap += options.wrapper
assert(isinstance(wrap, list))
return wrap
def get_pretty_suite(self, test):
def get_pretty_suite(self, test: 'TestSerialisation') -> str:
if len(self.suites) > 1 and test.suite:
rv = TestHarness.split_suite_string(test.suite[0])[0]
s = "+".join(TestHarness.split_suite_string(s)[1] for s in test.suite)
@ -888,9 +895,9 @@ Timeout: %4d
else:
return test.name
def run_tests(self, tests):
def run_tests(self, tests: typing.List['TestSerialisation']) -> None:
executor = None
futures = []
futures = [] # type: typing.List[typing.Tuple[conc.Future[TestRun], int, typing.List[TestSerialisation], str, int]]
numlen = len('%d' % len(tests))
self.open_log_files()
startdir = os.getcwd()
@ -929,9 +936,9 @@ Timeout: %4d
finally:
os.chdir(startdir)
def drain_futures(self, futures):
for i in futures:
(result, numlen, tests, name, i) = i
def drain_futures(self, futures: typing.List[typing.Tuple['conc.Future[TestRun]', int, typing.List['TestSerialisation'], str, int]]) -> None:
for x in futures:
(result, numlen, tests, name, i) = x
if self.options.repeat > 1 and self.fail_count:
result.cancel()
if self.options.verbose:
@ -939,7 +946,7 @@ Timeout: %4d
self.process_test_result(result.result())
self.print_stats(numlen, tests, name, result.result(), i)
def run_special(self):
def run_special(self) -> int:
'''Tests run by the user, usually something like "under gdb 1000 times".'''
if self.is_run:
raise RuntimeError('Can not use run_special after a full run.')
@ -950,13 +957,13 @@ Timeout: %4d
return self.total_failure_count()
def list_tests(th):
def list_tests(th: TestHarness) -> bool:
tests = th.get_tests()
for t in tests:
print(th.get_pretty_suite(t))
return not tests
def rebuild_all(wd):
def rebuild_all(wd: str) -> bool:
if not os.path.isfile(os.path.join(wd, 'build.ninja')):
print('Only ninja backend is supported to rebuild tests before running them.')
return True
@ -975,7 +982,7 @@ def rebuild_all(wd):
return True
def run(options):
def run(options: argparse.Namespace) -> int:
if options.benchmark:
options.num_processes = 1
@ -1020,7 +1027,7 @@ def run(options):
print(e)
return 1
def run_with_args(args):
def run_with_args(args: typing.List[str]) -> int:
parser = argparse.ArgumentParser(prog='meson test')
add_arguments(parser)
options = parser.parse_args(args)

Loading…
Cancel
Save