  1. import atexit
  2. import functools
  3. import logging
  4. import os
  5. import shutil
  6. import subprocess
  7. import sys
  8. import tempfile
  9. import unittest
  10. import pqclean
  11. @atexit.register
  12. def cleanup_testcases():
  13. """Clean up any remaining isolated test dirs"""
  14. print("Cleaning up testcases directory",
  15. file=sys.stderr)
  16. for dir_ in TEST_TEMPDIRS:
  17. shutil.rmtree(dir_, ignore_errors=True)
  18. TEST_TEMPDIRS = []
  19. def isolate_test_files(impl_path, test_prefix,
  20. dir=os.path.join('..', 'testcases')):
  21. """Isolates the test files in a separate directory, to help parallelise.
  22. Especially Windows is problematic and needs isolation of all test files:
  23. its build process will create .obj files EVERYWHERE.
  24. """
  25. try:
  26. os.mkdir(dir)
  27. except FileExistsError:
  28. pass
  29. test_dir = tempfile.mkdtemp(prefix=test_prefix, dir=dir)
  30. test_dir = os.path.abspath(test_dir)
  31. TEST_TEMPDIRS.append(test_dir)
  32. # Create layers in folder structure
  33. nested_dir = os.path.join(test_dir, 'crypto_bla')
  34. os.mkdir(nested_dir)
  35. nested_dir = os.path.join(nested_dir, 'scheme')
  36. os.mkdir(nested_dir)
  37. # Create test dependencies structure
  38. os.mkdir(os.path.join(test_dir, 'test'))
  39. # the implementation will go here.
  40. new_impl_dir = os.path.abspath(os.path.join(nested_dir, 'impl'))
  41. def initializer():
  42. """Isolate the files to be tested"""
  43. # Copy common files (randombytes.c, aes.c, ...)
  44. shutil.copytree(
  45. os.path.join('..', 'common'), os.path.join(test_dir, 'common'))
  46. # Copy makefiles
  47. shutil.copy(os.path.join('..', 'test', 'Makefile'),
  48. os.path.join(test_dir, 'test', 'Makefile'))
  49. shutil.copy(os.path.join('..', 'test', 'Makefile.Microsoft_nmake'),
  50. os.path.join(test_dir, 'test', 'Makefile.Microsoft_nmake'))
  51. # Copy directories with support files
  52. for d in ['common', 'test_common', 'crypto_sign', 'crypto_kem']:
  53. shutil.copytree(
  54. os.path.join('..', 'test', d),
  55. os.path.join(test_dir, 'test', d)
  56. )
  57. shutil.copytree(impl_path, new_impl_dir)
  58. def destructor():
  59. """Clean up the isolated files"""
  60. shutil.rmtree(test_dir, ignore_errors=True)
  61. return (test_dir, new_impl_dir, initializer, destructor)
  62. def run_subprocess(command, working_dir='.', env=None, expected_returncode=0,
  63. print_output=True):
  64. """
  65. Helper function to run a shell command and report success/failure
  66. depending on the exit status of the shell command.
  67. """
  68. if env is not None:
  69. env_ = os.environ.copy()
  70. env_.update(env)
  71. env = env_
  72. # Note we need to capture stdout/stderr from the subprocess,
  73. # then print it, which the unittest will then capture and
  74. # buffer appropriately
  75. print(working_dir + " > " + " ".join(command))
  76. result = subprocess.run(
  77. command,
  78. stdout=subprocess.PIPE,
  79. stderr=subprocess.STDOUT,
  80. cwd=working_dir,
  81. env=env,
  82. )
  83. if print_output:
  84. print(result.stdout.decode('utf-8'))
  85. if expected_returncode is not None:
  86. assert result.returncode == expected_returncode, \
  87. "Got unexpected return code {}".format(result.returncode)
  88. else:
  89. return (result.returncode, result.stdout.decode('utf-8'))
  90. return result.stdout.decode('utf-8')
  91. def make(*args, working_dir='.', env=None, expected_returncode=0, **kwargs):
  92. """
  93. Runs a make target in the specified working directory
  94. Usage:
  95. make('clean', 'targetb', SCHEME='bla')
  96. """
  97. if os.name == 'nt':
  98. make_command = ['nmake', '/f', 'Makefile.Microsoft_nmake',
  99. '/NOLOGO', '/E']
  101. for envvar in ['IMPLEMENTATION', 'SCHEME']:
  102. if envvar in kwargs:
  103. kwargs['{}_UPPERCASE'.format(envvar)] = (
  104. kwargs[envvar].upper().replace('-', ''))
  105. else:
  106. make_command = ['make']
  107. return run_subprocess(
  108. [
  109. *make_command,
  110. *['{}={}'.format(k, v) for k, v in kwargs.items()],
  111. *args,
  112. ],
  113. working_dir=working_dir,
  114. env=env,
  115. expected_returncode=expected_returncode,
  116. )
  117. def skip_windows(message="This test is not supported on Windows"):
  118. def wrapper(f):
  119. @functools.wraps(f)
  120. def skip_windows(*args, **kwargs):
  121. raise unittest.SkipTest(message)
  122. if os.name == 'nt':
  123. return skip_windows
  124. else:
  125. return f
  126. return wrapper
  127. def slow_test(f):
  128. @functools.wraps(f)
  129. def wrapper(*args, **kwargs):
  130. if ('CI' in os.environ and 'RUN_SLOW' not in os.environ and
  131. os.environ.get('TRAVIS_EVENT_TYPE') != 'cron'):
  132. raise unittest.SkipTest("Slow test skipped on CI run")
  133. return f(*args, **kwargs)
  134. return wrapper
  135. def ensure_available(executable):
  136. """
  137. Checks if a command is available.
  138. If a command MUST be available, because we are in a CI environment,
  139. raises an AssertionError.
  140. In the docker containers, on Travis and on Windows, CI=true is set.
  141. """
  142. path = shutil.which(executable)
  143. if path:
  144. return path
  145. # Installing clang-tidy on LLVM will be too much of a mess.
  146. if ((executable == 'clang-tidy' and sys.platform == 'darwin')
  147. or 'CI' not in os.environ):
  148. raise unittest.SkipTest(
  149. "{} is not available on PATH. Install it to run this test.{}"
  150. .format(executable, "" if not os.name == 'nt'
  151. else "On Windows, make sure to add it to PATH")
  152. )
  153. raise AssertionError("{} not available on CI".format(executable))
  154. def permit_test(testname, *args, **kwargs):
  155. if len(args) == 0:
  156. thing = list(kwargs.values())[0]
  157. else:
  158. thing = args[0]
  159. if 'PQCLEAN_ONLY_TESTS' in os.environ:
  160. if not(testname.lower() in os.environ['PQCLEAN_ONLY_TESTS'].lower().split(',')):
  161. return False
  162. if 'PQCLEAN_SKIP_TESTS' in os.environ:
  163. if testname.lower() in os.environ['PQCLEAN_SKIP_TESTS'].lower().split(','):
  164. return False
  165. if isinstance(thing, pqclean.Implementation):
  166. scheme = thing.scheme
  167. elif isinstance(thing, pqclean.Scheme):
  168. scheme = thing
  169. else:
  170. return True
  171. if 'PQCLEAN_ONLY_TYPES' in os.environ:
  172. if not(scheme.type.lower() in os.environ['PQCLEAN_ONLY_TYPES'].lower().split(',')):
  173. return False
  174. if 'PQCLEAN_SKIP_TYPES' in os.environ:
  175. if scheme.type.lower() in os.environ['PQCLEAN_SKIP_TYPES'].lower().split(','):
  176. return False
  177. if 'PQCLEAN_ONLY_SCHEMES' in os.environ:
  178. if not(scheme.name.lower() in os.environ['PQCLEAN_ONLY_SCHEMES'].lower().split(',')):
  179. return False
  180. if 'PQCLEAN_SKIP_SCHEMES' in os.environ:
  181. if scheme.name.lower() in os.environ['PQCLEAN_SKIP_SCHEMES'].lower().split(','):
  182. return False
  183. if 'PQCLEAN_ONLY_DIFF' in os.environ:
  184. if shutil.which('git') is not None:
  185. # if we're on a non-master branch, and the only changes are in schemes,
  186. # only run tests on those schemes
  187. branch_result = subprocess.run(
  188. ['git', 'status', '--porcelain=2', '--branch'],
  189. stdout=subprocess.PIPE,
  190. stderr=subprocess.STDOUT,
  191. cwd="..",
  192. )
  193. # ensure we're in a working directory
  194. if branch_result.returncode != 0:
  195. return True
  196. # ensure we're not on master branch
  197. for branch_line in branch_result.stdout.decode('utf-8').splitlines():
  198. tokens = branch_line.split(' ')
  199. if tokens[0] == '#' and tokens[1] == 'branch.head':
  200. if tokens[2] == 'master':
  201. return True
  202. # where are there changes?
  203. diff_result = subprocess.run(
  204. ['git', 'diff', '--name-only', 'origin/master'],
  205. stdout=subprocess.PIPE,
  206. stderr=subprocess.STDOUT
  207. )
  208. assert diff_result.returncode == 0, \
  209. "Got unexpected return code {}".format(diff_result.returncode)
  210. for diff_line in diff_result.stdout.decode('utf-8').splitlines():
  211. # Git still returns UNIX-style paths on Windows, normalize
  212. diff_line = os.path.normpath(diff_line)
  213. # don't skip test if there are any changes outside schemes
  214. if (not diff_line.startswith('crypto_kem') and
  215. not diff_line.startswith('crypto_sign') and
  216. not diff_line.startswith(os.path.join('test', 'duplicate_consistency'))):
  217. logging.info("Running all tests as there are changes "
  218. "outside of schemes")
  219. return True
  220. # do test if the scheme in question has been changed
  221. if diff_line.startswith(thing.path(base='')):
  222. return True
  223. # do test if the scheme's duplicate_consistency files have been changed
  224. if diff_line.startswith(os.path.join('test', 'duplicate_consistency', scheme.name.lower())):
  225. return True
  226. # there were no changes outside schemes, and the scheme in question had no diffs
  227. return False
  228. return True
  229. def filtered_test(func):
  230. funcname = func.__name__[len("test_"):]
  231. @functools.wraps(func)
  232. def wrapper(*args, **kwargs):
  233. if permit_test(funcname, *args, **kwargs):
  234. return func(*args, **kwargs)
  235. else:
  236. raise unittest.SkipTest("Test disabled by filter")
  237. return wrapper
  238. __CPUINFO = None
  239. def get_cpu_info():
  240. global __CPUINFO
  241. while __CPUINFO is None or 'flags' not in __CPUINFO:
  242. import cpuinfo
  243. __CPUINFO = cpuinfo.get_cpu_info()
  244. # CPUINFO is unreliable on Travis CI Macs
  245. if 'CI' in os.environ and sys.platform == 'darwin':
  246. __CPUINFO['flags'] = [
  247. 'aes', 'apic', 'avx1.0', 'clfsh', 'cmov', 'cx16', 'cx8', 'de',
  248. 'em64t', 'erms', 'f16c', 'fpu', 'fxsr', 'lahf', 'mca', 'mce',
  249. 'mmx', 'mon', 'msr', 'mtrr', 'osxsave', 'pae', 'pat', 'pcid',
  250. 'pclmulqdq', 'pge', 'popcnt', 'pse', 'pse36', 'rdrand',
  251. 'rdtscp', 'rdwrfsgs', 'sep', 'smep', 'ss', 'sse', 'sse2',
  252. 'sse3', 'sse4.1', 'sse4.2', 'ssse3', 'syscall', 'tsc',
  253. 'tsc_thread_offset', 'tsci', 'tsctmr', 'vme', 'vmm', 'x2apic',
  254. 'xd', 'xsave']
  255. return __CPUINFO