You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

275 lines
9.5 KiB

  1. import atexit
  2. import functools
  3. import logging
  4. import os
  5. import shutil
  6. import subprocess
  7. import sys
  8. import tempfile
  9. import unittest
  10. import pqclean
  11. @atexit.register
  12. def cleanup_testcases():
  13. """Clean up any remaining isolated test dirs"""
  14. print("Cleaning up testcases directory",
  15. file=sys.stderr)
  16. for dir_ in TEST_TEMPDIRS:
  17. shutil.rmtree(dir_, ignore_errors=True)
  18. TEST_TEMPDIRS = []
  19. def isolate_test_files(impl_path, test_prefix,
  20. dir=os.path.join('..', 'testcases')):
  21. """Isolates the test files in a separate directory, to help parallelise.
  22. Especially Windows is problematic and needs isolation of all test files:
  23. its build process will create .obj files EVERYWHERE.
  24. """
  25. try:
  26. os.mkdir(dir)
  27. except FileExistsError:
  28. pass
  29. test_dir = tempfile.mkdtemp(prefix=test_prefix, dir=dir)
  30. test_dir = os.path.abspath(test_dir)
  31. TEST_TEMPDIRS.append(test_dir)
  32. # Create layers in folder structure
  33. nested_dir = os.path.join(test_dir, 'crypto_bla')
  34. os.mkdir(nested_dir)
  35. nested_dir = os.path.join(nested_dir, 'scheme')
  36. os.mkdir(nested_dir)
  37. # Create test dependencies structure
  38. os.mkdir(os.path.join(test_dir, 'test'))
  39. # the implementation will go here.
  40. new_impl_dir = os.path.abspath(os.path.join(nested_dir, 'impl'))
  41. def initializer():
  42. """Isolate the files to be tested"""
  43. # Copy common files (randombytes.c, aes.c, ...)
  44. shutil.copytree(
  45. os.path.join('..', 'common'), os.path.join(test_dir, 'common'))
  46. # Copy makefiles
  47. shutil.copy(os.path.join('..', 'test', 'Makefile'),
  48. os.path.join(test_dir, 'test', 'Makefile'))
  49. shutil.copy(os.path.join('..', 'test', 'Makefile.Microsoft_nmake'),
  50. os.path.join(test_dir, 'test', 'Makefile.Microsoft_nmake'))
  51. # Copy directories with support files
  52. for d in ['common', 'test_common', 'crypto_sign', 'crypto_kem']:
  53. shutil.copytree(
  54. os.path.join('..', 'test', d),
  55. os.path.join(test_dir, 'test', d)
  56. )
  57. shutil.copytree(impl_path, new_impl_dir)
  58. def destructor():
  59. """Clean up the isolated files"""
  60. shutil.rmtree(test_dir)
  61. return (test_dir, new_impl_dir, initializer, destructor)
  62. def run_subprocess(command, working_dir='.', env=None, expected_returncode=0):
  63. """
  64. Helper function to run a shell command and report success/failure
  65. depending on the exit status of the shell command.
  66. """
  67. if env is not None:
  68. env_ = os.environ.copy()
  69. env_.update(env)
  70. env = env_
  71. # Note we need to capture stdout/stderr from the subprocess,
  72. # then print it, which the unittest will then capture and
  73. # buffer appropriately
  74. print(working_dir + " > " + " ".join(command))
  75. result = subprocess.run(
  76. command,
  77. stdout=subprocess.PIPE,
  78. stderr=subprocess.STDOUT,
  79. cwd=working_dir,
  80. env=env,
  81. )
  82. print(result.stdout.decode('utf-8'))
  83. if expected_returncode is not None:
  84. assert result.returncode == expected_returncode, \
  85. "Got unexpected return code {}".format(result.returncode)
  86. else:
  87. return (result.returncode, result.stdout.decode('utf-8'))
  88. return result.stdout.decode('utf-8')
  89. def make(*args, working_dir='.', env=None, expected_returncode=0, **kwargs):
  90. """
  91. Runs a make target in the specified working directory
  92. Usage:
  93. make('clean', 'targetb', SCHEME='bla')
  94. """
  95. if os.name == 'nt':
  96. make_command = ['nmake', '/f', 'Makefile.Microsoft_nmake',
  97. '/NOLOGO', '/E']
  98. # we need SCHEME_UPPERCASE and IMPLEMENTATION_UPPERCASE with nmake
  99. for envvar in ['IMPLEMENTATION', 'SCHEME']:
  100. if envvar in kwargs:
  101. kwargs['{}_UPPERCASE'.format(envvar)] = (
  102. kwargs[envvar].upper().replace('-', ''))
  103. else:
  104. make_command = ['make']
  105. return run_subprocess(
  106. [
  107. *make_command,
  108. *['{}={}'.format(k, v) for k, v in kwargs.items()],
  109. *args,
  110. ],
  111. working_dir=working_dir,
  112. env=env,
  113. expected_returncode=expected_returncode,
  114. )
  115. def skip_windows(message="This test is not supported on Windows"):
  116. def wrapper(f):
  117. @functools.wraps(f)
  118. def skip_windows(*args, **kwargs):
  119. raise unittest.SkipTest(message)
  120. if os.name == 'nt':
  121. return skip_windows
  122. else:
  123. return f
  124. return wrapper
  125. def slow_test(f):
  126. @functools.wraps(f)
  127. def wrapper(*args, **kwargs):
  128. if ('CI' in os.environ and 'RUN_SLOW' not in os.environ and
  129. os.environ.get('TRAVIS_EVENT_TYPE') != 'cron'):
  130. raise unittest.SkipTest("Slow test skipped on CI run")
  131. return f(*args, **kwargs)
  132. return wrapper
  133. def ensure_available(executable):
  134. """
  135. Checks if a command is available.
  136. If a command MUST be available, because we are in a CI environment,
  137. raises an AssertionError.
  138. In the docker containers, on Travis and on Windows, CI=true is set.
  139. """
  140. path = shutil.which(executable)
  141. if path:
  142. return path
  143. # Installing clang-tidy on LLVM will be too much of a mess.
  144. if ((executable == 'clang-tidy' and sys.platform == 'darwin')
  145. or 'CI' not in os.environ):
  146. raise unittest.SkipTest(
  147. "{} is not available on PATH. Install it to run this test.{}"
  148. .format(executable, "" if not os.name == 'nt'
  149. else "On Windows, make sure to add it to PATH")
  150. )
  151. raise AssertionError("{} not available on CI".format(executable))
  152. def permit_test(testname, *args, **kwargs):
  153. if len(args) == 0:
  154. thing = list(kwargs.values())[0]
  155. else:
  156. thing = args[0]
  157. if 'PQCLEAN_ONLY_TESTS' in os.environ:
  158. if not(testname.lower() in os.environ['PQCLEAN_ONLY_TESTS'].lower().split(',')):
  159. return False
  160. if 'PQCLEAN_SKIP_TESTS' in os.environ:
  161. if testname.lower() in os.environ['PQCLEAN_SKIP_TESTS'].lower().split(','):
  162. return False
  163. if isinstance(thing, pqclean.Implementation):
  164. scheme = thing.scheme
  165. elif isinstance(thing, pqclean.Scheme):
  166. scheme = thing
  167. else:
  168. return True
  169. if 'PQCLEAN_ONLY_TYPES' in os.environ:
  170. if not(scheme.type.lower() in os.environ['PQCLEAN_ONLY_TYPES'].lower().split(',')):
  171. return False
  172. if 'PQCLEAN_SKIP_TYPES' in os.environ:
  173. if scheme.type.lower() in os.environ['PQCLEAN_SKIP_TYPES'].lower().split(','):
  174. return False
  175. if 'PQCLEAN_ONLY_SCHEMES' in os.environ:
  176. if not(scheme.name.lower() in os.environ['PQCLEAN_ONLY_SCHEMES'].lower().split(',')):
  177. return False
  178. if 'PQCLEAN_SKIP_SCHEMES' in os.environ:
  179. if scheme.name.lower() in os.environ['PQCLEAN_SKIP_SCHEMES'].lower().split(','):
  180. return False
  181. if 'PQCLEAN_ONLY_DIFF' in os.environ:
  182. if shutil.which('git') is not None:
  183. # if we're on a non-master branch, and the only changes are in schemes,
  184. # only run tests on those schemes
  185. branch_result = subprocess.run(
  186. ['git', 'status', '--porcelain=2', '--branch'],
  187. stdout=subprocess.PIPE,
  188. stderr=subprocess.STDOUT,
  189. cwd="..",
  190. )
  191. # ensure we're in a working directory
  192. if branch_result.returncode != 0:
  193. return True
  194. # ensure we're not on master branch
  195. for branch_line in branch_result.stdout.decode('utf-8').splitlines():
  196. tokens = branch_line.split(' ')
  197. if tokens[0] == '#' and tokens[1] == 'branch.head':
  198. if tokens[2] == 'master':
  199. return True
  200. # where are there changes?
  201. diff_result = subprocess.run(
  202. ['git', 'diff', '--name-only', 'origin/master'],
  203. stdout=subprocess.PIPE,
  204. stderr=subprocess.STDOUT
  205. )
  206. assert diff_result.returncode == 0, \
  207. "Got unexpected return code {}".format(diff_result.returncode)
  208. for diff_line in diff_result.stdout.decode('utf-8').splitlines():
  209. # don't skip test if there are any changes outside schemes
  210. if (not diff_line.startswith('crypto_kem') and
  211. not diff_line.startswith('crypto_sign') and
  212. not diff_line.startswith(os.path.join('test', 'duplicate_consistency'))):
  213. logging.info("Running all tests as there are changes "
  214. "outside of schemes")
  215. return True
  216. # do test if the scheme in question has been changed
  217. if diff_line.startswith(thing.path(base='')):
  218. return True
  219. # do test if the scheme's duplicate_consistency files have been changed
  220. if diff_line.startswith(os.path.join('test', 'duplicate_consistency', scheme.name.lower())):
  221. return True
  222. # there were no changes outside schemes, and the scheme in question had no diffs
  223. return False
  224. return True
  225. def filtered_test(func):
  226. funcname = func.__name__[len("test_"):]
  227. @functools.wraps(func)
  228. def wrapper(*args, **kwargs):
  229. if permit_test(funcname, *args, **kwargs):
  230. return func(*args, **kwargs)
  231. else:
  232. raise unittest.SkipTest("Test disabled by filter")
  233. return wrapper