import atexit import functools import logging import os import secrets import shutil import string import subprocess import sys import unittest from functools import lru_cache import pqclean @atexit.register def cleanup_testcases(): """Clean up any remaining isolated test dirs""" print("Cleaning up testcases directory", file=sys.stderr) for dir_ in TEST_TEMPDIRS: shutil.rmtree(dir_, ignore_errors=True) TEST_TEMPDIRS = [] ALPHABET = string.ascii_letters + string.digits + '_' def mktmpdir(parent, prefix): """Returns a unique directory name""" uniq = ''.join(secrets.choice(ALPHABET) for i in range(8)) return os.path.join(parent, "{}_{}".format(prefix, uniq)) def isolate_test_files(impl_path, test_prefix, dir=os.path.join('..', 'testcases')): """Isolates the test files in a separate directory, to help parallelise. Especially Windows is problematic and needs isolation of all test files: its build process will create .obj files EVERYWHERE. """ try: os.mkdir(dir) except FileExistsError: pass test_dir = mktmpdir(dir, test_prefix) test_dir = os.path.abspath(test_dir) TEST_TEMPDIRS.append(test_dir) # the implementation will go here. scheme_dir = os.path.join(test_dir, 'crypto_bla', 'scheme') new_impl_dir = os.path.abspath(os.path.join(scheme_dir, 'impl')) def initializer(): """Isolate the files to be tested""" # Create layers in folder structure os.makedirs(scheme_dir) # Create test dependencies structure os.mkdir(os.path.join(test_dir, 'test')) # Copy common files (randombytes.c, aes.c, ...) shutil.copytree( os.path.join('..', 'common'), os.path.join(test_dir, 'common')) # Copy makefiles shutil.copy(os.path.join('..', 'test', 'Makefile'), os.path.join(test_dir, 'test', 'Makefile')) shutil.copy(os.path.join('..', 'test', 'Makefile.Microsoft_nmake'), os.path.join(test_dir, 'test', 'Makefile.Microsoft_nmake')) # Copy directories with support files for d in ['common', 'test_common', 'crypto_sign', 'crypto_kem']: shutil.copytree( os.path.join('..', 'test', d), os.path.join(test_dir, 'test', d) ) shutil.copytree(impl_path, new_impl_dir) def destructor(): """Clean up the isolated files""" shutil.rmtree(test_dir, ignore_errors=True) return (test_dir, new_impl_dir, initializer, destructor) def run_subprocess(command, working_dir='.', env=None, expected_returncode=0, print_output=True): """ Helper function to run a shell command and report success/failure depending on the exit status of the shell command. """ if env is not None: env_ = os.environ.copy() env_.update(env) env = env_ # Note we need to capture stdout/stderr from the subprocess, # then print it, which the unittest will then capture and # buffer appropriately print(working_dir + " > " + " ".join(command)) result = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=working_dir, env=env, ) if print_output: print(result.stdout.decode('utf-8')) if expected_returncode is not None: assert result.returncode == expected_returncode, \ "Got unexpected return code {}".format(result.returncode) else: return (result.returncode, result.stdout.decode('utf-8')) return result.stdout.decode('utf-8') def make(*args, working_dir='.', env=None, expected_returncode=0, **kwargs): """ Runs a make target in the specified working directory Usage: make('clean', 'targetb', SCHEME='bla') """ if os.name == 'nt': make_command = ['nmake', '/f', 'Makefile.Microsoft_nmake', '/NOLOGO', '/E'] # we need SCHEME_UPPERCASE and IMPLEMENTATION_UPPERCASE with nmake for envvar in ['IMPLEMENTATION', 'SCHEME']: if envvar in kwargs: kwargs['{}_UPPERCASE'.format(envvar)] = ( kwargs[envvar].upper().replace('-', '')) else: make_command = ['make'] return run_subprocess( [ *make_command, *['{}={}'.format(k, v) for k, v in kwargs.items()], *args, ], working_dir=working_dir, env=env, expected_returncode=expected_returncode, ) def skip_windows(message="This test is not supported on Windows"): def wrapper(f): @functools.wraps(f) def skip_windows(*args, **kwargs): raise unittest.SkipTest(message) if os.name == 'nt': return skip_windows else: return f return wrapper @lru_cache(maxsize=None) def ensure_available(executable): """ Checks if a command is available. If a command MUST be available, because we are in a CI environment, raises an AssertionError. In the docker containers, on Travis and on Windows, CI=true is set. """ path = shutil.which(executable) if path: return path # Installing clang-tidy on LLVM will be too much of a mess. if ((executable == 'clang-tidy' and sys.platform == 'darwin') or 'CI' not in os.environ): raise unittest.SkipTest( "{} is not available on PATH. Install it to run this test.{}" .format(executable, "" if not os.name == 'nt' else "On Windows, make sure to add it to PATH") ) raise AssertionError("{} not available on CI".format(executable)) def permit_test(testname, *args, **kwargs): if len(args) == 0: thing = list(kwargs.values())[0] else: thing = args[0] if 'PQCLEAN_ONLY_TESTS' in os.environ: if not(testname.lower() in os.environ['PQCLEAN_ONLY_TESTS'].lower().split(',')): return False if 'PQCLEAN_SKIP_TESTS' in os.environ: if testname.lower() in os.environ['PQCLEAN_SKIP_TESTS'].lower().split(','): return False if isinstance(thing, pqclean.Implementation): scheme = thing.scheme elif isinstance(thing, pqclean.Scheme): scheme = thing else: return True if 'PQCLEAN_ONLY_TYPES' in os.environ: if not(scheme.type.lower() in os.environ['PQCLEAN_ONLY_TYPES'].lower().split(',')): return False if 'PQCLEAN_SKIP_TYPES' in os.environ: if scheme.type.lower() in os.environ['PQCLEAN_SKIP_TYPES'].lower().split(','): return False if 'PQCLEAN_ONLY_SCHEMES' in os.environ: if not(scheme.name.lower() in os.environ['PQCLEAN_ONLY_SCHEMES'].lower().split(',')): return False if 'PQCLEAN_SKIP_SCHEMES' in os.environ: if scheme.name.lower() in os.environ['PQCLEAN_SKIP_SCHEMES'].lower().split(','): return False if 'PQCLEAN_ONLY_DIFF' in os.environ: if shutil.which('git') is not None: # if we're on a non-master branch, and the only changes are in schemes, # only run tests on those schemes branch_result = subprocess.run( ['git', 'status', '--porcelain=2', '--branch'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd="..", ) # ensure we're in a working directory if branch_result.returncode != 0: return True # ensure we're not on master branch for branch_line in branch_result.stdout.decode('utf-8').splitlines(): tokens = branch_line.split(' ') if tokens[0] == '#' and tokens[1] == 'branch.head': if tokens[2] == 'master': return True # where are there changes? diff_result = subprocess.run( ['git', 'diff', '--name-only', 'origin/master'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) assert diff_result.returncode == 0, \ "Got unexpected return code {}".format(diff_result.returncode) for diff_line in diff_result.stdout.decode('utf-8').splitlines(): # Git still returns UNIX-style paths on Windows, normalize diff_line = os.path.normpath(diff_line) # don't skip test if there are any changes outside schemes if (not diff_line.startswith('crypto_kem') and not diff_line.startswith('crypto_sign') and not diff_line.startswith(os.path.join('test', 'duplicate_consistency'))): logging.info("Running all tests as there are changes " "outside of schemes") return True # do test if the scheme in question has been changed if diff_line.startswith(thing.path(base='')): return True # do test if the scheme's duplicate_consistency files have been changed if diff_line.startswith(os.path.join('test', 'duplicate_consistency', scheme.name.lower())): return True # there were no changes outside schemes, and the scheme in question had no diffs return False return True def filtered_test(func): funcname = func.__name__[len("test_"):] @functools.wraps(func) def wrapper(*args, **kwargs): if permit_test(funcname, *args, **kwargs): return func(*args, **kwargs) else: raise unittest.SkipTest("Test disabled by filter") return wrapper @lru_cache(maxsize=1) def get_cpu_info(): the_info = None while the_info is None or 'flags' not in the_info: import cpuinfo the_info = cpuinfo.get_cpu_info() # CPUINFO is unreliable on Travis CI Macs if 'CI' in os.environ and sys.platform == 'darwin': the_info['flags'] = [ 'aes', 'apic', 'avx1.0', 'clfsh', 'cmov', 'cx16', 'cx8', 'de', 'em64t', 'erms', 'f16c', 'fpu', 'fxsr', 'lahf', 'mca', 'mce', 'mmx', 'mon', 'msr', 'mtrr', 'osxsave', 'pae', 'pat', 'pcid', 'pclmulqdq', 'pge', 'popcnt', 'pse', 'pse36', 'rdrand', 'rdtscp', 'rdwrfsgs', 'sep', 'smep', 'ss', 'sse', 'sse2', 'sse3', 'sse4.1', 'sse4.2', 'ssse3', 'syscall', 'tsc', 'tsc_thread_offset', 'tsci', 'tsctmr', 'vme', 'vmm', 'x2apic', 'xd', 'xsave'] return the_info