# testing.py - test support for DHParser based grammars and compilers
#
# Copyright 2016  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences and Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.

"""
Module ``testing`` contains support for unit-testing domain-specific
languages. Tests for arbitrarily small components of the grammar can
be written into test files with ini-file syntax in order to test
whether the parser matches or fails as expected. It can also be
tested whether it produces an expected concrete or abstract syntax tree.
Usually, however, unexpected failure to match a certain string is the
main cause of trouble when constructing a context-free grammar.
"""


import collections
import concurrent.futures
import copy
import fnmatch
import inspect
import json
import multiprocessing
import os
import sys
from typing import Dict, List, Union, cast

from DHParser.error import Error, is_error, adjust_error_locations
from DHParser.log import log_dir, logging, is_logging, clear_logs, log_parsing_history
from DHParser.parse import UnknownParserError, Parser, Lookahead
from DHParser.syntaxtree import Node, RootNode, parse_tree, flatten_sxpr, ZOMBIE_TAG
from DHParser.toolkit import GLOBALS, get_config_value, load_if_file, re


__all__ = ('unit_from_config',
           'unit_from_json',
           'TEST_READERS',
           'unit_from_file',
           'get_report',
           'grammar_unit',
           'grammar_suite',
           'SymbolsDictType',
           'extract_symbols',
           'create_test_templates',
           'reset_unit',
           'runner',
           'clean_report')


UNIT_STAGES = {'match*', 'match', 'fail', 'ast', 'cst'}
RESULT_STAGES = {'__cst__', '__ast__', '__err__'}

# def unit_from_configfile(config_filename):
#     """
#     Reads a grammar unit test from a config file.
#     """
#     cfg = configparser.ConfigParser(interpolation=None)
#     cfg.read(config_filename, encoding="utf8")
#     OD = collections.OrderedDict
#     unit = OD()
#     for section in cfg.sections():
#         symbol, stage = section.split(':')
#         if stage not in UNIT_STAGES:
#             if symbol in UNIT_STAGES:
#                 symbol, stage = stage, symbol
#             else:
#                 raise ValueError('Test stage %s not in: %s' % (stage, str(UNIT_STAGES)))
#         for testkey, testcode in cfg[section].items():
#             if testcode[:3] + testcode[-3:] in {"''''''", '""""""'}:
#                 testcode = testcode[3:-3]
#                 # testcode = testcode.replace('\\#', '#')
#                 testcode = re.sub(r'(?<!\\)\\#', '#', testcode).replace('\\\\', '\\')
#             elif testcode[:1] + testcode[-1:] in {"''", '""'}:
#                 testcode = testcode[1:-1]
#             unit.setdefault(symbol, OD()).setdefault(stage, OD())[testkey] = testcode
#     # print(json.dumps(unit, sort_keys=True, indent=4))
#     return unit

RX_SECTION = re.compile(r'\s*\[(?P<stage>\w+):(?P<symbol>\w+)\]')
RE_VALUE = '(?:"""((?:.|\n)*?)""")|' + "(?:'''((?:.|\n)*?)''')|" + \
           r'(?:"(.*?)")|' + "(?:'(.*?)')|" + r'(.*(?:\n(?:\s*\n)*    .*)*)'
# the following does not work with pypy3, because pypy's re-engine does not
# support local flags, e.g. '(?s: )'
# RE_VALUE = r'(?:"""((?s:.*?))""")|' + "(?:'''((?s:.*?))''')|" + \
#            r'(?:"(.*?)")|' + "(?:'(.*?)')|" + '(.*(?:\n(?:\s*\n)*    .*)*)'
RX_ENTRY = re.compile(r'\s*(\w+\*?)\s*:\s*(?:{value})\s*'.format(value=RE_VALUE))
RX_COMMENT = re.compile(r'\s*#.*\n')


def unit_from_config(config_str):
    """ Reads grammar unit tests from a string in config-file (.ini)
    syntax.

    Args:
        config_str (str): A string containing a config-file with Grammar unit-tests

    Returns:
        A dictionary representing the unit tests.
    """
    # TODO: issue a warning if the same match:xxx or fail:xxx block appears more than once

    def eat_comments(txt, pos):
        m = RX_COMMENT.match(txt, pos)
        while m:
            pos = m.span()[1]
            m = RX_COMMENT.match(txt, pos)
        return pos

    cfg = config_str.replace('\t', '    ')

    OD = collections.OrderedDict
    unit = OD()

    pos = eat_comments(cfg, 0)
    section_match = RX_SECTION.match(cfg, pos)
    while section_match:
        d = section_match.groupdict()
        stage = d['stage']
        if stage not in UNIT_STAGES:
            raise KeyError('Unknown stage ' + stage + "! Must be one of: " + str(UNIT_STAGES))
        symbol = d['symbol']
        pos = eat_comments(cfg, section_match.span()[1])

        entry_match = RX_ENTRY.match(cfg, pos)
        # if entry_match is None:
        #     SyntaxError('No entries in section [%s:%s]' % (stage, symbol))
        while entry_match:
            testkey, testcode = [group for group in entry_match.groups() if group is not None]
            lines = testcode.split('\n')
            if len(lines) > 1:
                indent = sys.maxsize
                for line in lines[1:]:
                    indent = min(indent, len(line) - len(line.lstrip()))
                for i in range(1, len(lines)):
                    lines[i] = lines[i][indent:]
                testcode = '\n'.join(lines)
            # unit.setdefault(symbol, OD()).setdefault(stage, OD())[testkey] = testcode
            test = unit.setdefault(symbol, OD()).setdefault(stage, OD())
            assert testkey.strip('*') not in test and (testkey.strip('*') + '*') not in test, \
                "Key %s already exists in test %s:%s!" % (testkey, stage, symbol)
            test[testkey] = testcode
            pos = eat_comments(cfg, entry_match.span()[1])
            entry_match = RX_ENTRY.match(cfg, pos)

        section_match = RX_SECTION.match(cfg, pos)

    if pos != len(cfg) and not re.match(r'\s+$', cfg[pos:]):
        raise SyntaxError('in line %i' % (cfg[:pos].count('\n') + 2))  # TODO: Add file name

    return unit
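
# Usage sketch for ``unit_from_config()``: for a config string with a
# [match:word] section holding ``M1: "hello"`` and a [fail:word] section
# holding ``F1: "1"`` (names made up), the result is a nested dictionary
# along the lines of
#
#     {'word': {'match': {'M1': 'hello'}, 'fail': {'F1': '1'}}}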


def unit_from_json(json_str):
    """
    Reads grammar unit tests from a JSON string.
    """
    unit = json.loads(json_str)
    for symbol in unit:
        for stage in unit[symbol]:
            if stage not in UNIT_STAGES:
                raise ValueError('Test stage %s not in: %s' % (stage, str(UNIT_STAGES)))
    return unit


# TODO: add support for yaml, cson, toml


# A dictionary associating file endings with reader functions that
# transform strings containing the file's content to a nested dictionary
# structure of test cases.
TEST_READERS = {
    '.ini': unit_from_config,
    '.json': unit_from_json
}
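
# Sketch of how a further format could be plugged in, in the spirit of the
# TODO note above (assumes the third-party PyYAML package; not part of this
# module):
#
# import yaml
#
# def unit_from_yaml(yaml_str):
#     """Reads grammar unit tests from a YAML string."""
#     unit = yaml.safe_load(yaml_str)
#     for symbol in unit:
#         for stage in unit[symbol]:
#             if stage not in UNIT_STAGES:
#                 raise ValueError('Test stage %s not in: %s' % (stage, str(UNIT_STAGES)))
#     return unit
#
# TEST_READERS['.yaml'] = unit_from_yaml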


def unit_from_file(filename):
    """
    Reads a grammar unit test from a file. The format of the file is
    determined by the ending of its name.
    """
    try:
        reader = TEST_READERS[os.path.splitext(filename)[1].lower()]
        with open(filename, 'r', encoding='utf8') as f:
            data = f.read()
        test_unit = reader(data)
    except KeyError:
        raise ValueError("Unknown unit test file type: " + filename[filename.rfind('.'):])

    # Check for ambiguous Test names
    errors = []
    for parser_name, tests in test_unit.items():
        # normalize case for test category names
        keys = list(tests.keys())
        for key in keys:
            new_key = key.lower()
            if new_key != key:
                tests[new_key] = tests[key]
                del tests[key]

        m_names = set(tests.get('match', dict()).keys())
        f_names = set(tests.get('fail', dict()).keys())
        intersection = list(m_names & f_names)
        intersection.sort()
        if intersection:
            errors.append("Same names %s assigned to match and fail test "
                          "of parser %s." % (str(intersection), parser_name) +
                          " Please, use different names!")
    if errors:
        raise EnvironmentError("Error(s) in test file %s:\n" % filename
                               + '\n'.join(errors))

    return test_unit


# def all_match_tests(tests):
#     """Returns all match tests from ``tests``. This includes match tests
#     marked with an asterisk for CST-output as well as unmarked match-tests.
#     """
#     return itertools.chain(tests.get('match', dict()).items(),
#                            tests.get('match*', dict()).items())


def get_report(test_unit):
    """
    Returns a text-report of the results of a grammar unit test. The report
    lists the source of all tests as well as the error messages, if a test
    failed, or the abstract syntax tree (AST) in case of success.

    If an asterisk has been appended to the test name, the concrete syntax
    tree will also be added to the report for that particular test.

    The purpose of the latter is to help with constructing and debugging
    AST-transformations. It is better to switch the CST-output on and off
    with the asterisk marker when needed than to output the CST for all tests,
    which would unnecessarily bloat the test reports.
    """
    def indent(txt):
        lines = txt.split('\n')
        lines[0] = '    ' + lines[0]
        return "\n    ".join(lines)

    report = []
    for parser_name, tests in test_unit.items():
        heading = 'Test of parser: "%s"' % parser_name
        report.append('\n\n%s\n%s\n' % (heading, '=' * len(heading)))
        for test_name, test_code in tests.get('match', dict()).items():
            heading = 'Match-test "%s"' % test_name
            report.append('\n%s\n%s\n' % (heading, '-' * len(heading)))
            report.append('### Test-code:')
            report.append(indent(test_code))
            error = tests.get('__err__', {}).get(test_name, "")
            if error:
                report.append('\n### Error:')
                report.append(error)
            ast = tests.get('__ast__', {}).get(test_name, None)
            cst = tests.get('__cst__', {}).get(test_name, None)
            if cst and (not ast or str(test_name).endswith('*')):
                report.append('\n### CST')
                report.append(indent(cst.serialize('cst')))
            if ast:
                report.append('\n### AST')
                report.append(indent(ast.serialize('ast')))
        for test_name, test_code in tests.get('fail', dict()).items():
            heading = 'Fail-test "%s"' % test_name
            report.append('\n%s\n%s\n' % (heading, '-' * len(heading)))
            report.append('### Test-code:')
            report.append(indent(test_code))
            messages = tests.get('__msg__', {}).get(test_name, "")
            if messages:
                report.append('\n### Messages:')
                report.append(messages)
            error = tests.get('__err__', {}).get(test_name, "")
            if error:
                report.append('\n### Error:')
                report.append(error)
    return '\n'.join(report)


def grammar_unit(test_unit, parser_factory, transformer_factory, report='REPORT', verbose=False):
    """
    Unit tests for a grammar-parser and AST transformations.
    """
    output = []

    def write(s):
        """Append string `s` to output. The purpose is to defer printing to
        stdout in order to avoid muddled output when several unit tests run
        at the same time."""
        nonlocal output
        output.append(s)

    def clean_key(k):
        try:
            return k.replace('*', '')
        except AttributeError:
            return k

    def get(tests, category, key) -> str:
        try:
            value = tests[category][key] if key in tests[category] \
                else tests[category][clean_key(key)]
        except KeyError:
            return ''
            # raise AssertionError('%s-test %s for parser %s missing !?'
            #                      % (category, test_name, parser_name))
        return value

    if isinstance(test_unit, str):
        _, unit_name = os.path.split(os.path.splitext(test_unit)[0])
        test_unit = unit_from_file(test_unit)
    else:
        unit_name = 'unit_test_' + str(id(test_unit))
    if verbose:
        write("\nGRAMMAR TEST UNIT: " + unit_name)
    errata = []
    parser = parser_factory()
    transform = transformer_factory()

    def has_lookahead(parser_name: str) -> bool:
        """Returns True if the parser or any of its descendant parsers is a
        Lookahead parser."""
        lookahead_found = False

        def find_lookahead(p: Parser):
            nonlocal lookahead_found
            if not lookahead_found:
                lookahead_found = isinstance(p, Lookahead)

        parser[parser_name].apply(find_lookahead)
        return lookahead_found

    def lookahead_artifact(syntax_tree: Node):
        """
        Returns True if the error merely occurred because the parser
        stopped in front of a sequence that was captured by a lookahead
        operator or if a mandatory lookahead failed at the end of data.
        This is required for testing of parsers that put a lookahead
        operator at the end. See test_testing.TestLookahead.
        """
        raw_errors = syntax_tree.errors_sorted
        is_artifact = ((2 <= len(raw_errors) == 3  # case 1:  superfluous data for lookahead
                        and {e.code for e in raw_errors} <=
                            {Error.PARSER_LOOKAHEAD_MATCH_ONLY,
                             # Error.PARSER_STOPPED_BEFORE_END,
                             Error.PARSER_STOPPED_EXCEPT_FOR_LOOKAHEAD})
                       or (len(raw_errors) == 1
                           and (raw_errors[-1].code == Error.PARSER_STOPPED_EXCEPT_FOR_LOOKAHEAD
                                #  case 2:  mandatory lookahead failure at end of text
                                or raw_errors[-1].code == Error.MANDATORY_CONTINUATION_AT_EOF)))
        if is_artifact:
            # don't remove zombie node with error message at the end
            # but change its tag_name to indicate that it is an artifact!
            for parent in syntax_tree.select_if(lambda node: any(child.tag_name == ZOMBIE_TAG
                                                                 for child in node.children),
                                                include_root=True, reverse=True):
                zombie = parent[ZOMBIE_TAG]
                zombie.tag_name = '__TESTING_ARTIFACT__'
                zombie.result = 'Artifact can be ignored. Be aware, though, that the tree ' \
                                'structure may not be the same as in a non-testing ' \
                                'environment when a testing artifact has occurred!'
                # parent.result = tuple(c for c in parent.children if c.tag_name != ZOMBIE_TAG)
                break
        return is_artifact

    for parser_name, tests in test_unit.items():
        if not get_config_value('test_parallelization'):
            print('  ' + parser_name)

        assert parser_name, "Missing parser name in test %s!" % unit_name
        assert not any(test_type in RESULT_STAGES for test_type in tests), \
            ("Test %s in %s already has results. Use reset_unit() before running again!"
             % (parser_name, unit_name))
        assert set(tests.keys()).issubset(UNIT_STAGES), \
            'Unknown test-types: %s ! Must be one of %s' \
            % (set(tests.keys()) - UNIT_STAGES, UNIT_STAGES)
        if verbose:
            write('  Match-Tests for parser "' + parser_name + '"')
        match_tests = set(tests['match'].keys()) if 'match' in tests else set()
        if 'ast' in tests:
            ast_tests = set(tests['ast'].keys())
            if not {clean_key(k) for k in ast_tests} <= {clean_key(k) for k in match_tests}:
                raise AssertionError('AST-Tests %s for parser %s lack corresponding match-tests!'
                                     % (str(ast_tests - match_tests), parser_name))
        if 'cst' in tests:
            cst_tests = set(tests['cst'].keys())
            if not {clean_key(k) for k in cst_tests} <= {clean_key(k) for k in match_tests}:
                raise AssertionError('CST-Tests %s lack corresponding match-tests!'
                                     % str(cst_tests - match_tests))

        # run match tests

        for test_name, test_code in tests.get('match', dict()).items():
            if not get_config_value('test_parallelization'):
                print('    ' + str(test_name))

            errflag = len(errata)
            try:
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
            except UnknownParserError as upe:
                cst = RootNode()
                cst = cst.new_error(Node(ZOMBIE_TAG, "").with_pos(0), str(upe))
            clean_test_name = str(test_name).replace('*', '')
            # log_ST(cst, "match_%s_%s.cst" % (parser_name, clean_test_name))
            tests.setdefault('__cst__', {})[test_name] = cst
            if is_error(cst.error_flag) and not lookahead_artifact(cst):
                errors = cst.errors_sorted
                adjust_error_locations(errors, test_code)
                errata.append('Match test "%s" for parser "%s" failed:\n\tExpr.:  %s\n\n\t%s\n\n' %
                              (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
                               '\n\t'.join(str(m).replace('\n', '\n\t\t') for m in errors)))
                # tests.setdefault('__err__', {})[test_name] = errata[-1]
                # write parsing-history log only in case of failure!
                if is_logging():
                    log_parsing_history(parser, "match_%s_%s.log" % (parser_name, clean_test_name))
            if "ast" in tests or report:
                ast = copy.deepcopy(cst)
                transform(ast)
                tests.setdefault('__ast__', {})[test_name] = ast
                # log_ST(ast, "match_%s_%s.ast" % (parser_name, clean_test_name))
            if verbose:
                infostr = '    match-test "' + test_name + '" ... '
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            if "cst" in tests and len(errata) == errflag:
                compare = parse_tree(get(tests, "cst", test_name))
                if compare:
                    if not compare.equals(cst):
                        errata.append('Concrete syntax tree test "%s" for parser "%s" failed:\n%s' %
                                      (test_name, parser_name, cst.serialize('cst')))
                    if verbose:
                        infostr = '      cst-test "' + test_name + '" ... '
                        write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            if "ast" in tests and len(errata) == errflag:
                compare = parse_tree(get(tests, "ast", test_name))
                if compare:
                    from DHParser.transform import traverse, remove_nodes
                    traverse(ast, {'*': remove_nodes({'__TESTING_ARTIFACT__'})})
                    traverse(compare, {'*': remove_nodes({'__TESTING_ARTIFACT__'})})
                    if not compare.equals(ast):
                        errata.append('Abstract syntax tree test "%s" for parser "%s" failed:'
                                      '\n\tExpr.:     %s\n\tExpected:  %s\n\tReceived:  %s'
                                      % (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
                                         flatten_sxpr(compare.as_sxpr()),
                                         flatten_sxpr(ast.as_sxpr())))
                    if verbose:
                        infostr = '      ast-test "' + test_name + '" ... '
                        write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            if len(errata) > errflag:
                tests.setdefault('__err__', {})[test_name] = errata[-1]

        if verbose and 'fail' in tests:
            write('  Fail-Tests for parser "' + parser_name + '"')

        # run fail tests

        for test_name, test_code in tests.get('fail', dict()).items():
            errflag = len(errata)
            # cst = parser(test_code, parser_name)
            try:
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
            except UnknownParserError as upe:
                node = Node(ZOMBIE_TAG, "").with_pos(0)
                cst = RootNode(node).new_error(node, str(upe))
                errata.append('Unknown parser "{}" in fail test "{}"!'.format(parser_name, test_name))
                tests.setdefault('__err__', {})[test_name] = errata[-1]
            if not (is_error(cst.error_flag) and not lookahead_artifact(cst)):
                errata.append('Fail test "%s" for parser "%s" yields match instead of '
                              'expected failure!' % (test_name, parser_name))
                tests.setdefault('__err__', {})[test_name] = errata[-1]
                # write parsing-history log only in case of test-failure
                if is_logging():
                    log_parsing_history(parser, "fail_%s_%s.log" % (parser_name, test_name))
            if cst.error_flag:
                tests.setdefault('__msg__', {})[test_name] = \
                    "\n".join(str(e) for e in cst.errors_sorted)
            if verbose:
                infostr = '    fail-test  "' + test_name + '" ... '
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

    # write test-report
    if report:
        test_report = get_report(test_unit)
        if test_report:
            try:
                os.mkdir(report)   # is a process-Lock needed, here?
            except FileExistsError:
                pass
            with open(os.path.join(report, unit_name + '.md'), 'w', encoding='utf8') as f:
                f.write(test_report)

    print('\n'.join(output))
    return errata
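
# Usage sketch for ``grammar_unit()``: the factory names ``get_grammar`` and
# ``get_transformer`` as well as the test-file name are hypothetical
# stand-ins for whatever the tested project provides:
#
#     errata = grammar_unit('grammar_tests/01_test_components.ini',
#                           get_grammar, get_transformer, report='REPORT')
#     if errata:
#         print('\n'.join(errata))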


def reset_unit(test_unit):
    """
    Resets the tests in ``test_unit`` by removing all results and error
    messages.
    """
    for parser, tests in test_unit.items():
        for key in list(tests.keys()):
            if key not in UNIT_STAGES:
                if key not in RESULT_STAGES:
                    print('Removing unknown component %s from test %s' % (key, parser))
                del tests[key]


def run_unit(logdir, *parameters):
    """
    Run `grammar_unit()` with logs written to `logdir` or no logs if `logdir`
    evaluates to False. This helper function is needed for running unit tests
    in a multiprocessing environment, because log.log_dir(), log.logging() and
    log.is_logging() are thread-local.
    """
    with logging(logdir):
        return grammar_unit(*parameters)


def grammar_suite(directory, parser_factory, transformer_factory,
                  fn_patterns=['*test*'],
                  ignore_unknown_filetypes=False,
                  report='REPORT', verbose=True):
    """
    Runs all grammar unit tests in a directory. A file is considered a test
    unit if it has the word "test" in its name.
    """
    if isinstance(fn_patterns, str) or not isinstance(fn_patterns, collections.abc.Iterable):
        fn_patterns = [fn_patterns]
    all_errors = collections.OrderedDict()
    if verbose:
        print("\nScanning test-directory: " + directory)
    save_cwd = os.getcwd()
    os.chdir(directory)
    if is_logging():
        clear_logs()

    if get_config_value('test_parallelization'):
        with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
            results = []
            for filename in sorted(os.listdir('.')):
                print(filename)
                if any(fnmatch.fnmatch(filename, pattern) for pattern in fn_patterns):
                    parameters = filename, parser_factory, transformer_factory, report, verbose
                    results.append((filename, pool.submit(run_unit, log_dir(), *parameters)))
            for filename, err_future in results:
                try:
                    errata = err_future.result()
                    if errata:
                        all_errors[filename] = errata
                except ValueError as e:
                    if not ignore_unknown_filetypes or str(e).find("Unknown") < 0:
                        raise e
    else:
        results = []
        for filename in sorted(os.listdir('.')):
            if any(fnmatch.fnmatch(filename, pattern) for pattern in fn_patterns):
                parameters = filename, parser_factory, transformer_factory, report, verbose
                # print(filename)
                results.append((filename, grammar_unit(*parameters)))
        for filename, errata in results:
            if errata:
                all_errors[filename] = errata

    os.chdir(save_cwd)
    error_report = []
    err_N = 0
    if all_errors:
        for filename in all_errors:
            error_report.append('Errors found by unit test "%s":\n' % filename)
            err_N += len(all_errors[filename])
            for error in all_errors[filename]:
                error_report.append('\t' + '\n\t'.join(error.split('\n')))
    if error_report:
        # if verbose:
        #     print("\nFAILURE! %i error%s found!\n" % (err_N, 's' if err_N > 1 else ''))
        return ('Test suite "%s" revealed %s error%s:\n\n'
                % (directory, err_N, 's' if err_N > 1 else '') + '\n'.join(error_report))
    if verbose:
        print("\nSUCCESS! All tests passed :-)\n")
    return ''
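
# Usage sketch for ``grammar_suite()``, e.g. from a project's test-runner
# script (directory and factory names are hypothetical):
#
#     error_report = grammar_suite('grammar_tests', get_grammar, get_transformer)
#     if error_report:
#         print(error_report)
#         sys.exit(1)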


########################################################################
#
# Support for unit-testing of ebnf-grammars
#
########################################################################


RX_DEFINITION_OR_SECTION = re.compile(r'(?:^|\n)[ \t]*(\w+(?=[ \t]*=)|#:.*(?=\n|$|#))')
SymbolsDictType = Dict[str, List[str]]


def extract_symbols(ebnf_text_or_file: str) -> SymbolsDictType:
    r"""
    Extracts all defined symbols from an EBNF-grammar. This can be used to
    prepare grammar-tests. The symbols will be returned as lists of strings
    which are grouped by the sections to which they belong and returned as
    an ordered dictionary, the keys of which are the section names.
    In order to define a section in the ebnf-source, add a comment-line
    starting with "#:", followed by the section name. It is recommended
    to use valid file names as section names. Example:

        #: components

        expression = term  { EXPR_OP~ term}
        term       = factor  { TERM_OP~ factor}
        factor     = [SIGN] ( NUMBER | VARIABLE | group ) { VARIABLE | group }
        group      = "(" expression ")"


        #: leaf_expressions

        EXPR_OP    = /\+/ | /-/
        TERM_OP    = /\*/ | /\//
        SIGN       = /-/

        NUMBER     = /(?:0|(?:[1-9]\d*))(?:\.\d+)?/~
        VARIABLE   = /[A-Za-z]/~

    If no sections have been defined in the comments, there will be only
    one group with the empty string as a key.

    :param ebnf_text_or_file: Either an ebnf-grammar or the file-name
            of an ebnf-grammar
    :return: Ordered dictionary mapping the section names of the grammar
            to lists of symbols that appear under that section.
    """
    def trim_section_name(name: str) -> str:
        return re.sub(r'[^\w-]', '_', name.replace('#:', '').strip())

    ebnf = load_if_file(ebnf_text_or_file)
    deflist = RX_DEFINITION_OR_SECTION.findall(ebnf)
    if not deflist:
        raise AssertionError('No symbols found in: ' + ebnf_text_or_file[:40])
    symbols = collections.OrderedDict()  # type: SymbolsDictType
    if deflist[0][:2] != '#:':
        curr_section = ''
        symbols[curr_section] = []
    for df in deflist:
        if df[:2] == '#:':
            curr_section = trim_section_name(df)
            if curr_section in symbols:
                raise AssertionError('Section name must not be repeated: ' + curr_section)
            symbols[curr_section] = []
        else:
            symbols[curr_section].append(df)
    return symbols
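
# Usage sketch: for the example grammar shown in the docstring above,
# ``extract_symbols(ebnf_source)`` would return an ordered dict roughly like
# {'components': ['expression', 'term', 'factor', 'group'],
#  'leaf_expressions': ['EXPR_OP', 'TERM_OP', 'SIGN', 'NUMBER', 'VARIABLE']}.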


def create_test_templates(symbols_or_ebnf: Union[str, SymbolsDictType],
                          path: str,
                          fmt: str = '.ini') -> None:
    """
    Creates template files for grammar unit-tests for the given symbols.

    Args:
        symbols_or_ebnf: Either a dictionary that matches section names to
                the grammar's symbols under that section or an EBNF-grammar
                or file name of an EBNF-grammar from which the symbols shall
                be extracted.
        path: the path to the grammar-test directory (usually 'grammar_tests').
                If the last element of the path does not exist, the directory
                will be created.
        fmt: the test-file-format. At the moment only '.ini' is supported
    """
    assert fmt == '.ini'
    if isinstance(symbols_or_ebnf, str):
        symbols = extract_symbols(cast(str, symbols_or_ebnf))  # type: SymbolsDictType
    else:
        symbols = cast(Dict, symbols_or_ebnf)
    if not os.path.exists(path):
        os.mkdir(path)
    if os.path.isdir(path):
        save = os.getcwd()
        os.chdir(path)
        keys = reversed(list(symbols.keys()))
        for i, k in enumerate(keys):
            filename = '{num:0>2}_test_{section}'.format(num=i+1, section=k) + fmt
            if not os.path.exists(filename):
                print('Creating test file template "{name}".'.format(name=filename))
                with open(filename, 'w', encoding='utf-8') as f:
                    for sym in symbols[k]:
                        f.write('\n[match:{sym}]\n\n'.format(sym=sym))
                        f.write('[ast:{sym}]\n\n'.format(sym=sym))
                        f.write('[fail:{sym}]\n\n'.format(sym=sym))
        os.chdir(save)
    else:
        raise ValueError(path + ' is not a directory!')
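
# Usage sketch for ``create_test_templates()`` (the EBNF file name is
# hypothetical): this creates one numbered .ini test template per section
# of the grammar in the 'grammar_tests' directory:
#
#     create_test_templates('arithmetic.ebnf', 'grammar_tests')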


#######################################################################
#
#  general unit testing support
#
#######################################################################


def run_tests_in_class(test, namespace):
    """
    Runs all tests in test-class `test` in the given namespace.
    """
    def instantiate(cls_name, namespace):
        exec("obj = " + cls_name + "()", namespace)
        obj = namespace["obj"]
        if "setup" in dir(obj):
            obj.setup()
        return obj

    obj = None
    try:
        if test.find('.') >= 0:
            cls_name, method_name = test.split('.')
            obj = instantiate(cls_name, namespace)
            print("Running " + cls_name + "." + method_name)
            exec('obj.' + method_name + '()')
        else:
            obj = instantiate(test, namespace)
            for name in dir(obj):
                if name.lower().startswith("test"):
                    print("Running " + test + "." + name)
                    exec('obj.' + name + '()')
    finally:
        if "teardown" in dir(obj):
            obj.teardown()


def run_test_function(func_name, namespace):
    """
    Run the test-function `func_name` in the given namespace.
    """
    print("Running test-function: " + func_name)
    exec(func_name + '()', namespace)


def runner(tests, namespace):
    """
    Runs all or some selected Python unit tests found in the
    namespace. To run all tests in a module, call
    ``runner("", globals())`` from within that module.

    Unit-tests are either classes whose name starts with "Test", methods
    of such classes whose name starts with "test", or functions whose
    name starts with "test".

    Args:
        tests: String or list of strings with the names of tests to
            run. If empty, runner searches the namespace for all objects
            whose name starts with 'test' and runs them (if they are
            functions) or all of their methods that start with "test"
            (if they are classes), calling the "setup" and "teardown"
            methods if they exist.

        namespace: The namespace for running the test, usually
            ``globals()`` should be used.

    Example:
        class TestSomething:
            def setup(self):
                pass
            def teardown(self):
                pass
            def test_something(self):
                pass

        if __name__ == "__main__":
            from DHParser.testing import runner
            runner("", globals())
    """
    test_classes = []
    test_functions = []

    if tests:
        if isinstance(tests, str):
            tests = tests.split(' ')
        assert all(test.lower().startswith('test') for test in tests)
    else:
        tests = namespace.keys()

    for name in tests:
        if name.lower().startswith('test'):
            if inspect.isclass(namespace[name]):
                test_classes.append(name)
            elif inspect.isfunction(namespace[name]):
                test_functions.append(name)

    for test in test_classes:
        run_tests_in_class(test, namespace)

    for test in test_functions:
        run_test_function(test, namespace)


def run_file(fname):
    if fname.lower().startswith('test_') and fname.endswith('.py'):
        print("RUNNING " + fname)
        # print('\nRUNNING UNIT TESTS IN: ' + fname)
        exec('import ' + fname[:-3])
        runner('', eval(fname[:-3]).__dict__)


def run_with_log(logdir, f):
    """
    Run `run_file()` with logs written to `logdir` or no logs if `logdir`
    evaluates to False. This helper function is needed for running unit tests
    in a multiprocessing environment, because log.log_dir(), log.logging() and
    log.is_logging() are thread-local.
    """
    with logging(logdir):
        run_file(f)


def run_path(path):
    """Runs all unit tests in `path`"""
    if os.path.isdir(path):
        sys.path.append(path)
        files = os.listdir(path)
        result_futures = []

        if get_config_value('test_parallelization'):
            with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
                for f in files:
                    result_futures.append(pool.submit(run_with_log, log_dir(), f))
                    # run_file(f)  # for testing!
                for r in result_futures:
                    try:
                        _ = r.result()
                    except AssertionError as failure:
                        print(failure)
        else:
            for f in files:
                run_file(f)

    else:
        path, fname = os.path.split(path)
        sys.path.append(path)
        run_file(fname)
    sys.path.pop()

def clean_report(report_dir='REPORT'):
    """Deletes any test-report-files in the REPORT sub-directory and removes
    the REPORT sub-directory, if it is empty after deleting the files."""
    # TODO: make this thread safe, if possible!!!!
    if os.path.exists(report_dir):
        files = os.listdir(report_dir)
        flag = False
        for file in files:
            if re.match(r'\w*_test_\d+\.md', file):
                os.remove(os.path.join(report_dir, file))
            else:
                flag = True
        if not flag:
            os.rmdir(report_dir)