testing.py 33.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# testing.py - test support for DHParser based grammars and compilers
#
# Copyright 2016  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences and Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.
17

18
19
20
21
22
23
24
25
26
"""
Module ``testing`` contains support for unit-testing domain specific
languages. Tests for arbitrarily small components of the Grammar can
be written into test files with ini-file syntax in order to test
whether the parser matches or fails as expected. It can also be
tested whether it produces an expected concrete or abstract syntax tree.
Usually, however, unexpected failure to match a certain string is the
main cause of trouble when constructing a context free Grammar.
"""
27
28


29
import collections
30
import concurrent.futures
31
import copy
Eckhart Arnold's avatar
Eckhart Arnold committed
32
import fnmatch
di68kap's avatar
di68kap committed
33
import inspect
34
import json
35
import multiprocessing
36
import os
37
import sys
38
from typing import Dict, List, Union, cast
39

di68kap's avatar
di68kap committed
40
from DHParser.error import Error, is_error, adjust_error_locations
41
from DHParser.log import log_dir, logging, is_logging, clear_logs, log_parsing_history
42
from DHParser.parse import UnknownParserError, Parser, Lookahead
43
from DHParser.syntaxtree import Node, RootNode, parse_tree, flatten_sxpr, serialize, ZOMBIE_TAG
44
from DHParser.toolkit import GLOBALS, get_config_value, load_if_file, re
45

46

di68kap's avatar
di68kap committed
47
# Public API of this module: reader functions for the supported test-file
# formats, the unit-test and test-suite runners, and helpers for creating
# test templates from an EBNF grammar.
__all__ = ('unit_from_config',
           'unit_from_json',
           'TEST_READERS',
           'unit_from_file',
           'get_report',
           'grammar_unit',
           'grammar_suite',
           'SymbolsDictType',
           'extract_symbols',
           'create_test_templates',
           'reset_unit',
           'runner',
           'clean_report')
60

61

62
63
# Names of the test stages that may appear in a unit-test file:
# 'match' / 'fail' hold the test inputs, 'cst' / 'ast' the expected
# syntax trees; 'match*' marks match-tests whose CST shall also be
# included in the report.
UNIT_STAGES = {'match*', 'match', 'fail', 'ast', 'cst'}
# Keys under which test results (trees, error messages) are stored
# alongside the test definitions after a test run.
RESULT_STAGES = {'__cst__', '__ast__', '__err__'}
64

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# def unit_from_configfile(config_filename):
#     """
#     Reads a grammar unit test from a config file.
#     """
#     cfg = configparser.ConfigParser(interpolation=None)
#     cfg.read(config_filename, encoding="utf8")
#     OD = collections.OrderedDict
#     unit = OD()
#     for section in cfg.sections():
#         symbol, stage = section.split(':')
#         if stage not in UNIT_STAGES:
#             if symbol in UNIT_STAGES:
#                 symbol, stage = stage, symbol
#             else:
#                 raise ValueError('Test stage %s not in: ' % (stage, str(UNIT_STAGES)))
#         for testkey, testcode in cfg[section].items():
#             if testcode[:3] + testcode[-3:] in {"''''''", '""""""'}:
#                 testcode = testcode[3:-3]
#                 # testcode = testcode.replace('\\#', '#')
#                 testcode = re.sub(r'(?<!\\)\\#', '#', testcode).replace('\\\\', '\\')
#             elif testcode[:1] + testcode[-1:] in {"''", '""'}:
#                 testcode = testcode[1:-1]
#             unit.setdefault(symbol, OD()).setdefault(stage, OD())[testkey] = testcode
#     # print(json.dumps(unit, sort_keys=True, indent=4))
#     return unit

eckhart's avatar
eckhart committed
91
# Matches a section heading of the form "[stage:symbol]" in a test file.
RX_SECTION = re.compile(r'\s*\[(?P<stage>\w+):(?P<symbol>\w+)\]')
# Matches the value of a test entry: triple-quoted, single-quoted, or an
# unquoted block that may continue on indented follow-up lines.
RE_VALUE = '(?:"""((?:.|\n)*?)""")|' + "(?:'''((?:.|\n)*?)''')|" + \
           r'(?:"(.*?)")|' + "(?:'(.*?)')|" + r'(.*(?:\n(?:\s*\n)*    .*)*)'
# the following does not work with pypy3, because pypy's re-engine does not
# support local flags, e.g. '(?s: )'
# RE_VALUE = r'(?:"""((?s:.*?))""")|' + "(?:'''((?s:.*?))''')|" + \
#            r'(?:"(.*?)")|' + "(?:'(.*?)')|" + '(.*(?:\n(?:\s*\n)*    .*)*)'
# Matches one complete test entry "name : value" (a trailing '*' on the
# name requests CST output for that test).
RX_ENTRY = re.compile(r'\s*(\w+\*?)\s*:\s*(?:{value})\s*'.format(value=RE_VALUE))
# Matches a single comment line introduced by '#'.
RX_COMMENT = re.compile(r'\s*#.*\n')
100

101

di68kap's avatar
di68kap committed
102
def unit_from_config(config_str):
    """ Reads grammar unit tests contained in a file in config file (.ini)
    syntax.

    Args:
        config_str (str): A string containing a config-file with Grammar unit-tests

    Returns:
        A dictionary representing the unit tests.
    """
    # TODO: issue a warning if the same match:xxx or fail:xxx block appears more than once

    def eat_comments(txt, pos):
        # Skip any run of comment lines starting at `pos`; return the
        # position of the first non-comment character.
        m = RX_COMMENT.match(txt, pos)
        while m:
            pos = m.span()[1]
            m = RX_COMMENT.match(txt, pos)
        return pos

    # Normalize tabs to 4 spaces so that the indentation-based multi-line
    # value syntax (see RE_VALUE) behaves uniformly.
    cfg = config_str.replace('\t', '    ')

    OD = collections.OrderedDict
    unit = OD()

    # Scan the string section by section; each section contains a series
    # of "name : value" test entries.
    pos = eat_comments(cfg, 0)
    section_match = RX_SECTION.match(cfg, pos)
    while section_match:
        d = section_match.groupdict()
        stage = d['stage']
        if stage not in UNIT_STAGES:
            raise KeyError('Unknown stage ' + stage + " ! must be one of: " + str(UNIT_STAGES))
        symbol = d['symbol']
        pos = eat_comments(cfg, section_match.span()[1])

        entry_match = RX_ENTRY.match(cfg, pos)
        # if entry_match is None:
        #     SyntaxError('No entries in section [%s:%s]' % (stage, symbol))
        while entry_match:
            # Exactly one of RE_VALUE's alternative capture groups matched.
            testkey, testcode = [group for group in entry_match.groups() if group is not None]
            lines = testcode.split('\n')
            if len(lines) > 1:
                # Strip the common indentation from multi-line test code.
                indent = sys.maxsize
                for line in lines[1:]:
                    indent = min(indent, len(line) - len(line.lstrip()))
                for i in range(1, len(lines)):
                    lines[i] = lines[i][indent:]
                testcode = '\n'.join(lines)
            # unit.setdefault(symbol, OD()).setdefault(stage, OD())[testkey] = testcode
            test = unit.setdefault(symbol, OD()).setdefault(stage, OD())
            # Guard against the same test name occurring both with and
            # without the '*'-marker within one section.
            assert testkey.strip('*') not in test and (testkey.strip('*') + '*') not in test, \
                "Key %s already exists in text %s:%s !" % (testkey, stage, symbol)
            test[testkey] = testcode
            pos = eat_comments(cfg, entry_match.span()[1])
            entry_match = RX_ENTRY.match(cfg, pos)

        section_match = RX_SECTION.match(cfg, pos)

    # Anything left over that is not pure whitespace means the file could
    # not be parsed completely.
    if pos != len(cfg) and not re.match(r'\s+$', cfg[pos:]):
        raise SyntaxError('in line %i' % (cfg[:pos].count('\n') + 2))  # TODO: Add file name

    return unit
163

164

di68kap's avatar
di68kap committed
165
def unit_from_json(json_str):
    """
    Reads grammar unit tests from a json string.

    Raises a ValueError if one of the recorded test stages is not a
    known unit-test stage.
    """
    unit = json.loads(json_str)
    for symbol in unit:
        for stage in unit[symbol]:
            if stage in UNIT_STAGES:
                continue
            raise ValueError('Test stage %s not in: %s' % (stage, str(UNIT_STAGES)))
    return unit

di68kap's avatar
di68kap committed
176

177
# TODO: add support for yaml, cson, toml
178
179


di68kap's avatar
di68kap committed
180
181
182
183
184
185
186
187
188
# A dictionary associating file endings with reader functions that
# transform strings containing the file's content to a nested dictionary
# structure of test cases.
TEST_READERS = {
    '.ini': unit_from_config,
    '.json': unit_from_json
}


189
def unit_from_file(filename):
    """
    Reads a grammar unit test from a file. The format of the file is
    determined by the ending of its name.

    :param filename: path of the unit-test file (.ini or .json).
    :return: the nested test dictionary produced by the matching reader.
    :raise ValueError: if the file's ending is not a supported test type.
    :raise EnvironmentError: if the same test name is used for both a
        match- and a fail-test of the same parser.
    """
    try:
        reader = TEST_READERS[os.path.splitext(filename)[1].lower()]
        with open(filename, 'r', encoding='utf8') as f:
            data = f.read()
        test_unit = reader(data)
    except KeyError:
        raise ValueError("Unknown unit test file type: " + filename[filename.rfind('.'):])

    # Check for ambiguous Test names
    errors = []
    for parser_name, tests in test_unit.items():
        # normalize case for test category names
        for key in list(tests.keys()):
            new_key = key.lower()
            if new_key != key:
                # BUG FIX: the original indexed with the whole key-list
                # (`tests[keys]`), which raised "unhashable type: 'list'"
                # for any category name that actually needed lower-casing.
                tests[new_key] = tests[key]
                del tests[key]

        m_names = set(tests.get('match', dict()).keys())
        f_names = set(tests.get('fail', dict()).keys())
        intersection = list(m_names & f_names)
        intersection.sort()
        if intersection:
            errors.append("Same names %s assigned to match and fail test "
                          "of parser %s." % (str(intersection), parser_name) +
                          " Please, use different names!")

    if errors:
        raise EnvironmentError("Error(s) in Testfile %s :\n" % filename
                               + '\n'.join(errors))

    return test_unit

227

di68kap's avatar
di68kap committed
228
229
230
231
232
233
# def all_match_tests(tests):
#     """Returns all match tests from ``tests``, This includes match tests
#     marked with an asterix for CST-output as well as unmarked match-tests.
#     """
#     return itertools.chain(tests.get('match', dict()).items(),
#                            tests.get('match*', dict()).items())
234
235


236
def get_report(test_unit):
    """
    Renders the results of a grammar unit test as a readable text report.

    For every parser the report lists each test's source code together
    with any error messages and - in case of success - the abstract
    syntax tree (AST). If a test name carries a trailing asterisk, the
    concrete syntax tree (CST) is included for that test as well, which
    helps when constructing and debugging AST-transformations. Keeping
    CST output opt-in per test avoids unnecessarily bloating the reports.
    """
    def indent(txt):
        # Indent every line of `txt` by four spaces.
        first, *rest = txt.split('\n')
        return '\n    '.join(['    ' + first] + rest)

    def underlined(title, char):
        # A title line underlined with `char`, framed by newlines.
        return '\n%s\n%s\n' % (title, char * len(title))

    report = []
    add = report.append
    for name, tests in test_unit.items():
        add('\n' + underlined('Test of parser: "%s"' % name, '='))

        for key, code in tests.get('match', dict()).items():
            add(underlined('Match-test "%s"' % key, '-'))
            add('### Test-code:')
            add(indent(code))
            err = tests.get('__err__', {}).get(key, "")
            if err:
                add('\n### Error:')
                add(err)
            ast = tests.get('__ast__', {}).get(key, None)
            cst = tests.get('__cst__', {}).get(key, None)
            if cst and (not ast or str(key).endswith('*')):
                add('\n### CST')
                add(indent(serialize(cst, 'cst')))
            if ast:
                add('\n### AST')
                add(indent(serialize(ast, 'ast')))

        for key, code in tests.get('fail', dict()).items():
            add(underlined('Fail-test "%s"' % key, '-'))
            add('### Test-code:')
            add(indent(code))
            msg = tests.get('__msg__', {}).get(key, "")
            if msg:
                add('\n### Messages:')
                add(msg)
            err = tests.get('__err__', {}).get(key, "")
            if err:
                add('\n### Error:')
                add(err)

    return '\n'.join(report)


292
def grammar_unit(test_unit, parser_factory, transformer_factory, report=True, verbose=False):
    """
    Unit tests for a grammar-parser and ast transformations.

    :param test_unit: either the file name of a unit-test file or a nested
        test dictionary as returned by one of the reader functions.
    :param parser_factory: factory function producing the grammar-parser.
    :param transformer_factory: factory function producing the
        AST-transformation.
    :param report: if True, a report is written to the "REPORT" directory.
    :param verbose: if True, per-test status lines are collected and printed.
    :return: a list of error messages ("errata") for all failed tests.
    """
    output = []

    def write(s):
        nonlocal output
        """Append string `s` to output. The purpose is to defer printing to
        stdout in order to avoid muddled output when several unit tests run
        at the same time."""
        output.append(s)

    def clean_key(k):
        # Strip the '*'-marker from a test name; non-string keys pass through.
        try:
            return k.replace('*', '')
        except AttributeError:
            return k

    def get(tests, category, key) -> str:
        # Look up a test value, tolerating the presence/absence of the
        # '*'-marker in the key; returns '' if no entry exists.
        try:
            value = tests[category][key] if key in tests[category] \
                else tests[category][clean_key(key)]
        except KeyError:
            return ''
            # raise AssertionError('%s-test %s for parser %s missing !?'
            #                      % (category, test_name, parser_name))
        return value

    if isinstance(test_unit, str):
        # A string argument is interpreted as a file name.
        _, unit_name = os.path.split(os.path.splitext(test_unit)[0])
        test_unit = unit_from_file(test_unit)
    else:
        unit_name = 'unit_test_' + str(id(test_unit))
    if verbose:
        write("\nGRAMMAR TEST UNIT: " + unit_name)

    errata = []
    parser = parser_factory()
    transform = transformer_factory()

    def has_lookahead(parser_name: str) -> bool:
        """Returns True if the parser or any of its descendant parsers is a
        Lookahead parser."""
        lookahead_found = False

        def find_lookahead(p: Parser):
            nonlocal lookahead_found
            if not lookahead_found:
                lookahead_found = isinstance(p, Lookahead)

        parser[parser_name].apply(find_lookahead)
        return lookahead_found

    def lookahead_artifact(st):
        """
        Returns True, if the error merely occurred, because the parser
        stopped in front of a sequence that was captured by a lookahead
        operator or if a mandatory lookahead failed at the end of data.
        This is required for testing of parsers that put a lookahead
        operator at the end. See test_testing.TestLookahead.
        """
        raw_errors = st.errors_sorted
        is_artifact = ((len(raw_errors) == 2  # case 1:  superfluous data for lookahead
                        and raw_errors[-1].code == Error.PARSER_LOOKAHEAD_MATCH_ONLY
                        and raw_errors[-2].code == Error.PARSER_STOPPED_BEFORE_END)
                       #  case 2:  mandatory lookahead failure at end of text
                       or (len(raw_errors) == 1
                           and raw_errors[-1].code == Error.MANDATORY_CONTINUATION_AT_EOF))
        if is_artifact:
            # don't remove zombie node with error message at the end
            # but change it's tag_name to indicate that it is an artifact!
            for parent in st.select(lambda node: any(child.tag_name == ZOMBIE_TAG
                                                     for child in node.children),
                                    include_root=True, reverse=True):
                zombie = parent[ZOMBIE_TAG]
                zombie.tag_name = '__TESTING_ARTIFACT__'
                zombie.result = 'Artifact can be ignored, but tree structure may not be fully reliable!'
                # parent.result = tuple(c for c in parent.children if c.tag_name != ZOMBIE_TAG)
                break
        return is_artifact

    for parser_name, tests in test_unit.items():
        # In sequential mode, progress is printed directly; in parallel
        # mode printing is deferred via write() to avoid muddled output.
        if not get_config_value('test_parallelization'):
            print('  ' + parser_name)

        assert parser_name, "Missing parser name in test %s!" % unit_name
        assert not any(test_type in RESULT_STAGES for test_type in tests), \
            ("Test %s in %s already has results. Use reset_unit() before running again!"
             % (parser_name, unit_name))
        assert set(tests.keys()).issubset(UNIT_STAGES), \
            'Unknown test-types: %s ! Must be one of %s' \
            % (set(tests.keys()) - UNIT_STAGES, UNIT_STAGES)
        if verbose:
            write('  Match-Tests for parser "' + parser_name + '"')
        match_tests = set(tests['match'].keys()) if 'match' in tests else set()
        # Every cst- or ast-test must have a corresponding match-test that
        # produces the tree to compare against.
        if 'ast' in tests:
            ast_tests = set(tests['ast'].keys())
            if not {clean_key(k) for k in ast_tests} <= {clean_key(k) for k in match_tests}:
                raise AssertionError('AST-Tests %s for parser %s lack corresponding match-tests!'
                                     % (str(ast_tests - match_tests), parser_name))
        if 'cst' in tests:
            cst_tests = set(tests['cst'].keys())
            if not {clean_key(k) for k in cst_tests} <= {clean_key(k) for k in match_tests}:
                raise AssertionError('CST-Tests %s lack corresponding match-tests!'
                                     % str(cst_tests - match_tests))

        # run match tests

        for test_name, test_code in tests.get('match', dict()).items():
            if not get_config_value('test_parallelization'):
                print('    ' + test_name)

            # errflag records the error count before this test; comparing
            # len(errata) against it later tells whether this test failed.
            errflag = len(errata)
            try:
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
            except UnknownParserError as upe:
                cst = RootNode()
                cst = cst.new_error(Node(ZOMBIE_TAG, "").with_pos(0), str(upe))
            clean_test_name = str(test_name).replace('*', '')
            # log_ST(cst, "match_%s_%s.cst" % (parser_name, clean_test_name))
            tests.setdefault('__cst__', {})[test_name] = cst
            if is_error(cst.error_flag) and not lookahead_artifact(cst):
                errors = cst.errors_sorted
                adjust_error_locations(errors, test_code)
                errata.append('Match test "%s" for parser "%s" failed:\n\tExpr.:  %s\n\n\t%s\n\n' %
                              (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
                               '\n\t'.join(str(m).replace('\n', '\n\t\t') for m in errors)))
                # tests.setdefault('__err__', {})[test_name] = errata[-1]
                # write parsing-history log only in case of failure!
                if is_logging():
                    log_parsing_history(parser, "match_%s_%s.log" % (parser_name, clean_test_name))
            # The AST is needed either for an ast-test or for the report.
            if "ast" in tests or report:
                ast = copy.deepcopy(cst)
                transform(ast)
                tests.setdefault('__ast__', {})[test_name] = ast
                # log_ST(ast, "match_%s_%s.ast" % (parser_name, clean_test_name))
            if verbose:
                infostr = '    match-test "' + test_name + '" ... '
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            # cst-/ast-comparisons run only if the match itself succeeded
            if "cst" in tests and len(errata) == errflag:
                compare = parse_tree(get(tests, "cst", test_name))
                if compare:
                    if not compare.equals(cst):
                        errata.append('Concrete syntax tree test "%s" for parser "%s" failed:\n%s' %
                                      (test_name, parser_name, serialize(cst, 'cst')))
                    if verbose:
                        infostr = '      cst-test "' + test_name + '" ... '
                        write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            if "ast" in tests and len(errata) == errflag:
                compare = parse_tree(get(tests, "ast", test_name))
                if compare:
                    if not compare.equals(ast):
                        errata.append('Abstract syntax tree test "%s" for parser "%s" failed:'
                                      '\n\tExpr.:     %s\n\tExpected:  %s\n\tReceived:  %s'
                                      % (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
                                         flatten_sxpr(compare.as_sxpr()),
                                         flatten_sxpr(ast.as_sxpr())))
                    if verbose:
                        infostr = '      ast-test "' + test_name + '" ... '
                        write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            if len(errata) > errflag:
                tests.setdefault('__err__', {})[test_name] = errata[-1]

        if verbose and 'fail' in tests:
            write('  Fail-Tests for parser "' + parser_name + '"')

        # run fail tests

        for test_name, test_code in tests.get('fail', dict()).items():
            errflag = len(errata)
            # cst = parser(test_code, parser_name)
            try:
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
            except UnknownParserError as upe:
                node = Node(ZOMBIE_TAG, "").with_pos(0)
                cst = RootNode(node).new_error(node, str(upe))
                errata.append('Unknown parser "{}" in fail test "{}"!'.format(parser_name, test_name))
                tests.setdefault('__err__', {})[test_name] = errata[-1]
            # A fail-test succeeds only if parsing produced a real error.
            if not is_error(cst.error_flag) and not lookahead_artifact(cst):
                errata.append('Fail test "%s" for parser "%s" yields match instead of '
                              'expected failure!' % (test_name, parser_name))
                tests.setdefault('__err__', {})[test_name] = errata[-1]
                # write parsing-history log only in case of test-failure
                if is_logging():
                    log_parsing_history(parser, "fail_%s_%s.log" % (parser_name, test_name))
            if cst.error_flag:
                tests.setdefault('__msg__', {})[test_name] = \
                    "\n".join(str(e) for e in cst.errors_sorted)
            if verbose:
                infostr = '    fail-test  "' + test_name + '" ... '
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

    # write test-report
    if report:
        report_dir = "REPORT"
        test_report = get_report(test_unit)
        if test_report:
            try:
                os.mkdir(report_dir)
            except FileExistsError:
                pass
            with open(os.path.join(report_dir, unit_name + '.md'), 'w', encoding='utf8') as f:
                f.write(test_report)

    # flush all deferred status output in one go
    print('\n'.join(output))
    return errata


503
def reset_unit(test_unit):
    """
    Removes all results (syntax-trees, error messages) from ``test_unit``
    so that the tests can be run anew.
    """
    for parser, tests in test_unit.items():
        obsolete = [key for key in tests if key not in UNIT_STAGES]
        for key in obsolete:
            if key not in RESULT_STAGES:
                print('Removing unknown component %s from test %s' % (key, parser))
            del tests[key]


516
517
518
519
520
521
522
523
524
525
526
def run_unit(logdir, *parameters):
    """
    Run `grammar_unit()` with logs written to `log_dir` or no logs if `log_dir`
    evaluates to False. This helper functions is needed for running unit tests
    in a multiprocessing environment, because log.log_dir(), log.logging() and
    log.is_logging() are thread-local.

    :param logdir: log directory, or a falsy value to turn logging off.
    :param parameters: positional arguments passed on to `grammar_unit()`.
    :return: the list of error messages returned by `grammar_unit()`.
    """
    with logging(logdir):
        return grammar_unit(*parameters)


Eckhart Arnold's avatar
Eckhart Arnold committed
527
528
529
530
def grammar_suite(directory, parser_factory, transformer_factory,
                  fn_patterns=('*test*',),
                  ignore_unknown_filetypes=False,
                  report=True, verbose=True):
    """
    Runs all grammar unit tests in a directory. A file is considered a test
    unit, if it has the word "test" in its name.

    :param directory: the directory that will be scanned for test files.
    :param parser_factory: factory function producing the grammar-parser.
    :param transformer_factory: factory function producing the
        AST-transformation.
    :param fn_patterns: a single fnmatch-pattern or an iterable of patterns;
        only file names matching at least one pattern are run.
        (Fixed: default is now an immutable tuple instead of a mutable list.)
    :param ignore_unknown_filetypes: if True, files of unknown type are
        skipped instead of raising a ValueError.
    :param report: if True, test reports are written (see `grammar_unit()`).
    :param verbose: if True, progress information is printed to stdout.
    :return: '' if all tests passed, otherwise a formatted error report.
    """
    # Wrap a single pattern into a list.  A plain string is technically
    # Iterable, but iterating it would yield single characters rather than
    # patterns, so strings are wrapped as well (fixes silent mis-matching
    # when a single pattern string was passed).
    if isinstance(fn_patterns, str) or not isinstance(fn_patterns, collections.abc.Iterable):
        fn_patterns = [fn_patterns]
    all_errors = collections.OrderedDict()
    if verbose:
        print("\nScanning test-directory: " + directory)
    save_cwd = os.getcwd()
    os.chdir(directory)
    try:
        if is_logging():
            clear_logs()

        if get_config_value('test_parallelization'):
            # Run test files in parallel processes.  Logging state is
            # thread-local, hence the run_unit() wrapper re-initializes it
            # inside each worker process.
            with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
                results = []
                for filename in sorted(os.listdir('.')):
                    # (removed stray debug `print(filename)`; the sequential
                    # branch had it commented out already)
                    if any(fnmatch.fnmatch(filename, pattern) for pattern in fn_patterns):
                        parameters = filename, parser_factory, transformer_factory, report, verbose
                        results.append((filename, pool.submit(run_unit, log_dir(), *parameters)))
                for filename, err_future in results:
                    try:
                        errata = err_future.result()
                        if errata:
                            all_errors[filename] = errata
                    except ValueError as e:
                        if not ignore_unknown_filetypes or str(e).find("Unknown") < 0:
                            raise e
        else:
            results = []
            for filename in sorted(os.listdir('.')):
                if any(fnmatch.fnmatch(filename, pattern) for pattern in fn_patterns):
                    parameters = filename, parser_factory, transformer_factory, report, verbose
                    try:
                        results.append((filename, grammar_unit(*parameters)))
                    except ValueError as e:
                        # Fixed: mirror the parallel branch so that
                        # `ignore_unknown_filetypes` is honored here as well.
                        if not ignore_unknown_filetypes or str(e).find("Unknown") < 0:
                            raise e
            for filename, errata in results:
                if errata:
                    all_errors[filename] = errata
    finally:
        # Fixed: restore the working directory even if a test unit raised,
        # so a failing suite does not leave the process in `directory`.
        os.chdir(save_cwd)

    error_report = []
    err_N = 0
    if all_errors:
        for filename in all_errors:
            error_report.append('Errors found by unit test "%s":\n' % filename)
            err_N += len(all_errors[filename])
            for error in all_errors[filename]:
                error_report.append('\t' + '\n\t'.join(error.split('\n')))
    if error_report:
        # if verbose:
        #     print("\nFAILURE! %i error%s found!\n" % (err_N, 's' if err_N > 1 else ''))
        return ('Test suite "%s" revealed %s error%s:\n\n'
                % (directory, err_N, 's' if err_N > 1 else '') + '\n'.join(error_report))
    if verbose:
        print("\nSUCCESS! All tests passed :-)\n")
    return ''


eckhart's avatar
eckhart committed
591
592
593
594
595
596
597
########################################################################
#
# Support for unit-testing of ebnf-grammars
#
########################################################################


598
# Matches, at the start of a line in an EBNF grammar, either a symbol
# definition ("symbol = ...") or a section marker ("#: section_name").
RX_DEFINITION_OR_SECTION = re.compile(r'(?:^|\n)[ \t]*(\w+(?=[ \t]*=)|#:.*(?=\n|$|#))')

# Type of the result of extract_symbols(): maps each section name to the
# list of symbols defined under that section.
SymbolsDictType = Dict[str, List[str]]


def extract_symbols(ebnf_text_or_file: str) -> SymbolsDictType:
    r"""
    Extracts all defined symbols from an EBNF-grammar. This can be used to
    prepare grammar-tests. The symbols will be returned as lists of strings
    which are grouped by the sections to which they belong and returned as
    an ordered dictionary, the keys of which are the section names.
    In order to define a section in the ebnf-source, add a comment-line
    starting with "#:", followed by the section name. It is recommended
    to use valid file names as section names. Example:

        #: components

        expression = term  { EXPR_OP~ term}
        term       = factor  { TERM_OP~ factor}
        factor     = [SIGN] ( NUMBER | VARIABLE | group ) { VARIABLE | group }
        group      = "(" expression ")"


        #: leaf_expressions

        EXPR_OP    = /\+/ | /-/
        TERM_OP    = /\*/ | /\//
        SIGN       = /-/

        NUMBER     = /(?:0|(?:[1-9]\d*))(?:\.\d+)?/~
        VARIABLE   = /[A-Za-z]/~

    If no sections have been defined in the comments, there will be only
    one group with the empty string as a key.

    :param ebnf_text_or_file: Either an ebnf-grammar or the file-name
            of an ebnf-grammar
    :return: Ordered dictionary mapping the section names of the grammar
            to lists of symbols that appear under that section.
    """
    def trim_section_name(name: str) -> str:
        # Turn a "#: section" marker into a file-name-safe identifier.
        return re.sub(r'[^\w-]', '_', name.replace('#:', '').strip())

    ebnf = load_if_file(ebnf_text_or_file)
    definitions = RX_DEFINITION_OR_SECTION.findall(ebnf)
    if not definitions:
        raise AssertionError('No symbols found in: ' + ebnf_text_or_file[:40])
    symbols: SymbolsDictType = collections.OrderedDict()
    section = ''
    # If the grammar starts with symbols before any section marker, collect
    # those symbols under the empty-string section.
    if not definitions[0].startswith('#:'):
        symbols[section] = []
    for entry in definitions:
        if entry.startswith('#:'):
            section = trim_section_name(entry)
            if section in symbols:
                raise AssertionError('Section name must not be repeated: ' + section)
            symbols[section] = []
        else:
            symbols[section].append(entry)
    return symbols


def create_test_templates(symbols_or_ebnf: Union[str, 'SymbolsDictType'],
                          path: str,
                          fmt: str = '.ini') -> None:
    """
    Creates template files for grammar unit-tests for the given symbols .

    Args:
        symbols_or_ebnf: Either a dictionary that matches section names to
                the grammar's symbols under that section or an EBNF-grammar
                or file name of an EBNF-grammar from which the symbols shall
                be extracted.
        path: the path to the grammar-test directory (usually 'grammar_tests').
                If the last element of the path does not exist, the directory
                will be created.
        fmt: the test-file-format. At the moment only '.ini' is supported
    """
    assert fmt == '.ini'
    if isinstance(symbols_or_ebnf, str):
        # An EBNF-grammar (or grammar file name) was passed: extract symbols.
        symbols = extract_symbols(cast(str, symbols_or_ebnf))  # type: SymbolsDictType
    else:
        symbols = cast(Dict, symbols_or_ebnf)
    if not os.path.exists(path):
        os.mkdir(path)
    if not os.path.isdir(path):
        raise ValueError(path + ' is not a directory!')
    previous_cwd = os.getcwd()
    os.chdir(path)
    try:
        # Number the test files in reverse section order, starting with 01.
        for num, section in enumerate(reversed(list(symbols.keys())), start=1):
            filename = '{num:0>2}_test_{section}'.format(num=num, section=section) + fmt
            if os.path.exists(filename):
                continue  # never overwrite an existing test file
            print('Creating test file template "{name}".'.format(name=filename))
            with open(filename, 'w', encoding='utf-8') as f:
                for sym in symbols[section]:
                    f.write('\n[match:{sym}]\n\n'.format(sym=sym))
                    f.write('[ast:{sym}]\n\n'.format(sym=sym))
                    f.write('[fail:{sym}]\n\n'.format(sym=sym))
    finally:
        os.chdir(previous_cwd)


700
701
702
703
704
705
#######################################################################
#
#  general unit testing support
#
#######################################################################

706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735

def run_tests_in_class(test, namespace):
    """
    Runs all tests in test-class `test` in the given namespace.
    """
    def instantiate(cls_name, namespace):
        exec("obj = " + cls_name + "()", namespace)
        obj = namespace["obj"]
        if "setup" in dir(obj):
            obj.setup()
        return obj

    obj = None
    try:
        if test.find('.') >= 0:
            cls_name, method_name = test.split('.')
            obj = instantiate(cls_name, namespace)
            print("Running " + cls_name + "." + method_name)
            exec('obj.' + method_name + '()')
        else:
            obj = instantiate(test, namespace)
            for name in dir(obj):
                if name.lower().startswith("test"):
                    print("Running " + test + "." + name)
                    exec('obj.' + name + '()')
    finally:
        if "teardown" in dir(obj):
            obj.teardown()


736
def run_test_function(func_name, namespace):
737
738
739
    """
    Run the test-function `test` in the given namespace.
    """
740
741
    print("Running test-function: " + func_name)
    exec(func_name + '()', namespace)
742
743


eckhart's avatar
eckhart committed
744
def runner(tests, namespace):
    """
    Runs all or some selected Python unit tests found in the
    namespace. To run all tests in a module, call
    ``runner("", globals())`` from within that module.

    Unit-tests are either classes whose name starts with "Test" (all of
    whose methods starting with "test" are run), or plain functions whose
    name starts with "test".

    Args:
        tests: String or list of strings with the names of tests to
            run. If empty, runner searches by itself all objects whose
            name starts with 'test' and runs them (if they are functions)
            or all of their methods that start with "test" if they are
            classes, plus the "setup" and "teardown" methods if they exist.

        namespace: The namespace for running the test, usually
            ``globals()`` should be used.

    Example:
        class TestSomething()
            def setup(self):
                pass
            def teardown(self):
                pass
            def test_something(self):
                pass

        if __name__ == "__main__":
            from DHParser.testing import runner
            runner("", globals())
    """
    if tests:
        if isinstance(tests, str):
            tests = tests.split(' ')
        assert all(test.lower().startswith('test') for test in tests)
    else:
        # No explicit selection: consider every name in the namespace.
        tests = namespace.keys()

    test_classes = []
    test_functions = []
    for candidate in tests:
        if not candidate.lower().startswith('test'):
            continue
        if inspect.isclass(namespace[candidate]):
            test_classes.append(candidate)
        elif inspect.isfunction(namespace[candidate]):
            test_functions.append(candidate)

    # Run class-based tests first, then free-standing test functions.
    for cls_name in test_classes:
        run_tests_in_class(cls_name, namespace)
    for func_name in test_functions:
        run_test_function(func_name, namespace)


801
802
def run_file(fname):
    """
    Imports the module given by file name `fname` and runs all unit tests
    found in it. Files whose names do not start with 'test_' or do not end
    in '.py' are silently skipped.
    """
    if fname.lower().startswith('test_') and fname.endswith('.py'):
        print("RUNNING " + fname)
        # print('\nRUNNING UNIT TESTS IN: ' + fname)
        # NOTE(review): the `exec`-ed import lands in this function's local
        # namespace and is then retrieved by `eval` — this relies on
        # CPython's handling of exec/eval default namespaces; confirm before
        # restructuring.
        exec('import ' + fname[:-3])
        runner('', eval(fname[:-3]).__dict__)

808

809
810
811
812
813
814
815
816
817
def run_with_log(logdir, f):
    """
    Run the unit tests in file `f` (via `run_file()`) with logs written to
    `logdir` or no logs if `logdir` evaluates to False. This helper function
    is needed for running unit tests in a multiprocessing environment,
    because log.log_dir(), log.logging() and log.is_logging() are
    thread-local.
    """
    with logging(logdir):
        run_file(f)
818

819

820
821
822
823
824
825
def run_path(path):
    """Runs all unit tests in `path`"""
    if os.path.isdir(path):
        sys.path.append(path)
        files = os.listdir(path)
        result_futures = []

        if get_config_value('test_parallelization'):
            # Run each test file in its own process. log_dir() is passed
            # explicitly because the logging state is thread-local (see
            # run_with_log); non-test files are filtered out by run_file().
            with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
                for f in files:
                    result_futures.append(pool.submit(run_with_log, log_dir(), f))
                    # run_file(f)  # for testing!
                for r in result_futures:
                    try:
                        _ = r.result()
                    except AssertionError as failure:
                        # Print the failure but keep running the remaining
                        # test files.
                        print(failure)
        else:
            for f in files:
                run_file(f)
    else:
        # `path` is a single file: run just that file, with its directory
        # added to sys.path so the module can be imported.
        path, fname = os.path.split(path)
        sys.path.append(path)
        run_file(fname)
    sys.path.pop()

847
848
849
850
851
852
853
854
855
856
857
858
859
860

def clean_report():
    """Deletes any test-report-files in the REPORT sub-directory and removes
    the REPORT sub-directory, if it is empty after deleting the files."""
    report_dir = 'REPORT'
    if os.path.exists(report_dir):
        leftovers = False
        for entry in os.listdir(report_dir):
            if re.match(r'\w*_test_\d+\.md', entry):
                os.remove(os.path.join(report_dir, entry))
            else:
                # A file we did not create: keep the directory around.
                leftovers = True
        if not leftovers:
            os.rmdir(report_dir)