#!/usr/bin/env python3
#
# units.py - Units test harness for ctags
#
# Copyright (C) 2019 Ken Takata
# (Based on "units" written by Masatake YAMATO.)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
#
# Python 3.5 or later is required.
# On Windows, unix-like shell (e.g. bash) and some unix tools (sed,
# diff, etc.) are needed.
#
import time # for debugging
import argparse
import filecmp
import glob
import io
import os
import platform
import queue
import re
import shutil
import stat
import subprocess
import sys
import threading
#
# Global Parameters
#
SHELL = '/bin/sh'
CTAGS = './ctags'
READTAGS = './readtags'
OPTSCRIPT = './optscript'
WITH_TIMEOUT = 0
WITH_VALGRIND = False
COLORIZED_OUTPUT = True
CATEGORIES = []
UNITS = []
LANGUAGES = []
PRETENSE_OPTS = ''
RUN_SHRINK = False
SHOW_DIFF_OUTPUT = False
NUM_WORKER_THREADS = 4
DIFF_U_NUM = 0
#
# Internal variables and constants
#
_FEATURE_LIST = []
_PREPERE_ENV = ''
_DEFAULT_CATEGORY = 'ROOT'
_TIMEOUT_EXIT = 124
_VG_TIMEOUT_FACTOR = 10
_VALGRIND_EXIT = 58
_STDERR_OUTPUT_NAME = 'STDERR.tmp'
_DIFF_OUTPUT_NAME = 'DIFF.tmp'
_VALGRIND_OUTPUT_NAME = 'VALGRIND.tmp'
#
# Results
#
L_PASSED = []
L_FIXED = []
L_FAILED_BY_STATUS = []
L_FAILED_BY_DIFF = []
L_SKIPPED_BY_FEATURES = []
L_SKIPPED_BY_LANGUAGES = []
L_SKIPPED_BY_ILOOP = []
L_KNOWN_BUGS = []
L_FAILED_BY_TIMEED_OUT = []
L_BROKEN_ARGS_CTAGS = []
L_VALGRIND = []
TMAIN_STATUS = True
TMAIN_FAILED = []
def remove_prefix(string, prefix):
if string.startswith(prefix):
return string[len(prefix):]
else:
return string
def is_cygwin():
system = platform.system()
return system.startswith('CYGWIN_NT') or system.startswith('MINGW32_NT')
def isabs(path):
if is_cygwin():
import ntpath
if ntpath.isabs(path):
return True
return os.path.isabs(path)
def action_help(parser, action, *args):
parser.print_help()
return 0
def error_exit(status, msg):
print(msg, file=sys.stderr)
sys.exit(status)
def line(*args, file=sys.stdout):
if len(args) > 0:
ch = args[0]
else:
ch = '-'
print(ch * 60, file=file)
def remove_readonly(func, path, _):
# Clear the readonly bit and reattempt the removal
os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
dname = os.path.dirname(path)
os.chmod(dname, os.stat(dname).st_mode | stat.S_IWRITE)
func(path)
def clean_bundles(bundles):
if not os.path.isfile(bundles):
return
with open(bundles, 'r') as f:
for fn in f.read().splitlines():
if os.path.isdir(fn):
shutil.rmtree(fn, onerror=remove_readonly)
elif os.path.isfile(fn):
os.remove(fn)
os.remove(bundles)
def clean_tcase(d, bundles):
if os.path.isdir(d):
clean_bundles(bundles)
for fn in glob.glob(d + '/*.tmp'):
os.remove(fn)
for fn in glob.glob(d + '/*.TMP'):
os.remove(fn)
def check_availability(cmd):
if not shutil.which(cmd):
error_exit(1, cmd + ' command is not available')
def check_units(name, category):
if len(UNITS) == 0:
return True
for u in UNITS:
ret = re.match(r'(.+)/(.+)', u)
if ret:
if ret.group(1, 2) == (category, name):
return True
elif u == name:
return True
return False
def init_features():
global _FEATURE_LIST
ret = subprocess.run([CTAGS, '--quiet', '--options=NONE', '--list-features', '--with-list=no'],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
_FEATURE_LIST = re.sub(r'(?m)^([^ ]+).*$', r'\1',
ret.stdout.decode('utf-8')).splitlines()
def check_features(feature, ffile):
features = []
if feature:
features = [feature]
elif os.path.isfile(ffile):
with open(ffile, 'r') as f:
features = f.read().splitlines()
for expected in features:
if expected == '':
continue
found = False
found_unexpectedly = False
if expected[0] == '!':
if expected[1:] in _FEATURE_LIST:
found_unexpectedly = True
else:
if expected in _FEATURE_LIST:
found = True
if found_unexpectedly:
return (False, expected)
elif not found:
return (False, expected)
return (True, '')
def check_languages(cmdline, lfile):
if not os.path.isfile(lfile):
return (True, '')
ret = subprocess.run(cmdline + ['--list-languages'],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
langs = ret.stdout.decode('utf-8').splitlines()
with open(lfile, 'r') as f:
for expected in f.read().splitlines():
found = False
if expected in langs:
found = True
if not found:
return (False, expected)
return (True, '')
def decorate(decorator, msg, colorized):
if decorator == 'red':
num = '31'
elif decorator == 'green':
num = '32'
elif decorator == 'yellow':
num = '33'
else:
error_exit(1, 'INTERNAL ERROR: wrong run_result function')
if colorized:
return "\x1b[" + num + 'm' + msg + "\x1b[m"
else:
return msg
def run_result(result_type, msg, output, *args, file=sys.stdout):
func_dict = {
'skip': run_result_skip,
'error': run_result_error,
'ok': run_result_ok,
'known_error': run_result_known_error,
}
func_dict[result_type](msg, file, COLORIZED_OUTPUT, *args)
file.flush()
if output:
with open(output, 'w') as f:
func_dict[result_type](msg, f, False, *args)
def run_result_skip(msg, f, colorized, *args):
s = msg + decorate('yellow', 'skipped', colorized)
if len(args) > 0:
s += ' (' + args[0] + ')'
print(s, file=f)
def run_result_error(msg, f, colorized, *args):
s = msg + decorate('red', 'failed', colorized)
if len(args) > 0:
s += ' (' + args[0] + ')'
print(s, file=f)
def run_result_ok(msg, f, colorized, *args):
s = msg + decorate('green', 'passed', colorized)
if len(args) > 0:
s += ' (' + args[0] + ')'
print(s, file=f)
def run_result_known_error(msg, f, colorized, *args):
s = msg + decorate('yellow', 'failed', colorized) + ' (KNOWN bug)'
print(s, file=f)
def run_shrink(cmdline_template, finput, foutput, lang):
script = sys.argv[0]
script = os.path.splitext(script)[0] # remove '.py'
print('Shrinking ' + finput + ' as ' + lang)
# fallback to the shell script version
subprocess.run([SHELL, script, 'shrink',
'--timeout=1', '--foreground',
cmdline_template, finput, foutput])
# return a filter for normalizing the basename
#
# If internal is True, return a pair of [pattern, replacement],
# otherwise return a list of command line arguments.
def basename_filter(internal, output_type):
filters_external = {
'ctags': 's%\(^[^\t]\{1,\}\t\)\(/\{0,1\}\([^/\t]\{1,\}/\)*\)%\\1%',
# "input" in the expresion is for finding input file names in the TAGS file.
# RAWOUT.tmp:
#
# ./Units/parser-ada.r/ada-etags-suffix.d/input_0.adb,238
# package body Input_0 is ^?Input_0/b^A1,0
#
# With the original expression, both "./Units/parser-ada.r/ada-etags-suffix.d/"
# and "package body Input_0 is Input_0/' are deleted.
# FILTERED.tmp:
#
# input_0.adb,238
# b^A1,0
#
# Adding "input" ot the expression is for deleting only the former one and for
# skpping the later one.
#
# FIXME: if "input" is included as a substring of tag entry names, filtering
# with this expression makes the test fail.
'etags': 's%.*\/\(input[-._][[:print:]]\{1,\}\),\([0-9]\{1,\}$\)%\\1,\\2%',
'xref': 's%\(.*[[:digit:]]\{1,\} \)\([^ ]\{1,\}[^ ]\{1,\}\)/\([^ ].\{1,\}.\{1,\}$\)%\\1\\3%',
'json': 's%\("path": \)"[^"]\{1,\}/\([^/"]\{1,\}\)"%\\1"\\2"%',
}
filters_internal = {
'ctags': [r'(^[^\t]+\t)(/?([^/\t]+/)*)', r'\1'],
# See above comments about "input".
'etags': [r'.*/(input[-._]\S+),([0-9]+$)', r'\1,\2'],
'xref': [r'(.*\d+ )([^ ]+[^ ]+)/([^ ].+.+$)', r'\1\3'],
'json': [r'("path": )"[^"]+/([^/"]+)"', r'\1"\2"'],
}
if internal:
return filters_internal[output_type]
else:
return ['sed', '-e', filters_external[output_type]]
# convert a command line list to a command line string
def join_cmdline(cmdline):
# surround with '' if an argument includes spaces or '\'
# TODO: use more robust way
return ' '.join("'" + x + "'" if (' ' in x) or ('\\' in x) else x
for x in cmdline)
def run_record_cmdline(cmdline, ffilter, ocmdline, output_type):
with open(ocmdline, 'w') as f:
print("%s\n%s \\\n| %s \\\n| %s\n" % (
_PREPERE_ENV,
join_cmdline(cmdline),
join_cmdline(basename_filter(False, output_type)),
ffilter), file=f)
def prepare_bundles(frm, to, obundles):
for src in glob.glob(frm + '/*'):
fn = os.path.basename(src)
if fn.startswith('input.'):
continue
elif fn.startswith('expected.tags'):
continue
elif fn.startswith('README'):
continue
elif fn in ['features', 'languages', 'filters']:
continue
elif fn == 'args.ctags':
continue
else:
dist = to + '/' + fn
if os.path.isdir(src):
shutil.copytree(src, dist, copy_function=shutil.copyfile)
else:
shutil.copyfile(src, dist)
with open(obundles, 'a') as f:
print(dist, file=f)
def anon_normalize_sub(internal, ctags, input_actual, *args):
# TODO: "Units" should not be hardcoded.
input_expected = './Units' + re.sub(r'^.*?/Units', r'', input_actual, 1)
ret = subprocess.run([CTAGS, '--quiet', '--options=NONE', '--_anonhash=' + input_actual],
stdout=subprocess.PIPE)
actual = ret.stdout.decode('utf-8').splitlines()[0]
ret = subprocess.run([CTAGS, '--quiet', '--options=NONE', '--_anonhash=' + input_expected],
stdout=subprocess.PIPE)
expected = ret.stdout.decode('utf-8').splitlines()[0]
if internal:
retlist = [[actual, expected]]
else:
retlist = ['-e', 's/' + actual + '/' + expected + '/g']
if len(args) > 0:
return retlist + anon_normalize_sub(internal, ctags, *args)
else:
return retlist
def is_anon_normalize_needed(rawout):
with open(rawout, 'r', errors='ignore') as f:
if re.search(r'[0-9a-f]{8}', f.read()):
return True
return False
# return a list of filters for normalizing anonhash
#
# If internal is True, return a list of pairs of [pattern, replacement],
# otherwise return a list of command line arguments.
def anon_normalize(internal, rawout, ctags, input_actual, *args):
if is_anon_normalize_needed(rawout):
return anon_normalize_sub(internal, ctags, input_actual, *args)
else:
return []
def run_filter(finput, foutput, base_filter, anon_filters):
pat1 = [re.compile(base_filter[0]), base_filter[1]]
pat2 = [(re.compile(p[0]), p[1]) for p in anon_filters]
with open(finput, 'r', errors='surrogateescape') as fin, \
open(foutput, 'w', errors='surrogateescape', newline='\n') as fout:
for l in fin:
l = pat1[0].sub(pat1[1], l, 1)
for p in pat2:
l = p[0].sub(p[1], l)
print(l, end='', file=fout)
def guess_lang(cmdline, finput):
ret = subprocess.run(cmdline + ['--print-language', finput],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
return re.sub(r'^.*: ', r'',
ret.stdout.decode('utf-8').replace("\r\n", "\n").replace("\n", ''))
def guess_lang_from_log(log):
with open(log, 'r', encoding='utf-8', errors='ignore') as f:
for l in f:
ret = re.match('OPENING.* as (.*) language .*file ', l)
if ret:
return ret.group(1)
return ''
def run_tcase(finput, t, name, tclass, category, build_t, extra_inputs):
global L_PASSED
global L_FIXED
global L_FAILED_BY_STATUS
global L_FAILED_BY_DIFF
global L_SKIPPED_BY_FEATURES
global L_SKIPPED_BY_LANGUAGES
global L_SKIPPED_BY_ILOOP
global L_KNOWN_BUGS
global L_FAILED_BY_TIMEED_OUT
global L_BROKEN_ARGS_CTAGS
global L_VALGRIND
o = build_t
fargs = t + '/args.ctags'
ffeatures = t + '/features'
flanguages = t + '/languages'
ffilter = t + '/filter'
fexpected = t + '/expected.tags'
output_type = 'ctags'
output_label = ''
output_tflag = []
output_feature = ''
output_lang_extras = ''
if os.path.isfile(fexpected):
pass
elif os.path.isfile(t + '/expected.tags-e'):
fexpected = t + '/expected.tags-e'
output_type = 'etags'
output_label = '/' + output_type
output_tflag = ['-e', '--tag-relative=no']
elif os.path.isfile(t + '/expected.tags-x'):
fexpected = t + '/expected.tags-x'
output_type = 'xref'
output_label = '/' + output_type
output_tflag = ['-x']
elif os.path.isfile(t + '/expected.tags-json'):
fexpected = t + '/expected.tags-json'
output_type = 'json'
output_label = '/' + output_type
output_tflag = ['--output-format=json']
output_feature = 'json'
if len(extra_inputs) > 0:
output_lang_extras = ' (multi inputs)'
if not shutil.which(ffilter):
ffilter = 'cat'
ostderr = o + '/' + _STDERR_OUTPUT_NAME
orawout = o + '/RAWOUT.tmp'
ofiltered = o + '/FILTERED.tmp'
odiff = o + '/' + _DIFF_OUTPUT_NAME
ocmdline = o + '/CMDLINE.tmp'
ovalgrind = o + '/' + _VALGRIND_OUTPUT_NAME
oresult = o + '/RESULT.tmp'
oshrink_template = o + '/SHRINK-%s.tmp'
obundles = o + '/BUNDLES'
broken_args_ctags = False
#
# Filtered by UNIT
#
if not check_units(name, category):
return False
#
# Build cmdline
#
cmdline = [CTAGS, '--verbose', '--options=NONE', '--fields=-T']
if PRETENSE_OPTS != '':
cmdline += [PRETENSE_OPTS]
cmdline += ['--optlib-dir=+' + t + '/optlib', '-o', '-']
if os.path.isfile(fargs):
cmdline += ['--options=' + fargs]
ret = subprocess.run(cmdline + ['--_force-quit=0'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if ret.returncode != 0:
broken_args_ctags = True
#
# make a backup (basedcmdline) of cmdline. basedcmdline is used
# as a command line template for running shrinker. basedcmdline
# should not include the name of the input file name. The
# shrinker makes a another cmdline by applying a real input file
# name to the template. On the other hand, cmdline is
# destructively updated by appending input file name in this
# function. The file name should not be included in the cmdline
# template.
#
# To avoid the updating in this function propagating to
# basecmdline, we copy the cmdline here.
#
basecmdline = cmdline[:]
#
# Filtered by LANGUAGES
#
guessed_lang = None
if len(LANGUAGES) > 0:
guessed_lang = guess_lang(basecmdline, finput)
if not guessed_lang in LANGUAGES:
return False
clean_tcase(o, obundles)
os.makedirs(o, exist_ok=True)
if not os.path.samefile(o, t):
prepare_bundles(t, o, obundles)
# helper function for building some strings based on guessed_lang
def build_strings(guessed_lang):
return ('%-59s ' % ('Testing ' + name + ' as ' + guessed_lang + output_lang_extras + output_label),
join_cmdline(basecmdline) + ' --language-force=' + guessed_lang + ' %s > /dev/null 2>&1',
oshrink_template % (guessed_lang.replace('/', '-')))
(tmp, feat) = check_features(output_feature, ffeatures)
if not tmp:
if not guessed_lang:
guessed_lang = guess_lang(basecmdline, finput)
msg = build_strings(guessed_lang)[0]
L_SKIPPED_BY_FEATURES += [category + '/' + name]
if feat.startswith('!'):
run_result('skip', msg, oresult, 'unwanted feature "' + feat[1:] + '" is available')
else:
run_result('skip', msg, oresult, 'required feature "' + feat + '" is not available')
return False
(tmp, lang) = check_languages(basecmdline, flanguages)
if not tmp:
if not guessed_lang:
guessed_lang = guess_lang(basecmdline, finput)
msg = build_strings(guessed_lang)[0]
L_SKIPPED_BY_LANGUAGES += [category + '/' + name]
run_result('skip', msg, oresult, 'required language parser "' + lang + '" is not available')
return False
if WITH_TIMEOUT == 0 and tclass == 'i':
if not guessed_lang:
guessed_lang = guess_lang(basecmdline, finput)
msg = build_strings(guessed_lang)[0]
L_SKIPPED_BY_ILOOP += [category + '/' + name]
run_result('skip', msg, oresult, 'may cause an infinite loop')
return False
if broken_args_ctags:
if not guessed_lang:
guessed_lang = guess_lang(basecmdline, finput)
msg = build_strings(guessed_lang)[0]
L_BROKEN_ARGS_CTAGS += [category + '/' + name]
run_result('error', msg, None, 'broken args.ctags?')
return False
cmdline += output_tflag + [finput]
if len(extra_inputs) > 0:
cmdline += extra_inputs
timeout_value = WITH_TIMEOUT
if WITH_VALGRIND:
cmdline = ['valgrind', '--leak-check=full', '--track-origins=yes',
'--error-exitcode=' + str(_VALGRIND_EXIT), '--log-file=' + ovalgrind] + cmdline
timeout_value *= _VG_TIMEOUT_FACTOR
if timeout_value == 0:
timeout_value = None
start = time.time()
try:
with open(orawout, 'wb') as fo, \
open(ostderr, 'wb') as fe:
ret = subprocess.run(cmdline, stdout=fo, stderr=fe,
timeout=timeout_value)
run_record_cmdline(cmdline, ffilter, ocmdline, output_type)
except subprocess.TimeoutExpired:
if not guessed_lang:
guessed_lang = guess_lang(basecmdline, finput)
(msg, cmdline_template, oshrink) = build_strings(guessed_lang)
L_FAILED_BY_TIMEED_OUT += [category + '/' + name]
run_result('error', msg, oresult, 'TIMED OUT')
run_record_cmdline(cmdline, ffilter, ocmdline, output_type)
if RUN_SHRINK and len(extra_inputs) == 0:
run_shrink(cmdline_template, finput, oshrink, guessed_lang)
return False
#print('execute time: %f' % (time.time() - start))
guessed_lang = guess_lang_from_log(ostderr)
(msg, cmdline_template, oshrink) = build_strings(guessed_lang)
if ret.returncode != 0:
if WITH_VALGRIND and ret.returncode == _VALGRIND_EXIT and \
tclass != 'v':
L_VALGRIND += [category + '/' + name]
run_result('error', msg, oresult, 'valgrind-error')
run_record_cmdline(cmdline, ffilter, ocmdline, output_type)
return False
elif tclass == 'b':
L_KNOWN_BUGS += [category + '/' + name]
run_result('known_error', msg, oresult)
run_record_cmdline(cmdline, ffilter, ocmdline, output_type)
if RUN_SHRINK and len(extra_inputs) == 0:
run_shrink(cmdline_template, finput, oshrink, guessed_lang)
return True
else:
L_FAILED_BY_STATUS += [category + '/' + name]
run_result('error', msg, oresult, 'unexpected exit status: ' + str(ret.returncode))
run_record_cmdline(cmdline, ffilter, ocmdline, output_type)
if RUN_SHRINK and len(extra_inputs) == 0:
run_shrink(cmdline_template, finput, oshrink, guessed_lang)
return False
elif WITH_VALGRIND and tclass == 'v':
L_FIXED += [category + '/' + name]
if not os.path.isfile(fexpected):
clean_tcase(o, obundles)
if tclass == 'b':
L_FIXED += [category + '/' + name]
elif tclass == 'i':
L_FIXED += [category + '/' + name]
L_PASSED += [category + '/' + name]
run_result('ok', msg, None, '"expected.tags*" not found')
return True
start = time.time()
if ffilter != 'cat':
# Use external filter
filter_cmd = basename_filter(False, output_type) + \
anon_normalize(False, orawout, CTAGS, finput, *extra_inputs) + \
['<', orawout]
filter_cmd += ['|', ffilter]
filter_cmd += ['>', ofiltered]
#print(filter_cmd)
subprocess.run([SHELL, '-c', join_cmdline(filter_cmd)])
else:
# Use internal filter
run_filter(orawout, ofiltered, basename_filter(True, output_type),
anon_normalize(True, orawout, CTAGS, finput, *extra_inputs))
#print('filter time: %f' % (time.time() - start))
start = time.time()
if filecmp.cmp(fexpected, ofiltered):
ret.returncode = 0
else:
with open(odiff, 'wb') as f:
ret = subprocess.run(['diff', '-U', str(DIFF_U_NUM),
'-I', '^!_TAG', '--strip-trailing-cr', fexpected, ofiltered],
stdout=f)
#print('diff time: %f' % (time.time() - start))
if ret.returncode == 0:
clean_tcase(o, obundles)
if tclass == 'b':
L_FIXED += [category + '/' + name]
elif WITH_TIMEOUT != 0 and tclass == 'i':
L_FIXED += [category + '/' + name]
L_PASSED += [category + '/' + name]
run_result('ok', msg, None)
return True
else:
if tclass == 'b':
L_KNOWN_BUGS += [category + '/' + name]
run_result('known_error', msg, oresult)
run_record_cmdline(cmdline, ffilter, ocmdline, output_type)
return True
else:
L_FAILED_BY_DIFF += [category + '/' + name]
run_result('error', msg, oresult, 'unexpected output')
run_record_cmdline(cmdline, ffilter, ocmdline, output_type)
return False
def create_thread_queue(func):
q = queue.Queue()
threads = []
for i in range(NUM_WORKER_THREADS):
t = threading.Thread(target=worker, args=(func, q), daemon=True)
t.start()
threads.append(t)
return (q, threads)
def worker(func, q):
while True:
item = q.get()
if item is None:
break
try:
func(*item)
except:
import traceback
traceback.print_exc()
q.task_done()
def join_workers(q, threads):
# block until all tasks are done
try:
q.join()
except KeyboardInterrupt:
# empty the queue
while True:
try:
q.get_nowait()
except queue.Empty:
break
# try to stop workers
for i in range(NUM_WORKER_THREADS):
q.put(None)
for t in threads:
t.join(timeout=2)
# exit regardless that workers are stopped
sys.exit(1)
# stop workers
for i in range(NUM_WORKER_THREADS):
q.put(None)
for t in threads:
t.join()
def accepted_file(fname):
# Ignore backup files
return not fname.endswith('~')
def run_dir(category, base_dir, build_base_dir):
#
# Filtered by CATEGORIES
#
if len(CATEGORIES) > 0 and not category in CATEGORIES:
return False
print("\nCategory: " + category)
line()
(q, threads) = create_thread_queue(run_tcase)
for finput in glob.glob(base_dir + '/*.[dbtiv]/input.*'):
finput = finput.replace('\\', '/') # for Windows
if not accepted_file(finput):
continue
dname = os.path.dirname(finput)
extra_inputs = sorted(map(lambda x: x.replace('\\', '/'), # for Windows
filter(accepted_file,
glob.glob(dname + '/input[-_][0-9].*') +
glob.glob(dname + '/input[-_][0-9][-_]*.*')
)))
tcase_dir = dname
build_tcase_dir = build_base_dir + remove_prefix(tcase_dir, base_dir)
ret = re.match(r'^.*/(.*)\.([dbtiv])$', tcase_dir)
(name, tclass) = ret.group(1, 2)
q.put((finput, tcase_dir, name, tclass, category, build_tcase_dir, extra_inputs))
join_workers(q, threads)
def run_show_diff_output(units_dir, t):
print("\t", end='')
line('.')
for fn in glob.glob(units_dir + '/' + t + '.*/' + _DIFF_OUTPUT_NAME):
with open(fn, 'r') as f:
for l in f:
print("\t" + l, end='')
print()
def run_show_stderr_output(units_dir, t):
print("\t", end='')
line('.')
for fn in glob.glob(units_dir + '/' + t + '.*/' + _STDERR_OUTPUT_NAME):
with open(fn, 'r') as f:
lines = f.readlines()
for l in lines[-50:]:
print("\t" + l, end='')
print()
def run_show_valgrind_output(units_dir, t):
print("\t", end='')
line('.')
for fn in glob.glob(units_dir + '/' + t + '.*/' + _VALGRIND_OUTPUT_NAME):
with open(fn, 'r') as f:
for l in f:
print("\t" + l, end='')
print()
def run_summary(build_dir):
print()
print('Summary (see CMDLINE.tmp to reproduce without test harness)')
line()
fmt = ' %-40s%d'
print(fmt % ('#passed:', len(L_PASSED)))
print(fmt % ('#FIXED:', len(L_FIXED)))
for t in L_FIXED:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
print(fmt % ('#FAILED (broken args.ctags?):', len(L_BROKEN_ARGS_CTAGS)))
for t in L_BROKEN_ARGS_CTAGS:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
print(fmt % ('#FAILED (unexpected-exit-status):', len(L_FAILED_BY_STATUS)))
for t in L_FAILED_BY_STATUS:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
if SHOW_DIFF_OUTPUT:
run_show_stderr_output(build_dir, remove_prefix(t, _DEFAULT_CATEGORY + '/'))
print(fmt % ('#FAILED (unexpected-output):', len(L_FAILED_BY_DIFF)))
for t in L_FAILED_BY_DIFF:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
if SHOW_DIFF_OUTPUT:
run_show_stderr_output(build_dir, remove_prefix(t, _DEFAULT_CATEGORY + '/'))
run_show_diff_output(build_dir, remove_prefix(t, _DEFAULT_CATEGORY + '/'))
if WITH_TIMEOUT != 0:
print(fmt % ('#TIMED-OUT (' + str(WITH_TIMEOUT) + 's):', len(L_FAILED_BY_TIMEED_OUT)))
for t in L_FAILED_BY_TIMEED_OUT:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
print(fmt % ('#skipped (features):', len(L_SKIPPED_BY_FEATURES)))
for t in L_SKIPPED_BY_FEATURES:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
print(fmt % ('#skipped (languages):', len(L_SKIPPED_BY_LANGUAGES)))
for t in L_SKIPPED_BY_LANGUAGES:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
if WITH_TIMEOUT == 0:
print(fmt % ('#skipped (infinite-loop):', len(L_SKIPPED_BY_ILOOP)))
for t in L_SKIPPED_BY_ILOOP:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
print(fmt % ('#known-bugs:', len(L_KNOWN_BUGS)))
for t in L_KNOWN_BUGS:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
if WITH_VALGRIND:
print(fmt % ('#valgrind-error:', len(L_VALGRIND)))
for t in L_VALGRIND:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
if SHOW_DIFF_OUTPUT:
print(fmt % ('##valgrind-error:', len(L_VALGRIND)))
for t in L_VALGRIND:
print("\t" + remove_prefix(t, _DEFAULT_CATEGORY + '/'))
run_show_valgrind_output(build_dir, remove_prefix(t, _DEFAULT_CATEGORY + '/'))
def make_pretense_map(arg):
r = ''
for p in arg.split(','):
ret = re.match(r'(.*)/(.*)', p)
if not ret:
error_exit(1, 'wrong format of --_pretend option arg')
(newlang, oldlang) = ret.group(1, 2)
if newlang == '':
error_exit(1, 'newlang part of --_pretend option arg is empty')
if oldlang == '':
error_exit(1, 'oldlang part of --_pretend option arg is empty')
r += ' --_pretend-' + newlang + '=' + oldlang
return r
def action_run(parser, action, *args):
global CATEGORIES
global CTAGS
global UNITS
global LANGUAGES
global WITH_TIMEOUT
global WITH_VALGRIND
global COLORIZED_OUTPUT
global RUN_SHRINK
global SHOW_DIFF_OUTPUT
global PRETENSE_OPTS
global NUM_WORKER_THREADS
global SHELL
parser.add_argument('--categories', metavar='CATEGORY1[,CATEGORY2,...]',
help='run only CATEGORY* related cases.')
parser.add_argument('--ctags',
help='ctags executable file for testing')
parser.add_argument('--units', metavar='UNITS1[,UNITS2,...]',
help='run only UNIT(S).')
parser.add_argument('--languages', metavar='PARSER1[,PARSER2,...]',
help='run only PARSER* related cases.')
parser.add_argument('--with-timeout', type=int, default=0,
metavar='DURATION',
help='run a test case with specified timeout in seconds. 0 means no timeout (default).')
parser.add_argument('--with-valgrind', action='store_true', default=False,
help='run a test case under valgrind')
parser.add_argument('--colorized-output', choices=['yes', 'no'], default='yes',
help='print the result in color.')
parser.add_argument('--run-shrink', action='store_true', default=False,
help='(TODO: NOT IMPLEMENTED YET)')
parser.add_argument('--show-diff-output', action='store_true', default=False,
help='show diff output (and valgrind errors) for failed test cases in the summary.')
parser.add_argument('--with-pretense-map',
metavar='NEWLANG0/OLDLANG0[,...]',
help='make NEWLANG parser pretend OLDLANG.')
parser.add_argument('--threads', type=int, default=NUM_WORKER_THREADS,
help='number of worker threads')
parser.add_argument('--shell',
help='shell to be used.')
parser.add_argument('units_dir',
help='Units directory.')
parser.add_argument('build_dir', nargs='?', default='',
help='Build directory. If not given, units_dir is used.')
res = parser.parse_args(args)
if res.categories:
CATEGORIES = [x if x == 'ROOT' or x.endswith('.r') else x + '.r'
for x in res.categories.split(',')]
if res.ctags:
CTAGS = res.ctags
if res.units:
UNITS = res.units.split(',')
if res.languages:
LANGUAGES = res.languages.split(',')
WITH_TIMEOUT = res.with_timeout
WITH_VALGRIND = res.with_valgrind
COLORIZED_OUTPUT = (res.colorized_output == 'yes')
RUN_SHRINK = res.run_shrink
SHOW_DIFF_OUTPUT = res.show_diff_output
if res.with_pretense_map:
PRETENSE_OPTS = make_pretense_map(res.with_pretense_map)
NUM_WORKER_THREADS = res.threads
if res.shell:
SHELL = res.shell
if res.build_dir == '':
res.build_dir = res.units_dir
if WITH_VALGRIND:
check_availability('valgrind')
check_availability('diff')
init_features()
if isabs(res.build_dir):
build_dir = res.build_dir
else:
build_dir = os.path.realpath(res.build_dir)
category = _DEFAULT_CATEGORY
if len(CATEGORIES) == 0 or (category in CATEGORIES):
run_dir(category, res.units_dir, build_dir)
for d in glob.glob(res.units_dir + '/*.r'):
d = d.replace('\\', '/') # for Windows
if not os.path.isdir(d):
continue
category = os.path.basename(d)
build_d = res.build_dir + '/' + category
run_dir(category, d, build_d)
run_summary(build_dir)
if L_FAILED_BY_STATUS or L_FAILED_BY_DIFF or \
L_FAILED_BY_TIMEED_OUT or L_BROKEN_ARGS_CTAGS or \
L_VALGRIND:
return 1
else:
return 0
def action_clean(parser, action, *args):
parser.add_argument('units_dir',
help='Build directory for units testing.')
res = parser.parse_args(args)
units_dir = res.units_dir
if not os.path.isdir(units_dir):
error_exit(0, 'No such directory: ' + units_dir)
for bundles in glob.glob(units_dir + '/**/BUNDLES', recursive=True):
clean_bundles(bundles)
for fn in glob.glob(units_dir + '/**/*.tmp', recursive=True):
os.remove(fn)
for fn in glob.glob(units_dir + '/**/*.TMP', recursive=True):
os.remove(fn)
return 0
def tmain_compare_result(build_topdir):
for fn in glob.glob(build_topdir + '/*/*-diff.txt'):
print(fn)
print()
with open(fn, 'r', errors='replace') as f:
for l in f:
print("\t" + l, end='')
print()
for fn in glob.glob(build_topdir + '/*/gdb-backtrace.txt'):
with open(fn, 'r', errors='replace') as f:
for l in f:
print("\t" + l, end='')
def tmain_compare(subdir, build_subdir, aspect, file):
msg = '%-59s ' % (aspect)
generated = build_subdir + '/' + aspect + '-diff.txt'
actual = build_subdir + '/' + aspect + '-actual.txt'
expected = subdir + '/' + aspect + '-expected.txt'
if os.path.isfile(actual) and os.path.isfile(expected) and \
filecmp.cmp(actual, expected):
run_result('ok', msg, None, file=file)
# When successful, remove files generated in the last
# failure to make the directory clean.
# Unlike other generated files like gdb-backtrace.txt
# misc/review script looks at the -diff.txt file.
# Therefore we handle -diff.txt specially here.
if os.path.isfile(generated):
os.remove(generated)
return True
else:
with open(generated, 'wb') as f:
subprocess.run(['diff', '-U',
str(DIFF_U_NUM), '--strip-trailing-cr',
expected, actual],
stdout=f, stderr=subprocess.STDOUT)
run_result('error', msg, None, 'diff: ' + generated, file=file)
return False
def failed_git_marker(fn):
if shutil.which('git'):
ret = subprocess.run(['git', 'ls-files', '--', fn],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
if ret.returncode == 0 and ret.stdout == b'':
return ''
return ''
def is_crashed(fn):
with open(fn, 'r') as f:
if 'core dump' in f.read():
return True
return False
def print_backtraces(ctags_exe, cores, fn):
with open(fn, 'wb') as f:
for coref in cores:
subprocess.run(['gdb', ctags_exe, '-c', coref, '-ex', 'where', '-batch'],
stdout=f, stderr=subprocess.DEVNULL)
def tmain_sub(test_name, basedir, subdir, build_subdir):
global TMAIN_STATUS
global TMAIN_FAILED
CODE_FOR_IGNORING_THIS_TMAIN_TEST = 77
os.makedirs(build_subdir, exist_ok=True)
for fn in glob.glob(build_subdir + '/*-actual.txt'):
os.remove(fn)
strbuf = io.StringIO()
print("\nTesting " + test_name, file=strbuf)
line('-', file=strbuf)
if isabs(CTAGS):
ctags_path = CTAGS
else:
ctags_path = os.path.join(basedir, CTAGS)
if isabs(READTAGS):
readtags_path = READTAGS
else:
readtags_path = os.path.join(basedir, READTAGS)
if isabs(OPTSCRIPT):
optscript_path = OPTSCRIPT
else:
optscript_path = os.path.join(basedir, OPTSCRIPT)
start = time.time()
ret = subprocess.run([SHELL, 'run.sh',
ctags_path,
build_subdir,
readtags_path,
optscript_path],
cwd=subdir,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#print('execute time: %f' % (time.time() - start), file=strbuf)
encoding = 'utf-8'
try:
stdout = ret.stdout.decode(encoding).replace("\r\n", "\n")
except UnicodeError:
encoding = 'iso-8859-1'
stdout = ret.stdout.decode(encoding).replace("\r\n", "\n")
stderr = ret.stderr.decode('utf-8').replace("\r\n", "\n")
if os.path.basename(CTAGS) != 'ctags':
# program name needs to be canonicalized
stderr = re.sub('(?m)^' + os.path.basename(CTAGS) + ':', 'ctags:', stderr)
if ret.returncode == CODE_FOR_IGNORING_THIS_TMAIN_TEST:
run_result('skip', '', None, stdout.replace("\n", ''), file=strbuf)
print(strbuf.getvalue(), end='')
sys.stdout.flush()
strbuf.close()
return True
with open(build_subdir + '/exit-actual.txt', 'w', newline='\n') as f:
print(ret.returncode, file=f)
with open(build_subdir + '/stdout-actual.txt', 'w', newline='\n', encoding=encoding) as f:
print(stdout, end='', file=f)
with open(build_subdir + '/stderr-actual.txt', 'w', newline='\n') as f:
print(stderr, end='', file=f)
if os.path.isfile(build_subdir + '/tags'):
os.rename(build_subdir + '/tags', build_subdir + '/tags-actual.txt')
for aspect in ['stdout', 'stderr', 'exit', 'tags']:
expected_txt = subdir + '/' + aspect + '-expected.txt'
actual_txt = build_subdir + '/' + aspect + '-actual.txt'
if os.path.isfile(expected_txt):
if tmain_compare(subdir, build_subdir, aspect, strbuf):
os.remove(actual_txt)
else:
TMAIN_FAILED += [test_name + '/' + aspect + '-compare' +
failed_git_marker(expected_txt)]
TMAIN_STATUS = False
if aspect == 'stderr' and \
is_crashed(actual_txt) and \
shutil.which('gdb'):
print_backtraces(ctags_path,
glob.glob(build_subdir + '/core*'),
build_subdir + '/gdb-backtrace.txt')
elif os.path.isfile(actual_txt):
os.remove(actual_txt)
print(strbuf.getvalue(), end='')
sys.stdout.flush()
strbuf.close()
return True
def tmain_run(topdir, build_topdir, units):
global TMAIN_STATUS
TMAIN_STATUS = True
(q, threads) = create_thread_queue(tmain_sub)
basedir = os.getcwd()
for subdir in glob.glob(topdir + '/*.d'):
test_name = os.path.basename(subdir)[:-2]
if len(units) > 0 and not test_name in units:
continue
build_subdir = build_topdir + '/' + os.path.basename(subdir)
q.put((test_name, basedir, subdir, build_subdir))
join_workers(q, threads)
print()
if not TMAIN_STATUS:
print('Failed tests')
line('=')
for f in TMAIN_FAILED:
print(re.sub('', ' (not committed/cached yet)', f))
print()
if SHOW_DIFF_OUTPUT:
print('Detail [compare]')
line('-')
tmain_compare_result(build_topdir)
return TMAIN_STATUS
def action_tmain(parser, action, *args):
global CTAGS
global COLORIZED_OUTPUT
global WITH_VALGRIND
global SHOW_DIFF_OUTPUT
global READTAGS
global OPTSCRIPT
global UNITS
global NUM_WORKER_THREADS
global SHELL
parser.add_argument('--ctags',
help='ctags executable file for testing')
parser.add_argument('--colorized-output', choices=['yes', 'no'], default='yes',
help='print the result in color.')
parser.add_argument('--with-valgrind', action='store_true', default=False,
help='(not implemented) run a test case under valgrind')
parser.add_argument('--show-diff-output', action='store_true', default=False,
help='how diff output for failed test cases in the summary.')
parser.add_argument('--readtags',
help='readtags executable file for testing')
parser.add_argument('--optscript',
help='optscript executable file for testing')
parser.add_argument('--units', metavar='UNITS1[,UNITS2,...]',
help='run only Tmain/UNIT*.d (.d is not needed)')
parser.add_argument('--threads', type=int, default=NUM_WORKER_THREADS,
help='number of worker threads')
parser.add_argument('--shell',
help='shell to be used.')
parser.add_argument('tmain_dir',
help='Tmain directory.')
parser.add_argument('build_dir', nargs='?', default='',
help='Build directory. If not given, tmain_dir is used.')
res = parser.parse_args(args)
if res.ctags:
CTAGS = res.ctags
COLORIZED_OUTPUT = (res.colorized_output == 'yes')
WITH_VALGRIND = res.with_valgrind
SHOW_DIFF_OUTPUT = res.show_diff_output
if res.readtags:
READTAGS = res.readtags
if res.optscript:
OPTSCRIPT = res.optscript
if res.units:
UNITS = res.units.split(',')
NUM_WORKER_THREADS = res.threads
if res.shell:
SHELL = res.shell
if res.build_dir == '':
res.build_dir = res.tmain_dir
#check_availability('awk')
check_availability('diff')
if isabs(res.build_dir):
build_dir = res.build_dir
else:
build_dir = os.path.realpath(res.build_dir)
ret = tmain_run(res.tmain_dir, build_dir, UNITS)
if ret:
return 0
else:
return 1
def action_clean_tmain(parser, action, *args):
parser.add_argument('tmain_dir',
help='Build directory for tmain testing.')
res = parser.parse_args(args)
tmain_dir = res.tmain_dir
if not os.path.isdir(tmain_dir):
error_exit(0, 'No such directory: ' + tmain_dir)
for obj in ['stdout', 'stderr', 'exit', 'tags']:
for typ in ['actual', 'diff']:
for fn in glob.glob(tmain_dir + '/**/' + obj + '-' + typ + '.txt', recursive=True):
os.remove(fn)
for fn in glob.glob(tmain_dir + '/**/gdb-backtrace.txt', recursive=True):
os.remove(fn)
return 0
def prepare_environment():
global _PREPERE_ENV
os.environ['LC_ALL'] = 'C'
os.environ['MSYS2_ARG_CONV_EXCL'] = '--regex-;--_scopesep;--exclude;--exclude-exception'
_PREPERE_ENV = """LC_ALL="C"; export LC_ALL
MSYS2_ARG_CONV_EXCL='--regex-;--_scopesep;--exclude;--exclude-exception' export MSYS2_ARG_CONV_EXCL
"""
# enable ANSI escape sequences on Windows 10 1511 (10.0.10586) or later
def enable_esc_sequence():
if os.name != 'nt':
return
import ctypes
kernel32 = ctypes.windll.kernel32
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 4
STD_OUTPUT_HANDLE = -11
out = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
mode = ctypes.c_ulong()
if kernel32.GetConsoleMode(out, ctypes.byref(mode)):
kernel32.SetConsoleMode(out,
mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
def main():
prepare_environment()
enable_esc_sequence()
parser = argparse.ArgumentParser(
description='Units test harness for ctags.')
subparsers = parser.add_subparsers(dest='action', metavar='ACTION')
cmdmap = {}
cmdmap['run'] = [action_run,
subparsers.add_parser('run', aliases=['units'],
description='Run all tests case under units_dir.',
help='Run all tests case')]
cmdmap['units'] = cmdmap['run']
cmdmap['clean'] = [action_clean,
subparsers.add_parser('clean',
description='Clean all files created during units testing.',
help='Clean all files created during units testing')]
cmdmap['tmain'] = [action_tmain,
subparsers.add_parser('tmain',
description='Run tests for main part of ctags.',
help='Run tests for main part of ctags')]
cmdmap['clean-tmain'] = [action_clean_tmain,
subparsers.add_parser('clean-tmain',
description='Clean all files created during tmain testing.',
help='Clean all files created during tmain testing')]
subparsers.add_parser('help',
help='show this help message and exit')
cmdmap['help'] = [action_help, parser]
if len(sys.argv) < 2:
parser.print_help()
sys.exit(1)
res = parser.parse_args(sys.argv[1:2])
(func, subparser) = cmdmap[res.action]
sys.exit(func(subparser, *sys.argv[1:]))
if __name__ == '__main__':
main()