testing.py 32.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# testing.py - test support for DHParser based grammars and compilers
#
# Copyright 2016  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences and Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.
17

18
19
20
21
22
23
24
25
26
"""
Module ``testing`` contains support for unit-testing domain specific
languages. Tests for arbitrarily small components of the Grammar can
be written into test files with ini-file syntax in order to test
whether the parser matches or fails as expected. It can also be
tested whether it produces an expected concrete or abstract syntax tree.
Usually, however, unexpected failure to match a certain string is the
main cause of trouble when constructing a context free Grammar.
"""
27
28


29
import collections
30
import concurrent.futures
31
import copy
Eckhart Arnold's avatar
Eckhart Arnold committed
32
import fnmatch
di68kap's avatar
di68kap committed
33
import inspect
34
import json
35
import multiprocessing
36
import os
37
import sys
38
from typing import Dict, List, Union, cast
39

di68kap's avatar
di68kap committed
40
from DHParser.error import Error, is_error, adjust_error_locations
41
42
from DHParser.log import is_logging, clear_logs, log_parsing_history
from DHParser.parse import UnknownParserError, Parser, Lookahead
43
44
from DHParser.syntaxtree import Node, RootNode, parse_tree, flatten_sxpr, serialize, ZOMBIE_TAG
from DHParser.toolkit import get_config_value, set_config_value, load_if_file, re
45

46

di68kap's avatar
di68kap committed
47
__all__ = ('unit_from_config',
48
           'unit_from_json',
di68kap's avatar
di68kap committed
49
           'TEST_READERS',
50
51
52
53
           'unit_from_file',
           'get_report',
           'grammar_unit',
           'grammar_suite',
eckhart's avatar
eckhart committed
54
55
56
           'SymbolsDictType',
           'extract_symbols',
           'create_test_templates',
57
           'reset_unit',
58
59
           'runner')

60
61
UNIT_STAGES = {'match*', 'match', 'fail', 'ast', 'cst'}
RESULT_STAGES = {'__cst__', '__ast__', '__err__'}
62

63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# def unit_from_configfile(config_filename):
#     """
#     Reads a grammar unit test from a config file.
#     """
#     cfg = configparser.ConfigParser(interpolation=None)
#     cfg.read(config_filename, encoding="utf8")
#     OD = collections.OrderedDict
#     unit = OD()
#     for section in cfg.sections():
#         symbol, stage = section.split(':')
#         if stage not in UNIT_STAGES:
#             if symbol in UNIT_STAGES:
#                 symbol, stage = stage, symbol
#             else:
#                 raise ValueError('Test stage %s not in: ' % (stage, str(UNIT_STAGES)))
#         for testkey, testcode in cfg[section].items():
#             if testcode[:3] + testcode[-3:] in {"''''''", '""""""'}:
#                 testcode = testcode[3:-3]
#                 # testcode = testcode.replace('\\#', '#')
#                 testcode = re.sub(r'(?<!\\)\\#', '#', testcode).replace('\\\\', '\\')
#             elif testcode[:1] + testcode[-1:] in {"''", '""'}:
#                 testcode = testcode[1:-1]
#             unit.setdefault(symbol, OD()).setdefault(stage, OD())[testkey] = testcode
#     # print(json.dumps(unit, sort_keys=True, indent=4))
#     return unit

eckhart's avatar
eckhart committed
89
RX_SECTION = re.compile(r'\s*\[(?P<stage>\w+):(?P<symbol>\w+)\]')
RE_VALUE = '(?:"""((?:.|\n)*?)""")|' + "(?:'''((?:.|\n)*?)''')|" + \
           r'(?:"(.*?)")|' + "(?:'(.*?)')|" + r'(.*(?:\n(?:\s*\n)*    .*)*)'
# the following does not work with pypy3, because pypy's re-engine does not
# support local flags, e.g. '(?s: )'
# RE_VALUE = r'(?:"""((?s:.*?))""")|' + "(?:'''((?s:.*?))''')|" + \
#            r'(?:"(.*?)")|' + "(?:'(.*?)')|" + '(.*(?:\n(?:\s*\n)*    .*)*)'
RX_ENTRY = re.compile(r'\s*(\w+\*?)\s*:\s*(?:{value})\s*'.format(value=RE_VALUE))
RX_COMMENT = re.compile(r'\s*#.*\n')


def unit_from_config(config_str):
    """Parses grammar unit tests given in config file (.ini) syntax.

    Args:
        config_str (str): A string containing a config-file with Grammar unit-tests

    Returns:
        A dictionary representing the unit tests.
    """
    # TODO: issue a warning if the same match:xxx or fail:xxx block appears more than once

    def skip_comments(text, i):
        # advance `i` past any run of comment-lines
        comment = RX_COMMENT.match(text, i)
        while comment is not None:
            i = comment.end()
            comment = RX_COMMENT.match(text, i)
        return i

    cfg = config_str.replace('\t', '    ')
    unit = collections.OrderedDict()

    pos = skip_comments(cfg, 0)
    section = RX_SECTION.match(cfg, pos)
    while section:
        stage = section.group('stage')
        if stage not in UNIT_STAGES:
            raise KeyError('Unknown stage ' + stage + " ! must be one of: " + str(UNIT_STAGES))
        symbol = section.group('symbol')
        pos = skip_comments(cfg, section.end())

        entry = RX_ENTRY.match(cfg, pos)
        # if entry is None:
        #     SyntaxError('No entries in section [%s:%s]' % (stage, symbol))
        while entry:
            testkey, testcode = [g for g in entry.groups() if g is not None]
            lines = testcode.split('\n')
            if len(lines) > 1:
                # remove the common indentation from all continuation lines
                margin = min(len(ln) - len(ln.lstrip()) for ln in lines[1:])
                testcode = '\n'.join([lines[0]] + [ln[margin:] for ln in lines[1:]])
            test = unit.setdefault(symbol, collections.OrderedDict()) \
                       .setdefault(stage, collections.OrderedDict())
            assert testkey not in test, "Key %s already exists in text %s:%s !" % (testkey, stage, symbol)
            test[testkey] = testcode
            pos = skip_comments(cfg, entry.end())
            entry = RX_ENTRY.match(cfg, pos)

        section = RX_SECTION.match(cfg, pos)

    # anything left over other than pure whitespace is a syntax error
    if pos != len(cfg) and not re.match(r'\s+$', cfg[pos:]):
        raise SyntaxError('in line %i' % (cfg[:pos].count('\n') + 2))  # TODO: Add file name
    return unit
160

161

di68kap's avatar
di68kap committed
162
def unit_from_json(json_str):
    """
    Deserializes grammar unit tests from a json string and validates
    that only known test stages occur.
    """
    unit = json.loads(json_str)
    for symbol in unit:
        # reject the first test stage that is not a known category
        unknown = [stage for stage in unit[symbol] if stage not in UNIT_STAGES]
        if unknown:
            raise ValueError('Test stage %s not in: %s' % (unknown[0], str(UNIT_STAGES)))
    return unit

di68kap's avatar
di68kap committed
173

174
# TODO: add support for yaml, cson, toml
175
176


di68kap's avatar
di68kap committed
177
178
179
180
181
182
183
184
185
# A dictionary associating file endings with reader functions that
# transform strings containing the file's content to a nested dictionary
# structure of test cases.
TEST_READERS = {
    '.ini': unit_from_config,
    '.json': unit_from_json
}


186
def unit_from_file(filename):
    """
    Reads a grammar unit test from a file. The format of the file is
    determined by the ending of its name.

    :param filename: the name (path) of the test file; its extension must
        be one of the keys of TEST_READERS (e.g. '.ini', '.json').
    :return: a dictionary representing the unit tests, mapping
        parser-name -> test-stage -> test-name -> test-code.
    :raise ValueError: if the file's extension is not a known test-file type.
    :raise EnvironmentError: if the same test-name is assigned to both a
        match- and a fail-test of the same parser.
    """
    extension = os.path.splitext(filename)[1].lower()
    try:
        reader = TEST_READERS[extension]
    except KeyError:
        raise ValueError("Unknown unit test file type: " + filename[filename.rfind('.'):])
    # reading and parsing happen outside of the try-block, so that a
    # KeyError raised by the reader itself cannot be mistaken for an
    # unknown file type
    with open(filename, 'r', encoding='utf8') as f:
        data = f.read()
    test_unit = reader(data)

    # Check for ambiguous Test names
    errors = []
    for parser_name, tests in test_unit.items():
        # normalize case for test category names
        keys = list(tests.keys())
        for key in keys:
            new_key = key.lower()
            if new_key != key:
                # BUGFIX: the original code indexed with the list `keys`
                # (`tests[keys]` / `del tests[keys]`), which raises a
                # TypeError (unhashable list) as soon as a category name
                # contains upper-case letters; it must index with `key`.
                tests[new_key] = tests[key]
                del tests[key]

        m_names = set(tests.get('match', dict()).keys())
        f_names = set(tests.get('fail', dict()).keys())
        intersection = list(m_names & f_names)
        intersection.sort()
        if intersection:
            errors.append("Same names %s assigned to match and fail test "
                          "of parser %s." % (str(intersection), parser_name) +
                          " Please, use different names!")

    if errors:
        raise EnvironmentError("Error(s) in Testfile %s :\n" % filename
                               + '\n'.join(errors))

    return test_unit

224

di68kap's avatar
di68kap committed
225
226
227
228
229
230
# def all_match_tests(tests):
#     """Returns all match tests from ``tests``, This includes match tests
#     marked with an asterix for CST-output as well as unmarked match-tests.
#     """
#     return itertools.chain(tests.get('match', dict()).items(),
#                            tests.get('match*', dict()).items())
231
232


233
def get_report(test_unit):
    """
    Returns a text-report of the results of a grammar unit test. The report
    lists the source of every test together with the error messages of
    failed tests and the abstract syntax tree (AST) of successful tests.

    The concrete syntax tree (CST) is only added for a match-test if its
    name carries a trailing asterix (or if no AST has been stored for it).
    Switching CST-output on and off per test with the asterix-marker helps
    constructing and debugging AST-transformations without bloating the
    report of every other test with CSTs.
    """
    def indented(txt):
        # prefix each line of txt with four spaces
        parts = txt.split('\n')
        parts[0] = '    ' + parts[0]
        return "\n    ".join(parts)

    def underlined(title, char):
        # a heading followed by an underline of matching length
        return '\n%s\n%s\n' % (title, char * len(title))

    report = []
    for parser_name, tests in test_unit.items():
        report.append('\n' + underlined('Test of parser: "%s"' % parser_name, '='))

        for test_name, test_code in tests.get('match', dict()).items():
            report.append(underlined('Match-test "%s"' % test_name, '-'))
            report.append('### Test-code:')
            report.append(indented(test_code))
            error = tests.get('__err__', {}).get(test_name, "")
            if error:
                report.extend(['\n### Error:', error])
            ast = tests.get('__ast__', {}).get(test_name, None)
            cst = tests.get('__cst__', {}).get(test_name, None)
            if cst and (not ast or str(test_name).endswith('*')):
                report.extend(['\n### CST', indented(serialize(cst, 'cst'))])
            if ast:
                report.extend(['\n### AST', indented(serialize(ast, 'ast'))])

        for test_name, test_code in tests.get('fail', dict()).items():
            report.append(underlined('Fail-test "%s"' % test_name, '-'))
            report.append('### Test-code:')
            report.append(indented(test_code))
            messages = tests.get('__msg__', {}).get(test_name, "")
            if messages:
                report.extend(['\n### Messages:', messages])
            error = tests.get('__err__', {}).get(test_name, "")
            if error:
                report.extend(['\n### Error:', error])

    return '\n'.join(report)


289
def grammar_unit(test_unit, parser_factory, transformer_factory, report=True, verbose=False):
    """
    Unit tests for a grammar-parser and ast transformations.

    :param test_unit: either the file name of a test file or a dictionary
        of the form parser-name -> test-stage -> test-name -> test-code,
        as returned by ``unit_from_file()``. Results are stored in this
        dictionary under the keys '__cst__', '__ast__', '__err__' and
        '__msg__'.
    :param parser_factory: a factory-function that returns the grammar-parser.
    :param transformer_factory: a factory-function that returns the
        AST-transformation which shall be applied to the concrete syntax trees.
    :param report: if True, a test-report is written to the "REPORT"-directory
        below the current working directory.
    :param verbose: if True, a status message for every single test is added
        to the output.
    :return: the list of error messages; empty if all tests have passed.
    """
    output = []

    def write(s):
        """Append string `s` to output. The purpose is to defer printing to
        stdout in order to avoid muddled output when several unit tests run
        at the same time."""
        nonlocal output
        output.append(s)

    def clean_key(k):
        # strip the CST-output marker '*' from a test name;
        # non-string keys are passed through unchanged
        try:
            return k.replace('*', '')
        except AttributeError:
            return k

    def get(tests, category, key) -> str:
        # look up a test's code, trying the verbatim key first and the
        # cleaned key (without '*') as fallback; '' if the test is missing
        try:
            value = tests[category][key] if key in tests[category] \
                else tests[category][clean_key(key)]
        except KeyError:
            return ''
            # raise AssertionError('%s-test %s for parser %s missing !?'
            #                      % (category, test_name, parser_name))
        return value

    if isinstance(test_unit, str):
        # test_unit is a file name: derive the unit's name from it and load it
        _, unit_name = os.path.split(os.path.splitext(test_unit)[0])
        test_unit = unit_from_file(test_unit)
    else:
        unit_name = 'unit_test_' + str(id(test_unit))
    if verbose:
        write("\nGRAMMAR TEST UNIT: " + unit_name)

    errata = []
    parser = parser_factory()
    transform = transformer_factory()

    def has_lookahead(parser_name: str) -> bool:
        """Returns True if the parser or any of its descendant parsers is a
        Lookahead parser."""
        lookahead_found = False

        def find_lookahead(p: Parser):
            nonlocal lookahead_found
            if not lookahead_found:
                lookahead_found = isinstance(p, Lookahead)

        parser[parser_name].apply(find_lookahead)
        return lookahead_found

    def lookahead_artifact(st):
        """
        Returns True, if the error merely occurred, because the parser
        stopped in front of a sequence that was captured by a lookahead
        operator or if a mandatory lookahead failed at the end of data.
        This is required for testing of parsers that put a lookahead
        operator at the end. See test_testing.TestLookahead.
        """
        raw_errors = st.errors_sorted
        is_artifact = ((len(raw_errors) == 2  # case 1:  superfluous data for lookahead
                        and raw_errors[-1].code == Error.PARSER_LOOKAHEAD_MATCH_ONLY
                        and raw_errors[-2].code == Error.PARSER_STOPPED_BEFORE_END)
                       #  case 2:  mandatory lookahead failure at end of text
                       or (len(raw_errors) == 1
                           and raw_errors[-1].code == Error.MANDATORY_CONTINUATION_AT_EOF))
        if is_artifact:
            # remove zombie node with error message at the end
            for parent in st.select(lambda node: any(child.tag_name == ZOMBIE_TAG
                                                     for child in node.children),
                                     include_root=True, reverse=True):
                parent.result = tuple(c for c in parent.children if c.tag_name != ZOMBIE_TAG)
                break
        return is_artifact

    for parser_name, tests in test_unit.items():
        if not get_config_value('test_parallelization'):
            print('  ' + parser_name)

        # sanity checks: a parser name must be given, the unit must not
        # already contain results, and only known test-types may occur
        assert parser_name, "Missing parser name in test %s!" % unit_name
        assert not any(test_type in RESULT_STAGES for test_type in tests), \
            ("Test %s in %s already has results. Use reset_unit() before running again!"
             % (parser_name, unit_name))
        assert set(tests.keys()).issubset(UNIT_STAGES), \
            'Unknown test-types: %s ! Must be one of %s' \
            % (set(tests.keys()) - UNIT_STAGES, UNIT_STAGES)
        if verbose:
            write('  Match-Tests for parser "' + parser_name + '"')
        match_tests = set(tests['match'].keys()) if 'match' in tests else set()
        # every ast- and cst-test must have a corresponding match-test that
        # produces the tree to compare against
        if 'ast' in tests:
            ast_tests = set(tests['ast'].keys())
            if not {clean_key(k) for k in ast_tests} <= {clean_key(k) for k in match_tests}:
                raise AssertionError('AST-Tests %s for parser %s lack corresponding match-tests!'
                                     % (str(ast_tests - match_tests), parser_name))
        if 'cst' in tests:
            cst_tests = set(tests['cst'].keys())
            if not {clean_key(k) for k in cst_tests} <= {clean_key(k) for k in match_tests}:
                raise AssertionError('CST-Tests %s lack corresponding match-tests!'
                                     % str(cst_tests - match_tests))

        # run match tests

        for test_name, test_code in tests.get('match', dict()).items():
            if not get_config_value('test_parallelization'):
                print('    ' + test_name)

            errflag = len(errata)
            try:
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
            except UnknownParserError as upe:
                cst = RootNode()
                cst = cst.new_error(Node(ZOMBIE_TAG, "").with_pos(0), str(upe))
            clean_test_name = str(test_name).replace('*', '')
            # log_ST(cst, "match_%s_%s.cst" % (parser_name, clean_test_name))
            tests.setdefault('__cst__', {})[test_name] = cst
            if is_error(cst.error_flag) and not lookahead_artifact(cst):
                errors = adjust_error_locations(cst.errors_sorted, test_code)
                errata.append('Match test "%s" for parser "%s" failed:\n\tExpr.:  %s\n\n\t%s\n\n' %
                              (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
                               '\n\t'.join(str(m).replace('\n', '\n\t\t') for m in errors)))
                # tests.setdefault('__err__', {})[test_name] = errata[-1]
                # write parsing-history log only in case of failure!
                if is_logging():
                    log_parsing_history(parser, "match_%s_%s.log" % (parser_name, clean_test_name))
            # the AST is needed for ast-tests as well as for the report
            if "ast" in tests or report:
                ast = copy.deepcopy(cst)
                transform(ast)
                tests.setdefault('__ast__', {})[test_name] = ast
                # log_ST(ast, "match_%s_%s.ast" % (parser_name, clean_test_name))
            if verbose:
                infostr = '    match-test "' + test_name + '" ... '
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            # compare the concrete syntax tree, but only if the match itself succeeded
            if "cst" in tests and len(errata) == errflag:
                compare = parse_tree(get(tests, "cst", test_name))
                if compare:
                    if not compare.equals(cst):
                        errata.append('Concrete syntax tree test "%s" for parser "%s" failed:\n%s' %
                                      (test_name, parser_name, serialize(cst, 'cst')))
                    if verbose:
                        infostr = '      cst-test "' + test_name + '" ... '
                        write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            # compare the abstract syntax tree, but only if no earlier stage failed
            if "ast" in tests and len(errata) == errflag:
                compare = parse_tree(get(tests, "ast", test_name))
                if compare:
                    if not compare.equals(ast):
                        errata.append('Abstract syntax tree test "%s" for parser "%s" failed:'
                                      '\n\tExpr.:     %s\n\tExpected:  %s\n\tReceived:  %s'
                                      % (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
                                         flatten_sxpr(compare.as_sxpr()),
                                         flatten_sxpr(ast.as_sxpr())))
                    if verbose:
                        infostr = '      ast-test "' + test_name + '" ... '
                        write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

            if len(errata) > errflag:
                tests.setdefault('__err__', {})[test_name] = errata[-1]

        if verbose and 'fail' in tests:
            write('  Fail-Tests for parser "' + parser_name + '"')

        # run fail tests

        for test_name, test_code in tests.get('fail', dict()).items():
            errflag = len(errata)
            # cst = parser(test_code, parser_name)
            try:
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
            except UnknownParserError as upe:
                node = Node(ZOMBIE_TAG, "").with_pos(0)
                cst = RootNode(node).new_error(node, str(upe))
                errata.append('Unknown parser "{}" in fail test "{}"!'.format(parser_name, test_name))
                tests.setdefault('__err__', {})[test_name] = errata[-1]
            # a fail test fails if the parser matched, i.e. if no error occurred
            if not is_error(cst.error_flag) and not lookahead_artifact(cst):
                errata.append('Fail test "%s" for parser "%s" yields match instead of '
                              'expected failure!' % (test_name, parser_name))
                tests.setdefault('__err__', {})[test_name] = errata[-1]
                # write parsing-history log only in case of test-failure
                if is_logging():
                    log_parsing_history(parser, "fail_%s_%s.log" % (parser_name, test_name))
            if cst.error_flag:
                tests.setdefault('__msg__', {})[test_name] = \
                    "\n".join(str(e) for e in cst.errors_sorted)
            if verbose:
                infostr = '    fail-test  "' + test_name + '" ... '
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))

    # write test-report
    if report:
        report_dir = "REPORT"
        test_report = get_report(test_unit)
        if test_report:
            if not os.path.exists(report_dir):
                os.mkdir(report_dir)
            with open(os.path.join(report_dir, unit_name + '.md'), 'w', encoding='utf8') as f:
                f.write(test_report)

    # flush the deferred output in one go to avoid muddled output when
    # several unit tests are run in parallel
    print('\n'.join(output))
    return errata


493
def reset_unit(test_unit):
    """
    Resets the tests in ``test_unit``, in place, by removing all results
    and error messages. Components that are neither a known test stage nor
    a known result stage are reported before they are removed.
    """
    for parser, tests in test_unit.items():
        stale = [key for key in tests if key not in UNIT_STAGES]
        for key in stale:
            if key not in RESULT_STAGES:
                print('Removing unknown component %s from test %s' % (key, parser))
            del tests[key]


Eckhart Arnold's avatar
Eckhart Arnold committed
506
507
508
509
def grammar_suite(directory, parser_factory, transformer_factory,
                  fn_patterns=['*test*'],
                  ignore_unknown_filetypes=False,
                  report=True, verbose=True):
    """
    Runs all grammar unit tests in a directory. A file is considered a test
    unit, if it has the word "test" in its name.

    :param directory: the directory that will be scanned for test files.
    :param parser_factory: a factory-function that returns the grammar-parser.
    :param transformer_factory: a factory-function that returns the
        AST-transformation.
    :param fn_patterns: an fnmatch-pattern or a list of such patterns; only
        file names matching one of the patterns are run as test units.
    :param ignore_unknown_filetypes: if True, errors stemming from unknown
        test-file-types are silently passed over.
    :param report: if True, test-reports are written (see grammar_unit()).
    :param verbose: if True, progress and results are printed to stdout.
    :return: an error-report as string, or the empty string if no errors
        occurred.
    """
    # BUGFIX: a single pattern passed as a plain string is itself iterable,
    # so the original Iterable-check never wrapped it in a list and fnmatch
    # would then have been run against its single characters.
    if isinstance(fn_patterns, str) \
            or not isinstance(fn_patterns, collections.abc.Iterable):
        fn_patterns = [fn_patterns]
    all_errors = collections.OrderedDict()
    if verbose:
        print("\nScanning test-directory: " + directory)
    save_cwd = os.getcwd()
    os.chdir(directory)
    if is_logging():
        clear_logs()

    try:
        if get_config_value('test_parallelization'):
            # fan the test files out to one process per file
            with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
                results = []
                for filename in sorted(os.listdir('.')):
                    if any(fnmatch.fnmatch(filename, pattern) for pattern in fn_patterns):
                        parameters = filename, parser_factory, transformer_factory, report, verbose
                        results.append((filename, pool.submit(grammar_unit, *parameters)))
                        # grammar_unit(*parameters)
                for filename, err_future in results:
                    try:
                        errata = err_future.result()
                        if errata:
                            all_errors[filename] = errata
                    except ValueError as e:
                        if not ignore_unknown_filetypes or str(e).find("Unknown") < 0:
                            raise e
        else:
            results = []
            for filename in sorted(os.listdir('.')):
                if any(fnmatch.fnmatch(filename, pattern) for pattern in fn_patterns):
                    parameters = filename, parser_factory, transformer_factory, report, verbose
                    print(filename)
                    # FIX: honor `ignore_unknown_filetypes` in the serial
                    # branch, too - previously only the parallel branch did
                    try:
                        errata = grammar_unit(*parameters)
                    except ValueError as e:
                        if not ignore_unknown_filetypes or str(e).find("Unknown") < 0:
                            raise e
                        errata = []
                    results.append((filename, errata))
            for filename, errata in results:
                if errata:
                    all_errors[filename] = errata
    finally:
        # FIX: restore the working directory even if a test unit raises
        os.chdir(save_cwd)

    # compile the error-report
    error_report = []
    err_N = 0
    if all_errors:
        for filename in all_errors:
            error_report.append('Errors found by unit test "%s":\n' % filename)
            err_N += len(all_errors[filename])
            for error in all_errors[filename]:
                error_report.append('\t' + '\n\t'.join(error.split('\n')))
    if error_report:
        # if verbose:
        #     print("\nFAILURE! %i error%s found!\n" % (err_N, 's' if err_N > 1 else ''))
        return ('Test suite "%s" revealed %s error%s:\n\n'
                % (directory, err_N, 's' if err_N > 1 else '') + '\n'.join(error_report))
    if verbose:
        print("\nSUCCESS! All tests passed :-)\n")
    return ''


eckhart's avatar
eckhart committed
570
571
572
573
574
575
576
########################################################################
#
# Support for unit-testing of ebnf-grammars
#
########################################################################


577
RX_DEFINITION_OR_SECTION = re.compile(r'(?:^|\n)[ \t]*(\w+(?=[ \t]*=)|#:.*(?=\n|$|#))')
eckhart's avatar
eckhart committed
578
579
580
581
SymbolsDictType = Dict[str, List[str]]


def extract_symbols(ebnf_text_or_file: str) -> SymbolsDictType:
582
    r"""
eckhart's avatar
eckhart committed
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
    Extracts all defined symbols from an EBNF-grammar. This can be used to
    prepare grammar-tests. The symbols will be returned as lists of strings
    which are grouped by the sections to which they belong and returned as
    an ordered dictionary, they keys of which are the section names.
    In order to define a section in the ebnf-source, add a comment-line
    starting with "#:", followed by the section name. It is recommended
    to use valid file names as section names. Example:

        #: components

        expression = term  { EXPR_OP~ term}
        term       = factor  { TERM_OP~ factor}
        factor     = [SIGN] ( NUMBER | VARIABLE | group ) { VARIABLE | group }
        group      = "(" expression ")"


        #: leaf_expressions

        EXPR_OP    = /\+/ | /-/
        TERM_OP    = /\*/ | /\//
        SIGN       = /-/

        NUMBER     = /(?:0|(?:[1-9]\d*))(?:\.\d+)?/~
        VARIABLE   = /[A-Za-z]/~

    If no sections have been defined in the comments, there will be only
    one group with the empty string as a key.

    :param ebnf_text_or_file: Either an ebnf-grammar or the file-name
            of an ebnf-grammar
    :return: Ordered dictionary mapping the section names of the grammar
            to lists of symbols that appear under that section.
    """
    def trim_section_name(name: str) -> str:
617
        return re.sub(r'[^\w-]', '_', name.replace('#:', '').strip())
eckhart's avatar
eckhart committed
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679

    ebnf = load_if_file(ebnf_text_or_file)
    deflist = RX_DEFINITION_OR_SECTION.findall(ebnf)
    if not deflist:
        raise AssertionError('No symbols found in: ' + ebnf_text_or_file[:40])
    symbols = collections.OrderedDict()  # type: SymbolsDictType
    if deflist[0][:2] != '#:':
        curr_section = ''
        symbols[curr_section] = []
    for df in deflist:
        if df[:2] == '#:':
            curr_section = trim_section_name(df)
            if curr_section in symbols:
                raise AssertionError('Section name must not be repeated: ' + curr_section)
            symbols[curr_section] = []
        else:
            symbols[curr_section].append(df)
    return symbols


def create_test_templates(symbols_or_ebnf: Union[str, SymbolsDictType],
                          path: str,
                          fmt: str = '.ini') -> None:
    """
    Creates template files for grammar unit-tests for the given symbols.

    For every section of symbols one numbered test file is created that
    contains empty ``[match:…]``, ``[ast:…]`` and ``[fail:…]`` stubs for
    each symbol of that section. Existing files are never overwritten.

    Args:
        symbols_or_ebnf: Either a dictionary that matches section names to
                the grammar's symbols under that section or an EBNF-grammar
                or file name of an EBNF-grammar from which the symbols shall
                be extracted.
        path: the path to the grammar-test directory (usually 'grammar_tests').
                If the last element of the path does not exist, the directory
                will be created.
        fmt: the test-file-format. At the moment only '.ini' is supported

    Raises:
        ValueError: if `path` exists but is not a directory.
    """
    assert fmt == '.ini'
    if isinstance(symbols_or_ebnf, str):
        symbols = extract_symbols(cast(str, symbols_or_ebnf))  # type: SymbolsDictType
    else:
        symbols = cast(Dict, symbols_or_ebnf)
    if not os.path.exists(path):
        os.mkdir(path)
    if os.path.isdir(path):
        save = os.getcwd()
        os.chdir(path)
        try:
            # NOTE(review): sections are numbered in reverse order of their
            # appearance in the grammar — presumably intentional; confirm.
            keys = reversed(list(symbols.keys()))
            for i, k in enumerate(keys):
                filename = '{num:0>2}_test_{section}'.format(num=i + 1, section=k) + fmt
                if os.path.exists(filename):
                    # BUG FIX: the placeholder was formerly printed verbatim,
                    # because .format() was never called on the message.
                    print('File "{name}" not created, because it already exists!'
                          .format(name=filename))
                else:
                    with open(filename, 'w', encoding='utf-8') as f:
                        for sym in symbols[k]:
                            f.write('\n[match:{sym}]\n\n'.format(sym=sym))
                            f.write('[ast:{sym}]\n\n'.format(sym=sym))
                            f.write('[fail:{sym}]\n\n'.format(sym=sym))
        finally:
            # restore the working directory even if writing a file failed
            os.chdir(save)
    else:
        raise ValueError(path + ' is not a directory!')


#######################################################################
#
#  general unit testing support
#
#######################################################################


def run_tests_in_class(test, namespace):
    """
    Runs all tests in test-class `test` in the given namespace.
    """
    def instantiate(cls_name, namespace):
        exec("obj = " + cls_name + "()", namespace)
        obj = namespace["obj"]
        if "setup" in dir(obj):
            obj.setup()
        return obj

    obj = None
    try:
        if test.find('.') >= 0:
            cls_name, method_name = test.split('.')
            obj = instantiate(cls_name, namespace)
            print("Running " + cls_name + "." + method_name)
            exec('obj.' + method_name + '()')
        else:
            obj = instantiate(test, namespace)
            for name in dir(obj):
                if name.lower().startswith("test"):
                    print("Running " + test + "." + name)
                    exec('obj.' + name + '()')
    finally:
        if "teardown" in dir(obj):
            obj.teardown()


def run_test_function(func_name, namespace):
    """
    Runs the test-function `func_name` in the given namespace.

    Args:
        func_name: The name of the test function.
        namespace: The namespace (e.g. a module's ``__dict__``) in
            which the function is looked up.
    """
    print("Running test-function: " + func_name)
    # Direct lookup and call instead of exec()-ing a generated string.
    namespace[func_name]()


def runner(tests, namespace):
    """
    Runs all or some selected Python unit tests found in the
    namespace. To run all tests in a module, call
    ``runner("", globals())`` from within that module.

    Unit-Tests are either classes, the name of which starts with
    "Test" and methods, the name of which starts with "test" contained
    in such classes or functions, the name of which starts with "test".

    Args:
        tests: String or list of strings with the names of tests to
            run. If empty, runner searches by itself all objects the
            name of which starts with 'test' and runs it (if its a
            function) or all of its methods that start with "test" if
            its a class plus the "setup" and "teardown" methods if they
            exist.

        namespace: The namespace for running the test, usually
            ``globals()`` should be used.

    Example:
        class TestSomething()
            def setup(self):
                pass
            def teardown(self):
                pass
            def test_something(self):
                pass

        if __name__ == "__main__":
            from DHParser.testing import runner
            runner("", globals())
    """
    test_classes = []
    test_functions = []

    if tests:
        if isinstance(tests, str):
            tests = tests.split(' ')
        assert all(test.lower().startswith('test') for test in tests)
    else:
        tests = namespace.keys()

    for name in tests:
        if name.lower().startswith('test'):
            # BUG FIX: a selector of the form "TestClass.test_method"
            # (which run_tests_in_class supports) must be looked up by
            # its class part only; the full dotted string is not a key
            # of the namespace and formerly raised a KeyError here.
            lookup_name = name.split('.', 1)[0]
            ref = namespace.get(lookup_name)
            if inspect.isclass(ref):
                test_classes.append(name)
            elif inspect.isfunction(ref):
                test_functions.append(name)

    for test in test_classes:
        run_tests_in_class(test, namespace)

    for test in test_functions:
        run_test_function(test, namespace)


def run_file(fname):
    """
    Imports the module named by the file `fname` (pattern "test_*.py")
    and runs all unit tests found in it via :func:`runner`. File names
    not matching the pattern are silently ignored. The directory of
    `fname` must already be on ``sys.path`` (see :func:`run_path`).
    """
    if fname.lower().startswith('test_') and fname.endswith('.py'):
        # print('\nRUNNING UNIT TESTS IN: ' + fname)
        # Import the module properly instead of exec()-ing an import
        # statement and eval()-ing the module name.
        import importlib
        module = importlib.import_module(fname[:-3])
        runner('', module.__dict__)


def run_path(path):
    """Runs all unit tests in `path`.

    If `path` is a directory, it is appended to ``sys.path`` and every
    file in it is handed to ``run_file`` (which itself skips files not
    matching "test_*.py"). Depending on the configuration value
    'test_parallelization', the files are processed in a process pool
    or sequentially. If `path` is a single file, its directory is put
    on ``sys.path`` and only that file is run.

    NOTE(review): the final ``sys.path.pop()`` assumes nothing else
    modified ``sys.path`` in between and is skipped if an exception
    escapes — confirm whether that is acceptable to callers.
    """
    if os.path.isdir(path):
        sys.path.append(path)
        files = os.listdir(path)
        result_futures = []

        if get_config_value('test_parallelization'):
            # One worker per CPU; each test file runs in its own process.
            with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
                for f in files:
                    result_futures.append(pool.submit(run_file, f))
                    # run_file(f)  # for testing!
                for r in result_futures:
                    try:
                        # re-raises exceptions from the worker process
                        _ = r.result()
                    except AssertionError as failure:
                        # report test failures but keep running the rest
                        print(failure)
        else:
            for f in files:
                run_file(f)

    else:
        # `path` names a single file: put its directory on sys.path
        # so that run_file can import it by module name.
        path, fname = os.path.split(path)
        sys.path.append(path)
        run_file(fname)
    sys.path.pop()