In January 2021 we will introduce a 10 GB quota for project repositories. Higher limits for individual projects will be available on request. Please see https://doku.lrz.de/display/PUBLIC/GitLab for more information.

testing.py 34.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# testing.py - test support for DHParser based grammars and compilers
#
# Copyright 2016  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences and Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.
17

18 19 20 21 22 23 24 25 26
"""
Module ``testing`` contains support for unit-testing domain specific
languages. Tests for arbitrarily small components of the Grammar can
be written into test files with ini-file syntax in order to test
whether the parser matches or fails as expected. It can also be
tested whether it produces an expected concrete or abstract syntax tree.
Usually, however, unexpected failure to match a certain string is the
main cause of trouble when constructing a context free Grammar.
"""
27 28


29
import collections
import collections.abc
import concurrent.futures
import copy
import fnmatch
import inspect
import json
import multiprocessing
import os
import sys
from typing import Dict, List, Union, cast

from DHParser.error import Error, is_error, adjust_error_locations
from DHParser.log import log_dir, logging, is_logging, clear_logs, log_parsing_history
from DHParser.parse import UnknownParserError, Parser, Lookahead
from DHParser.syntaxtree import Node, RootNode, parse_tree, flatten_sxpr, ZOMBIE_TAG
from DHParser.toolkit import GLOBALS, get_config_value, load_if_file, re
45

46

di68kap's avatar
di68kap committed
47
# Names exported by ``from DHParser.testing import *``.
__all__ = ('unit_from_config',
           'unit_from_json',
           'TEST_READERS',
           'unit_from_file',
           'get_report',
           'grammar_unit',
           'grammar_suite',
           'SymbolsDictType',
           'extract_symbols',
           'create_test_templates',
           'reset_unit',
           'runner',
           'clean_report')
60

61

62 63
# Stage (test category) names that a test unit may contain. Readers and
# grammar_unit() reject any other stage name.
UNIT_STAGES = {'match*', 'match', 'fail', 'ast', 'cst'}
# Keys under which grammar_unit() stores the results of a test run; a unit
# that already contains one of them must be reset before it is run again.
# NOTE(review): grammar_unit() also stores '__msg__', which is not listed
# here, so reset_unit() reports it as an unknown component - confirm
# whether that is intended.
RESULT_STAGES = {'__cst__', '__ast__', '__err__'}
64

65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
# def unit_from_configfile(config_filename):
#     """
#     Reads a grammar unit test from a config file.
#     """
#     cfg = configparser.ConfigParser(interpolation=None)
#     cfg.read(config_filename, encoding="utf8")
#     OD = collections.OrderedDict
#     unit = OD()
#     for section in cfg.sections():
#         symbol, stage = section.split(':')
#         if stage not in UNIT_STAGES:
#             if symbol in UNIT_STAGES:
#                 symbol, stage = stage, symbol
#             else:
#                 raise ValueError('Test stage %s not in: ' % (stage, str(UNIT_STAGES)))
#         for testkey, testcode in cfg[section].items():
#             if testcode[:3] + testcode[-3:] in {"''''''", '""""""'}:
#                 testcode = testcode[3:-3]
#                 # testcode = testcode.replace('\\#', '#')
#                 testcode = re.sub(r'(?<!\\)\\#', '#', testcode).replace('\\\\', '\\')
#             elif testcode[:1] + testcode[-1:] in {"''", '""'}:
#                 testcode = testcode[1:-1]
#             unit.setdefault(symbol, OD()).setdefault(stage, OD())[testkey] = testcode
#     # print(json.dumps(unit, sort_keys=True, indent=4))
#     return unit

eckhart's avatar
eckhart committed
91
# Section header of a test file, e.g. "[match:symbol]".
RX_SECTION = re.compile(r'\s*\[(?P<stage>\w+):(?P<symbol>\w+)\]')
# Value of an entry: a triple-quoted string (may span several lines), a
# single-quoted string, or unquoted text that may be continued on following
# lines as long as these are indented by at least four spaces.
RE_VALUE = ('(?:"""((?:.|\n)*?)""")|' + "(?:'''((?:.|\n)*?)''')|"
            + r'(?:"(.*?)")|' + "(?:'(.*?)')|" + r'(.*(?:\n(?:\s*\n)*    .*)*)')
# the following does not work with pypy3, because pypy's re-engine does not
# support local flags, e.g. '(?s: )'
# RE_VALUE = r'(?:"""((?s:.*?))""")|' + "(?:'''((?s:.*?))''')|" + \
#            r'(?:"(.*?)")|' + "(?:'(.*?)')|" + '(.*(?:\n(?:\s*\n)*    .*)*)'
# A complete entry: "key: value", where the key may end with an asterisk.
RX_ENTRY = re.compile(r'\s*(\w+\*?)\s*:\s*(?:{value})\s*'.format(value=RE_VALUE))
# A comment line: optional whitespace, "#", and the rest of the line.
RX_COMMENT = re.compile(r'\s*#.*\n')
100

101

di68kap's avatar
di68kap committed
102
def unit_from_config(config_str):
    """ Reads grammar unit tests contained in a file in config file (.ini)
    syntax.

    Args:
        config_str (str): A string containing a config-file with Grammar unit-tests

    Returns:
        A dictionary representing the unit tests. It maps symbol names to
        dictionaries that map stage names to dictionaries of test-key ->
        test-code.

    Raises:
        KeyError: if a section names a stage that is not in UNIT_STAGES.
        AssertionError: if a test key occurs twice in the same section
            (with or without a trailing asterisk).
        SyntaxError: if anything but whitespace is left over after the
            last section that could be parsed.
    """
    # TODO: issue a warning if the same match:xxx or fail:xxx block appears more than once

    def eat_comments(txt, pos):
        """Returns the position after all comment lines starting at `pos`."""
        m = RX_COMMENT.match(txt, pos)
        while m:
            pos = m.span()[1]
            m = RX_COMMENT.match(txt, pos)
        return pos

    # tabs are expanded, because continuation lines are recognized by
    # an indentation of four spaces
    cfg = config_str.replace('\t', '    ')

    OD = collections.OrderedDict
    unit = OD()

    pos = eat_comments(cfg, 0)
    section_match = RX_SECTION.match(cfg, pos)
    while section_match:
        d = section_match.groupdict()
        stage = d['stage']
        if stage not in UNIT_STAGES:
            raise KeyError('Unknown stage ' + stage + " ! must be one of: " + str(UNIT_STAGES))
        symbol = d['symbol']
        pos = eat_comments(cfg, section_match.span()[1])

        entry_match = RX_ENTRY.match(cfg, pos)
        while entry_match:
            # exactly two groups are not None: the key and the value
            # alternative that matched
            testkey, testcode = [group for group in entry_match.groups() if group is not None]
            lines = testcode.split('\n')
            if len(lines) > 1:
                # strip the common indentation of all continuation lines
                indent = min(len(line) - len(line.lstrip()) for line in lines[1:])
                lines[1:] = [line[indent:] for line in lines[1:]]
                testcode = '\n'.join(lines)
            test = unit.setdefault(symbol, OD()).setdefault(stage, OD())
            assert testkey.strip('*') not in test and (testkey.strip('*') + '*') not in test, \
                "Key %s already exists in text %s:%s !" % (testkey, stage, symbol)
            test[testkey] = testcode
            pos = eat_comments(cfg, entry_match.span()[1])
            entry_match = RX_ENTRY.match(cfg, pos)

        section_match = RX_SECTION.match(cfg, pos)

    if pos != len(cfg) and not re.match(r'\s+$', cfg[pos:]):
        raise SyntaxError('in line %i' % (cfg[:pos].count('\n') + 2))  # TODO: Add file name

    return unit
163

164

di68kap's avatar
di68kap committed
165
def unit_from_json(json_str):
    """
    Reads grammar unit tests from a json string.

    Args:
        json_str (str): A json document that maps symbol names to
            objects mapping stage names to the tests of that stage.

    Returns:
        The decoded json data.

    Raises:
        ValueError: if a stage name is not in UNIT_STAGES.
    """
    unit = json.loads(json_str)
    for symbol in unit:
        for stage in unit[symbol]:
            if stage not in UNIT_STAGES:
                raise ValueError('Test stage %s not in: %s' % (stage, str(UNIT_STAGES)))
    return unit

di68kap's avatar
di68kap committed
176

177
# TODO: add support for yaml, cson, toml
178 179


di68kap's avatar
di68kap committed
180 181 182 183 184 185 186 187 188
# A dictionary associating file endings with reader functions that
# transform strings containing the file's content to a nested dictionary
# structure of test cases.
TEST_READERS = {
    '.ini': unit_from_config,
    '.json': unit_from_json
}


189
def unit_from_file(filename):
    """
    Reads a grammar unit test from a file. The format of the file is
    determined by the ending of its name.

    Args:
        filename (str): Name of the test file. Its (lower-cased) extension
            selects the reader function from TEST_READERS.

    Returns:
        The test unit as returned by the reader, with the names of the
        test categories (stages) converted to lower case.

    Raises:
        ValueError: if there is no reader for the file's extension.
        EnvironmentError: if the same test name is used for a match test
            as well as a fail test of the same parser.
    """
    extension = os.path.splitext(filename)[1]
    # Only the lookup is guarded: a KeyError raised by the reader itself
    # (e.g. for an unknown stage) must not be reported as an unknown file type.
    try:
        reader = TEST_READERS[extension.lower()]
    except KeyError:
        raise ValueError("Unknown unit test file type: " + extension)
    with open(filename, 'r', encoding='utf8') as f:
        data = f.read()
    test_unit = reader(data)

    # Check for ambiguous Test names
    errors = []
    for parser_name, tests in test_unit.items():
        # normalize case for test category names
        for key in list(tests.keys()):
            new_key = key.lower()
            if new_key != key:
                tests[new_key] = tests[key]
                del tests[key]

        m_names = set(tests.get('match', dict()).keys())
        f_names = set(tests.get('fail', dict()).keys())
        intersection = sorted(m_names & f_names)
        if intersection:
            errors.append("Same names %s assigned to match and fail test "
                          "of parser %s." % (str(intersection), parser_name) +
                          " Please, use different names!")
    if errors:
        raise EnvironmentError("Error(s) in Testfile %s :\n" % filename
                               + '\n'.join(errors))

    return test_unit

227

di68kap's avatar
di68kap committed
228 229 230 231 232 233
# def all_match_tests(tests):
#     """Returns all match tests from ``tests``, This includes match tests
#     marked with an asterix for CST-output as well as unmarked match-tests.
#     """
#     return itertools.chain(tests.get('match', dict()).items(),
#                            tests.get('match*', dict()).items())
234 235


236
def get_report(test_unit):
    """
    Returns a text-report of the results of a grammar unit test. The report
    lists the source of all tests as well as the error messages, if a test
    failed or the abstract-syntax-tree (AST) in case of success.

    If an asterisk has been appended to the test name then the concrete syntax
    tree will also be added to the report in this particular case. The
    concrete syntax tree is also added if no AST is available for a test.

    The purpose of the latter is to help constructing and debugging
    of AST-Transformations. It is better to switch the CST-output on and off
    with the asterisk marker when needed than to output the CST for all tests
    which would unnecessarily bloat the test reports.

    Args:
        test_unit: Dictionary mapping parser names to their tests. Only the
            'match' and 'fail' tests are listed; results are read from the
            '__err__', '__msg__', '__ast__' and '__cst__' entries, if present.

    Returns:
        The report as a single string; the empty string, if `test_unit`
        is empty.
    """
    def indent(txt):
        """Indents every line of `txt` by four spaces."""
        lines = txt.split('\n')
        lines[0] = '    ' + lines[0]
        return "\n    ".join(lines)

    report = []
    for parser_name, tests in test_unit.items():
        heading = 'Test of parser: "%s"' % parser_name
        report.append('\n\n%s\n%s\n' % (heading, '=' * len(heading)))
        for test_name, test_code in tests.get('match', dict()).items():
            heading = 'Match-test "%s"' % test_name
            report.append('\n%s\n%s\n' % (heading, '-' * len(heading)))
            report.append('### Test-code:')
            report.append(indent(test_code))
            error = tests.get('__err__', {}).get(test_name, "")
            if error:
                report.append('\n### Error:')
                report.append(error)
            ast = tests.get('__ast__', {}).get(test_name, None)
            cst = tests.get('__cst__', {}).get(test_name, None)
            if cst and (not ast or str(test_name).endswith('*')):
                report.append('\n### CST')
                report.append(indent(cst.serialize('cst')))
            if ast:
                report.append('\n### AST')
                report.append(indent(ast.serialize('ast')))
        for test_name, test_code in tests.get('fail', dict()).items():
            heading = 'Fail-test "%s"' % test_name
            report.append('\n%s\n%s\n' % (heading, '-' * len(heading)))
            report.append('### Test-code:')
            report.append(indent(test_code))
            messages = tests.get('__msg__', {}).get(test_name, "")
            if messages:
                report.append('\n### Messages:')
                report.append(messages)
            error = tests.get('__err__', {}).get(test_name, "")
            if error:
                report.append('\n### Error:')
                report.append(error)
    return '\n'.join(report)


292
def grammar_unit(test_unit, parser_factory, transformer_factory, report=True, verbose=False):
    """
    Unit tests for a grammar-parser and ast transformations.

    Args:
        test_unit: Either the name of a test file, which is then read with
            `unit_from_file()`, or a dictionary mapping parser names to
            their tests.
        parser_factory: Called without arguments to obtain the parser. The
            parser is called as `parser(test_code, parser_name,
            track_history=...)`.
        transformer_factory: Called without arguments to obtain the
            function that is applied to a copy of the concrete syntax tree.
        report: If true, a report is written to the file
            "REPORT/<unit name>.md" relative to the current directory and
            the syntax trees are transformed even if there are no ast-tests.
        verbose: If true, progress information is collected and printed
            when all tests have been run.

    Returns:
        A list of error messages, one for each failed test. The list is
        empty if all tests passed.

    Side effects:
        The results are stored in the tests of each parser under the keys
        '__cst__', '__ast__', '__err__' and '__msg__'.
    """
    output = []

    def write(s):
        """Append string `s` to output. The purpose is to defer printing to
        stdout in order to avoid muddled output when several unit tests run
        at the same time."""
        output.append(s)

    def clean_key(k):
        """Returns key `k` without asterisks; non-string keys are returned
        unchanged."""
        try:
            return k.replace('*', '')
        except AttributeError:
            return k

    def get(tests, category, key) -> str:
        """Returns the test `key` of `category`, also looking it up without
        asterisk, or the empty string if there is no such test."""
        try:
            value = tests[category][key] if key in tests[category] \
                else tests[category][clean_key(key)]
        except KeyError:
            return ''
        return value

    if isinstance(test_unit, str):
        _, unit_name = os.path.split(os.path.splitext(test_unit)[0])
        test_unit = unit_from_file(test_unit)
    else:
        unit_name = 'unit_test_' + str(id(test_unit))
    if verbose:
        write("\nGRAMMAR TEST UNIT: " + unit_name)
    errata = []
    parser = parser_factory()
    transform = transformer_factory()

    def has_lookahead(parser_name: str) -> bool:
        """Returns True if the parser or any of its descendant parsers is a
        Lookahead parser."""
        lookahead_found = False

        def find_lookahead(p: Parser):
            nonlocal lookahead_found
            if not lookahead_found:
                lookahead_found = isinstance(p, Lookahead)

        parser[parser_name].apply(find_lookahead)
        return lookahead_found

    def lookahead_artifact(syntax_tree: Node):
        """
        Returns True, if the error merely occurred, because the parser
        stopped in front of a sequence that was captured by a lookahead
        operator or if a mandatory lookahead failed at the end of data.
        This is required for testing of parsers that put a lookahead
        operator at the end. See test_testing.TestLookahead.
        """
        raw_errors = syntax_tree.errors_sorted
        is_artifact = ((2 <= len(raw_errors) <= 3  # case 1:  superfluous data for lookahead
                        and {e.code for e in raw_errors} <=
                            {Error.PARSER_LOOKAHEAD_MATCH_ONLY,
                             Error.PARSER_STOPPED_BEFORE_END,
                             Error.PARSER_STOPPED_EXCEPT_FOR_LOOKAHEAD})
                       #  case 2:  mandatory lookahead failure at end of text
                       or (len(raw_errors) == 1
                           and raw_errors[-1].code == Error.MANDATORY_CONTINUATION_AT_EOF))
        if is_artifact:
            # don't remove zombie node with error message at the end
            # but change its tag_name to indicate that it is an artifact!
            for parent in syntax_tree.select_if(lambda node: any(child.tag_name == ZOMBIE_TAG
                                                                 for child in node.children),
                                                include_root=True, reverse=True):
                zombie = parent[ZOMBIE_TAG]
                zombie.tag_name = '__TESTING_ARTIFACT__'
                zombie.result = 'Artifact can be ignored. Be aware, though, that also the ' \
                                'tree structure may not be the same as in a non-testing ' \
                                'environment, when a testing artifact has occurred!'
                break
        return is_artifact

    for parser_name, tests in test_unit.items():
        if not get_config_value('test_parallelization'):
            print('  ' + parser_name)

        assert parser_name, "Missing parser name in test %s!" % unit_name
        assert not any(test_type in RESULT_STAGES for test_type in tests), \
            ("Test %s in %s already has results. Use reset_unit() before running again!"
             % (parser_name, unit_name))
        assert set(tests.keys()).issubset(UNIT_STAGES), \
            'Unknown test-types: %s ! Must be one of %s' \
            % (set(tests.keys()) - UNIT_STAGES, UNIT_STAGES)
        if verbose:
            write('  Match-Tests for parser "' + parser_name + '"')
        match_tests = set(tests['match'].keys()) if 'match' in tests else set()
        if 'ast' in tests:
            ast_tests = set(tests['ast'].keys())
            if not {clean_key(k) for k in ast_tests} <= {clean_key(k) for k in match_tests}:
                raise AssertionError('AST-Tests %s for parser %s lack corresponding match-tests!'
                                     % (str(ast_tests - match_tests), parser_name))
        if 'cst' in tests:
            cst_tests = set(tests['cst'].keys())
            if not {clean_key(k) for k in cst_tests} <= {clean_key(k) for k in match_tests}:
                raise AssertionError('CST-Tests %s lack corresponding match-tests!'
                                     % str(cst_tests - match_tests))

        # run match tests

        for test_name, test_code in tests.get('match', dict()).items():
            if not get_config_value('test_parallelization'):
                print('    ' + str(test_name))

            # number of errors before this test; used to tell whether the
            # test has added any error
            errflag = len(errata)
            try:
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
            except UnknownParserError as upe:
                cst = RootNode()
                cst = cst.new_error(Node(ZOMBIE_TAG, "").with_pos(0), str(upe))
            clean_test_name = str(test_name).replace('*', '')
            tests.setdefault('__cst__', {})[test_name] = cst
            if is_error(cst.error_flag) and not lookahead_artifact(cst):
                errors = cst.errors_sorted
                adjust_error_locations(errors, test_code)
                errata.append('Match test "%s" for parser "%s" failed:\n\tExpr.:  %s\n\n\t%s\n\n' %
                              (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
                               '\n\t'.join(str(m).replace('\n', '\n\t\t') for m in errors)))
                # write parsing-history log only in case of failure!
                if is_logging():
                    log_parsing_history(parser, "match_%s_%s.log" % (parser_name, clean_test_name))
            if "ast" in tests or report:
                ast = copy.deepcopy(cst)
                transform(ast)
                tests.setdefault('__ast__', {})[test_name] = ast
            if verbose:
                infostr = '    match-test "' + str(test_name) + '" ... '
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            if "cst" in tests and len(errata) == errflag:
                compare = parse_tree(get(tests, "cst", test_name))
                if compare:
                    if not compare.equals(cst):
                        errata.append('Concrete syntax tree test "%s" for parser "%s" failed:\n%s' %
                                      (test_name, parser_name, cst.serialize('cst')))
                    if verbose:
                        infostr = '      cst-test "' + str(test_name) + '" ... '
                        write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            if "ast" in tests and len(errata) == errflag:
                compare = parse_tree(get(tests, "ast", test_name))
                if compare:
                    if not compare.equals(ast):
                        errata.append('Abstract syntax tree test "%s" for parser "%s" failed:'
                                      '\n\tExpr.:     %s\n\tExpected:  %s\n\tReceived:  %s'
                                      % (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
                                         flatten_sxpr(compare.as_sxpr()),
                                         flatten_sxpr(ast.as_sxpr())))
                    if verbose:
                        infostr = '      ast-test "' + str(test_name) + '" ... '
                        write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            if len(errata) > errflag:
                tests.setdefault('__err__', {})[test_name] = errata[-1]

        if verbose and 'fail' in tests:
            write('  Fail-Tests for parser "' + parser_name + '"')

        # run fail tests

        for test_name, test_code in tests.get('fail', dict()).items():
            errflag = len(errata)
            try:
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
            except UnknownParserError as upe:
                node = Node(ZOMBIE_TAG, "").with_pos(0)
                cst = RootNode(node).new_error(node, str(upe))
                errata.append('Unknown parser "{}" in fail test "{}"!'.format(parser_name, test_name))
                tests.setdefault('__err__', {})[test_name] = errata[-1]
            if not (is_error(cst.error_flag) and not lookahead_artifact(cst)):
                errata.append('Fail test "%s" for parser "%s" yields match instead of '
                              'expected failure!' % (test_name, parser_name))
                tests.setdefault('__err__', {})[test_name] = errata[-1]
                # write parsing-history log only in case of test-failure
                if is_logging():
                    # as for match tests, asterisks are kept out of the file name
                    clean_test_name = str(test_name).replace('*', '')
                    log_parsing_history(parser, "fail_%s_%s.log" % (parser_name, clean_test_name))
            if cst.error_flag:
                tests.setdefault('__msg__', {})[test_name] = \
                    "\n".join(str(e) for e in cst.errors_sorted)
            if verbose:
                infostr = '    fail-test  "' + str(test_name) + '" ... '
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

    # write test-report
    if report:
        report_dir = "REPORT"
        test_report = get_report(test_unit)
        if test_report:
            try:
                os.mkdir(report_dir)
            except FileExistsError:
                pass
            with open(os.path.join(report_dir, unit_name + '.md'), 'w', encoding='utf8') as f:
                f.write(test_report)

    print('\n'.join(output))
    return errata


507
def reset_unit(test_unit):
    """
    Resets the tests in ``test_unit`` by removing all results and error
    messages.

    Every entry of a parser's tests whose key is not a stage name from
    UNIT_STAGES is deleted in place. A notice is printed for deleted
    entries that are not listed in RESULT_STAGES either.
    """
    for parser, tests in test_unit.items():
        # iterate over a copy of the keys, because entries are deleted
        for key in list(tests.keys()):
            if key not in UNIT_STAGES:
                if key not in RESULT_STAGES:
                    print('Removing unknown component %s from test %s' % (key, parser))
                del tests[key]


520 521 522 523 524 525 526 527 528 529 530
def run_unit(logdir, *parameters):
    """
    Calls `grammar_unit(*parameters)` inside a `logging(logdir)` context and
    returns its result.

    According to the original author's note, logs are written to `logdir`,
    or not at all if `logdir` evaluates to False, and this helper function
    is needed for running unit tests in a multiprocessing environment,
    because log.log_dir(), log.logging() and log.is_logging() are
    thread-local.
    """
    with logging(logdir):
        errata = grammar_unit(*parameters)
    return errata


Eckhart Arnold's avatar
Eckhart Arnold committed
531 532 533 534
def grammar_suite(directory, parser_factory, transformer_factory,
                  fn_patterns=('*test*',),
                  ignore_unknown_filetypes=False,
                  report=True, verbose=True):
    """
    Runs all grammar unit tests in a directory. A file is considered a test
    unit, if its name matches at least one of the patterns in `fn_patterns`.

    Args:
        directory: The directory that contains the test files. It is made
            the current working directory while the tests are running.
        parser_factory: Passed on to `grammar_unit()`.
        transformer_factory: Passed on to `grammar_unit()`.
        fn_patterns: A single file name pattern or an iterable of file name
            patterns in the syntax of the `fnmatch` module.
        ignore_unknown_filetypes: If true, a ValueError whose message
            contains the word "Unknown" is ignored and the file that
            raised it is skipped.
        report: Passed on to `grammar_unit()`.
        verbose: Passed on to `grammar_unit()`; if true, the scanned
            directory and a success message are printed as well.

    Returns:
        An error report as string or the empty string, if all tests passed.
    """
    # a single pattern is a string, and thus iterable, too: it must be
    # wrapped or it would be matched character by character
    if isinstance(fn_patterns, str) or not isinstance(fn_patterns, collections.abc.Iterable):
        fn_patterns = [fn_patterns]
    else:
        # the patterns are iterated once per file
        fn_patterns = list(fn_patterns)

    def is_test_file(filename):
        """Returns True, if `filename` matches any of the patterns."""
        return any(fnmatch.fnmatch(filename, pattern) for pattern in fn_patterns)

    def can_be_ignored(error):
        """Returns True, if the ValueError `error` shall be ignored."""
        return ignore_unknown_filetypes and str(error).find("Unknown") >= 0

    all_errors = collections.OrderedDict()
    if verbose:
        print("\nScanning test-directory: " + directory)
    save_cwd = os.getcwd()
    os.chdir(directory)
    try:
        if is_logging():
            clear_logs()

        if get_config_value('test_parallelization'):
            with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
                results = []
                for filename in sorted(os.listdir('.')):
                    print(filename)
                    if is_test_file(filename):
                        parameters = filename, parser_factory, transformer_factory, report, verbose
                        results.append((filename, pool.submit(run_unit, log_dir(), *parameters)))
                for filename, err_future in results:
                    try:
                        errata = err_future.result()
                    except ValueError as e:
                        if not can_be_ignored(e):
                            raise
                    else:
                        if errata:
                            all_errors[filename] = errata
        else:
            for filename in sorted(os.listdir('.')):
                if is_test_file(filename):
                    parameters = filename, parser_factory, transformer_factory, report, verbose
                    try:
                        errata = grammar_unit(*parameters)
                    except ValueError as e:
                        if not can_be_ignored(e):
                            raise
                    else:
                        if errata:
                            all_errors[filename] = errata
    finally:
        # restore the working directory even if a test unit raised an error
        os.chdir(save_cwd)

    error_report = []
    err_N = 0
    for filename in all_errors:
        error_report.append('Errors found by unit test "%s":\n' % filename)
        err_N += len(all_errors[filename])
        for error in all_errors[filename]:
            error_report.append('\t' + '\n\t'.join(error.split('\n')))
    if error_report:
        return ('Test suite "%s" revealed %s error%s:\n\n'
                % (directory, err_N, 's' if err_N > 1 else '') + '\n'.join(error_report))
    if verbose:
        print("\nSUCCESS! All tests passed :-)\n")
    return ''


eckhart's avatar
eckhart committed
595 596 597 598 599 600 601
########################################################################
#
# Support for unit-testing of ebnf-grammars
#
########################################################################


602
# Matches, at the beginning of a line, either a symbol that is followed by
# an equal sign or a section marker, i.e. a comment starting with "#:".
RX_DEFINITION_OR_SECTION = re.compile(r'(?:^|\n)[ \t]*(\w+(?=[ \t]*=)|#:.*(?=\n|$|#))')
# Maps section names to the lists of symbols defined in that section.
SymbolsDictType = Dict[str, List[str]]


def extract_symbols(ebnf_text_or_file: str) -> SymbolsDictType:
    r"""
    Collects the names of all symbols that are defined in an EBNF-grammar,
    grouped by the sections of the grammar. The result can be used to
    prepare grammar-tests.

    A section is opened in the ebnf-source by a comment-line starting
    with "#:", followed by the name of the section. Every character of
    the section name that is neither a word character nor a hyphen is
    replaced by an underscore. It is recommended to use valid file names
    as section names. Example:

        #: components

        expression = term  { EXPR_OP~ term}
        term       = factor  { TERM_OP~ factor}
        factor     = [SIGN] ( NUMBER | VARIABLE | group ) { VARIABLE | group }
        group      = "(" expression ")"


        #: leaf_expressions

        EXPR_OP    = /\+/ | /-/
        TERM_OP    = /\*/ | /\//
        SIGN       = /-/

        NUMBER     = /(?:0|(?:[1-9]\d*))(?:\.\d+)?/~
        VARIABLE   = /[A-Za-z]/~

    Symbols that are defined before the first section marker (or all
    symbols, if the grammar does not contain any section marker) are
    collected under the empty string as key.

    :param ebnf_text_or_file: Either an ebnf-grammar or the file-name
            of an ebnf-grammar
    :return: Ordered dictionary mapping the section names of the grammar
            to the lists of symbols that are defined in that section.
    :raises AssertionError: if no symbol definition has been found or if
            the same section name occurs more than once.
    """
    grammar_source = load_if_file(ebnf_text_or_file)
    entries = RX_DEFINITION_OR_SECTION.findall(grammar_source)
    if not entries:
        raise AssertionError('No symbols found in: ' + ebnf_text_or_file[:40])

    sections = collections.OrderedDict()  # type: SymbolsDictType
    section_name = ''
    if not entries[0].startswith('#:'):
        # definitions preceding the first section marker go to the unnamed section
        sections[section_name] = []
    for entry in entries:
        if not entry.startswith('#:'):
            sections[section_name].append(entry)
            continue
        # strip the marker and turn the rest into a file-name-friendly identifier
        section_name = re.sub(r'[^\w-]', '_', entry.replace('#:', '').strip())
        if section_name in sections:
            raise AssertionError('Section name must not be repeated: ' + section_name)
        sections[section_name] = []
    return sections


def create_test_templates(symbols_or_ebnf: Union[str, SymbolsDictType],
                          path: str,
                          fmt: str = '.ini') -> None:
    """
    Creates template files for grammar unit-tests for the given symbols.

    For every section one file named "NN_test_SECTION.ini" is written to
    the directory `path`. It contains an empty "[match:...]", "[ast:...]"
    and "[fail:...]" entry for each symbol of that section. Files that
    already exist are left untouched. The current working directory of
    the process is never changed, so that a failure while writing a file
    cannot leave the process in the test directory.

    Args:
        symbols_or_ebnf: Either a dictionary that matches section names to
                the grammar's symbols under that section or an EBNF-grammar
                or file name of an EBNF-grammar from which the symbols shall
                be extracted.
        path: the path to the grammar-test directory (usually 'grammar_tests').
                If the last element of the path does not exist, the directory
                will be created.
        fmt: the test-file-format. At the moment only '.ini' is supported

    Raises:
        ValueError: if `path` exists, but is not a directory.
    """
    assert fmt == '.ini'
    if isinstance(symbols_or_ebnf, str):
        symbols = extract_symbols(cast(str, symbols_or_ebnf))  # type: SymbolsDictType
    else:
        symbols = cast(Dict, symbols_or_ebnf)
    if not os.path.exists(path):
        os.mkdir(path)
    if not os.path.isdir(path):
        raise ValueError(path + ' is not a directory!')

    # The files are numbered in reverse order of the sections, i.e. the last
    # section of the grammar yields the test file with number 01.
    for i, k in enumerate(reversed(list(symbols.keys()))):
        filename = '{num:0>2}_test_{section}'.format(num=i+1, section=k) + fmt
        filepath = os.path.join(path, filename)
        if not os.path.exists(filepath):
            print('Creating test file template "{name}".'.format(name=filename))
            with open(filepath, 'w', encoding='utf-8') as f:
                for sym in symbols[k]:
                    f.write('\n[match:{sym}]\n\n'.format(sym=sym))
                    f.write('[ast:{sym}]\n\n'.format(sym=sym))
                    f.write('[fail:{sym}]\n\n'.format(sym=sym))


704 705 706 707 708 709
#######################################################################
#
#  general unit testing support
#
#######################################################################

710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739

def run_tests_in_class(test, namespace):
    """
    Runs all tests in test-class `test` in the given namespace.

    Args:
        test: Either the name of a test-class, in which case all methods
            of that class, the name of which starts with "test"
            (case-insensitive), are run, or a string of the form
            "ClassName.method_name", in which case only this one method
            is run.
        namespace: A dictionary (usually ``globals()``) that maps the name
            of the test-class to the class object. The namespace is only
            read, not modified.

    If the test-object has a "setup"-method, it is called right after the
    instantiation of the class. If it has a "teardown"-method, it is called
    after the tests have been run, even if a test has raised an exception.
    """
    def instantiate(cls_name):
        # look the class up directly rather than exec-ing source code, so
        # that no helper variable is leaked into the caller's namespace
        obj = namespace[cls_name]()
        if "setup" in dir(obj):
            obj.setup()
        return obj

    obj = None
    try:
        if test.find('.') >= 0:
            cls_name, method_name = test.split('.')
            obj = instantiate(cls_name)
            print("Running " + cls_name + "." + method_name)
            getattr(obj, method_name)()
        else:
            obj = instantiate(test)
            for name in dir(obj):
                if name.lower().startswith("test"):
                    print("Running " + test + "." + name)
                    getattr(obj, name)()
    finally:
        # obj is still None, if the instantiation or setup failed
        if "teardown" in dir(obj):
            obj.teardown()


740
def run_test_function(func_name, namespace):
    """
    Runs the test-function with the name `func_name` that is looked up
    in the given namespace.

    Args:
        func_name: The name of the test-function.
        namespace: A dictionary (usually ``globals()``) that maps the
            name of the test-function to the function object. The
            namespace is only read, not modified.
    """
    print("Running test-function: " + func_name)
    # call the function object directly instead of exec-ing source code
    namespace[func_name]()
746 747


eckhart's avatar
eckhart committed
748
def runner(tests, namespace):
    """
    Runs all or some selected Python unit tests found in the
    namespace. To run all tests in a module, call
    ``runner("", globals())`` from within that module.

    Unit-Tests are either classes, the name of which starts with
    "Test" and methods, the name of which starts with "test" contained
    in such classes or functions, the name of which starts with "test".

    Args:
        tests: String or list of strings with the names of tests to
            run. In a string several names are separated by whitespace.
            A single method of a test-class can be selected with
            "ClassName.method_name". If empty, runner searches by itself
            all objects the name of which starts with 'test' and runs
            them (if they are functions) or all of their methods that
            start with "test" (if they are classes) plus the "setup" and
            "teardown" methods if they exist.

        namespace: The namespace for running the test, usually
            ``globals()`` should be used.

    Example:
        class TestSomething:
            def setup(self):
                pass
            def teardown(self):
                pass
            def test_something(self):
                pass

        if __name__ == "__main__":
            from DHParser.testing import runner
            runner("", globals())
    """
    if tests:
        if isinstance(tests, str):
            # split() without argument tolerates repeated or surrounding blanks
            tests = tests.split()
        assert all(test.lower().startswith('test') for test in tests)
    else:
        # snapshot of the names, so that tests may safely add names to the namespace
        tests = list(namespace.keys())

    test_classes = []
    test_functions = []
    for name in tests:
        if name.lower().startswith('test'):
            # "ClassName.method_name" selects a single method: only the part
            # before the dot is a key of the namespace
            candidate = namespace[name.split('.')[0]]
            if inspect.isclass(candidate):
                test_classes.append(name)
            elif inspect.isfunction(candidate) and '.' not in name:
                test_functions.append(name)

    for test in test_classes:
        run_tests_in_class(test, namespace)

    for test in test_functions:
        run_test_function(test, namespace)


805 806
def run_file(fname):
    """
    Imports the Python module stored in the file `fname` and runs all
    unit tests found in that module with `runner()`.

    Files, the name of which does not start with "test_" (case-insensitive)
    or does not end with ".py", are silently ignored. The module is imported
    by its name, i.e. `fname` without the ".py"-ending, so the directory that
    contains the file must be on `sys.path`.
    """
    if fname.lower().startswith('test_') and fname.endswith('.py'):
        import importlib
        print("RUNNING " + fname)
        # An explicit import replaces exec('import ...') followed by eval(...):
        # that idiom depends on exec() writing through to the function's local
        # variables, which is no longer the case from Python 3.13 (PEP 667) on.
        module = importlib.import_module(fname[:-3])
        runner('', module.__dict__)

812

813 814 815 816 817 818 819 820 821
def run_with_log(logdir, f):
    """
    Runs the unit tests in file `f` with `run_file()` inside a
    `logging(logdir)` context. (Presumably no logs are written if `logdir`
    evaluates to False - to be confirmed against `DHParser.log.logging`.)
    This helper function is needed for running unit tests in a
    multiprocessing environment, because, as noted by the original author,
    log.log_dir(), log.logging() and log.is_logging() are thread-local.
    """
    with logging(logdir):
        run_file(f)
822

823

824 825 826 827 828 829
def run_path(path):
    """
    Runs all unit tests in `path`.

    If `path` is a directory, `run_file()` is applied to every file in
    that directory; in case the configuration value 'test_parallelization'
    is set, the files are processed in parallel by a pool of worker
    processes. Otherwise `path` is taken to be a single file.

    While the tests are running, the directory is appended to `sys.path`.
    This entry is removed again afterwards, even if a test raises an
    exception.
    """
    if os.path.isdir(path):
        search_dir, single_file = path, None
    else:
        search_dir, single_file = os.path.split(path)

    sys.path.append(search_dir)
    try:
        if single_file is not None:
            run_file(single_file)
        elif get_config_value('test_parallelization'):
            files = os.listdir(search_dir)
            result_futures = []
            with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
                for f in files:
                    result_futures.append(pool.submit(run_with_log, log_dir(), f))
                for future in result_futures:
                    try:
                        future.result()
                    except AssertionError as failure:
                        # report the failed test, but keep collecting the other results
                        print(failure)
        else:
            for f in os.listdir(search_dir):
                run_file(f)
    finally:
        # Remove the entry that was appended above. Search from the end and
        # compare values rather than blindly popping the last item, because
        # the tests may have added further entries to sys.path.
        for i in range(len(sys.path) - 1, -1, -1):
            if sys.path[i] == search_dir:
                del sys.path[i]
                break

851 852 853 854 855 856 857 858 859 860 861 862 863 864

def clean_report():
    """Removes the test-report-files from the sub-directory "REPORT" of the
    current working directory. If no other files remain in that directory,
    the directory itself is removed as well. Does nothing if there is no
    "REPORT" sub-directory."""
    if not os.path.exists('REPORT'):
        return
    foreign_files_left = False
    for entry in os.listdir('REPORT'):
        if re.match(r'\w*_test_\d+\.md', entry):
            os.remove(os.path.join('REPORT', entry))
        else:
            # anything that does not look like a test report is kept
            foreign_files_left = True
    if not foreign_files_left:
        os.rmdir('REPORT')