testing.py 26.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# testing.py - test support for DHParser based grammars and compilers
#
# Copyright 2016  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences an Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.
17

18
19
20
21
22
23
24
25
26
"""
Module ``testing`` contains support for unit-testing domain specific
languages. Tests for arbitrarily small components of the Grammar can
be written into test files with ini-file syntax in order to test
whether the parser matches or fails as expected. It can also be
tested whether it produces an expected concrete or abstract syntax tree.
Usually, however, unexpected failure to match a certain string is the
main cause of trouble when constructing a context free Grammar.
"""
27
28


29
import collections
30
import concurrent.futures
31
# import configparser
32
import copy
Eckhart Arnold's avatar
Eckhart Arnold committed
33
import fnmatch
di68kap's avatar
di68kap committed
34
import inspect
35
import json
36
import multiprocessing
37
import os
38
import sys
39

di68kap's avatar
di68kap committed
40
from DHParser.error import Error, is_error, adjust_error_locations
41
42
from DHParser.log import is_logging, clear_logs, log_parsing_history
from DHParser.parse import UnknownParserError, Parser, Lookahead
43
from DHParser.syntaxtree import Node, RootNode, parse_sxpr, flatten_sxpr, ZOMBIE_TAG
44
45
46
from DHParser.toolkit import re, typing

from typing import Tuple
47

di68kap's avatar
di68kap committed
48
__all__ = ('unit_from_config',
49
           'unit_from_json',
di68kap's avatar
di68kap committed
50
           'TEST_READERS',
51
52
53
54
           'unit_from_file',
           'get_report',
           'grammar_unit',
           'grammar_suite',
55
           'reset_unit',
56
57
           'runner')

58
59
UNIT_STAGES = {'match*', 'match', 'fail', 'ast', 'cst'}
RESULT_STAGES = {'__cst__', '__ast__', '__err__'}
60

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# def unit_from_configfile(config_filename):
#     """
#     Reads a grammar unit test from a config file.
#     """
#     cfg = configparser.ConfigParser(interpolation=None)
#     cfg.read(config_filename, encoding="utf8")
#     OD = collections.OrderedDict
#     unit = OD()
#     for section in cfg.sections():
#         symbol, stage = section.split(':')
#         if stage not in UNIT_STAGES:
#             if symbol in UNIT_STAGES:
#                 symbol, stage = stage, symbol
#             else:
#                 raise ValueError('Test stage %s not in: ' % (stage, str(UNIT_STAGES)))
#         for testkey, testcode in cfg[section].items():
#             if testcode[:3] + testcode[-3:] in {"''''''", '""""""'}:
#                 testcode = testcode[3:-3]
#                 # testcode = testcode.replace('\\#', '#')
#                 testcode = re.sub(r'(?<!\\)\\#', '#', testcode).replace('\\\\', '\\')
#             elif testcode[:1] + testcode[-1:] in {"''", '""'}:
#                 testcode = testcode[1:-1]
#             unit.setdefault(symbol, OD()).setdefault(stage, OD())[testkey] = testcode
#     # print(json.dumps(unit, sort_keys=True, indent=4))
#     return unit

eckhart's avatar
eckhart committed
87
RX_SECTION = re.compile(r'\s*\[(?P<stage>\w+):(?P<symbol>\w+)\]')
88
RE_VALUE = '(?:"""((?:.|\n)*?)""")|' + "(?:'''((?:.|\n)*?)''')|" + \
eckhart's avatar
eckhart committed
89
           r'(?:"(.*?)")|' + "(?:'(.*?)')|" + r'(.*(?:\n(?:\s*\n)*    .*)*)'
90
91
# the following does not work with pypy3, because pypy's re-engine does not
# support local flags, e.g. '(?s: )'
eckhart's avatar
eckhart committed
92
93
94
95
# RE_VALUE = r'(?:"""((?s:.*?))""")|' + "(?:'''((?s:.*?))''')|" + \
#            r'(?:"(.*?)")|' + "(?:'(.*?)')|" + '(.*(?:\n(?:\s*\n)*    .*)*)'
RX_ENTRY = re.compile(r'\s*(\w+\*?)\s*:\s*(?:{value})\s*'.format(value=RE_VALUE))
RX_COMMENT = re.compile(r'\s*#.*\n')
96

97

di68kap's avatar
di68kap committed
98
def unit_from_config(config_str):
99
100
101
102
    """ Reads grammar unit tests contained in a file in config file (.ini)
    syntax.

    Args:
di68kap's avatar
di68kap committed
103
        config_str (str): A string containing a config-file with Grammar unit-tests
104
105
106
107

    Returns:
        A dictionary representing the unit tests.
    """
eckhart's avatar
eckhart committed
108
109
    # TODO: issue a warning if the same match:xxx or fail:xxx block appears more than once

110
111
112
113
114
115
116
    def eat_comments(txt, pos):
        m = RX_COMMENT.match(txt, pos)
        while m:
            pos = m.span()[1]
            m = RX_COMMENT.match(txt, pos)
        return pos

di68kap's avatar
di68kap committed
117
    cfg = config_str.replace('\t', '    ')
118

119
120
    OD = collections.OrderedDict
    unit = OD()
121
122
123
124
125
126

    pos = eat_comments(cfg, 0)
    section_match = RX_SECTION.match(cfg, pos)
    while section_match:
        d = section_match.groupdict()
        stage = d['stage']
127
        if stage not in UNIT_STAGES:
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
            raise KeyError('Unknown stage ' + stage + " ! must be one of: " + str(UNIT_STAGES))
        symbol = d['symbol']
        pos = eat_comments(cfg, section_match.span()[1])

        entry_match = RX_ENTRY.match(cfg, pos)
        if entry_match is None:
            raise SyntaxError('No entries in section [%s:%s]' % (stage, symbol))
        while entry_match:
            testkey, testcode = [group for group in entry_match.groups() if group is not None]
            lines = testcode.split('\n')
            if len(lines) > 1:
                indent = sys.maxsize
                for line in lines[1:]:
                    indent = min(indent, len(line) - len(line.lstrip()))
                for i in range(1, len(lines)):
                    lines[i] = lines[i][indent:]
                testcode = '\n'.join(lines)
145
            unit.setdefault(symbol, OD()).setdefault(stage, OD())[testkey] = testcode
146
147
148
149
150
151
            pos = eat_comments(cfg, entry_match.span()[1])
            entry_match = RX_ENTRY.match(cfg, pos)

        section_match = RX_SECTION.match(cfg, pos)

    if pos != len(cfg):
152
        raise SyntaxError('in line %i' % (cfg[:pos].count('\n') + 1))
153

154
    return unit
155

156

di68kap's avatar
di68kap committed
157
def unit_from_json(json_str):
158
    """
di68kap's avatar
di68kap committed
159
    Reads grammar unit tests from a json string.
160
    """
di68kap's avatar
di68kap committed
161
    unit = json.loads(json_str)
162
163
164
    for symbol in unit:
        for stage in unit[symbol]:
            if stage not in UNIT_STAGES:
165
                raise ValueError('Test stage %s not in: %s' % (stage, str(UNIT_STAGES)))
166
167
    return unit

di68kap's avatar
di68kap committed
168

169
# TODO: add support for yaml, cson, toml
170
171


di68kap's avatar
di68kap committed
172
173
174
175
176
177
178
179
180
# A dictionary associating file endings with reader functions that
# transfrom strings containing the file's content to a nested dictionary
# structure of test cases.
TEST_READERS = {
    '.ini': unit_from_config,
    '.json': unit_from_json
}


181
def unit_from_file(filename):
182
183
    """
    Reads a grammar unit test from a file. The format of the file is
184
185
    determined by the ending of its name.
    """
di68kap's avatar
di68kap committed
186
187
188
189
190
191
    try:
        reader = TEST_READERS[os.path.splitext(filename)[1].lower()]
        with open(filename, 'r', encoding='utf8') as f:
            data = f.read()
        test_unit = reader(data)
    except KeyError:
192
        raise ValueError("Unknown unit test file type: " + filename[filename.rfind('.'):])
193

di68kap's avatar
di68kap committed
194
195
196
    # Check for ambiguous Test names
    errors = []
    for parser_name, tests in test_unit.items():
di68kap's avatar
di68kap committed
197
198
199
200
201
202
203
204
        # normalize case for test category names
        keys = list(tests.keys())
        for key in keys:
            new_key = key.lower()
            if new_key != key:
                tests[new_key] = tests[keys]
                del tests[keys]

di68kap's avatar
di68kap committed
205
206
        m_names = set(tests.get('match', dict()).keys())
        f_names = set(tests.get('fail', dict()).keys())
207
208
        intersection = list(m_names & f_names)
        intersection.sort()
di68kap's avatar
di68kap committed
209
210
211
212
213
214
215
216
217
        if intersection:
            errors.append("Same names %s assigned to match and fail test "
                          "of parser %s." % (str(intersection), parser_name))
    if errors:
        raise EnvironmentError("Error(s) in Testfile %s :\n" % filename
                               + '\n'.join(errors))

    return test_unit

218

di68kap's avatar
di68kap committed
219
220
221
222
223
224
# def all_match_tests(tests):
#     """Returns all match tests from ``tests``, This includes match tests
#     marked with an asterix for CST-output as well as unmarked match-tests.
#     """
#     return itertools.chain(tests.get('match', dict()).items(),
#                            tests.get('match*', dict()).items())
225
226


227
def get_report(test_unit):
228
    """
229
230
231
232
    Returns a text-report of the results of a grammar unit test. The report
    lists the source of all tests as well as the error messages, if a test
    failed or the abstract-syntax-tree (AST) in case of success.

233
234
    If an asterix has been appended to the test name then the concrete syntax
    tree will also be added to the report in this particular case.
235
236
237
238

    The purpose of the latter is to help constructing and debugging
    of AST-Transformations. It is better to switch the CST-output on and off
    with the asterix marker when needed than to output the CST for all tests
239
    which would unnecessarily bloat the test reports.
240
    """
241
242
243
244
    def indent(txt):
        lines = txt.split('\n')
        lines[0] = '    ' + lines[0]
        return "\n    ".join(lines)
245
246
247
248
    report = []
    for parser_name, tests in test_unit.items():
        heading = 'Test of parser: "%s"' % parser_name
        report.append('\n\n%s\n%s\n' % (heading, '=' * len(heading)))
249
        for test_name, test_code in tests.get('match', dict()).items():
250
251
252
            heading = 'Match-test "%s"' % test_name
            report.append('\n%s\n%s\n' % (heading, '-' * len(heading)))
            report.append('### Test-code:')
253
            report.append(indent(test_code))
254
255
256
257
258
            error = tests.get('__err__', {}).get(test_name, "")
            if error:
                report.append('\n### Error:')
                report.append(error)
            ast = tests.get('__ast__', {}).get(test_name, None)
259
            cst = tests.get('__cst__', {}).get(test_name, None)
260
            if cst and (not ast or str(test_name).endswith('*')):
261
                report.append('\n### CST')
eckhart's avatar
eckhart committed
262
                report.append(indent(cst.as_sxpr(compact=True)))
263
            if ast:
264
                report.append('\n### AST')
di68kap's avatar
di68kap committed
265
                report.append(indent(ast.as_xml()))
di68kap's avatar
di68kap committed
266
267
268
269
270
        for test_name, test_code in tests.get('fail', dict()).items():
            heading = 'Fail-test "%s"' % test_name
            report.append('\n%s\n%s\n' % (heading, '-' * len(heading)))
            report.append('### Test-code:')
            report.append(indent(test_code))
271
272
273
274
            messages = tests.get('__msg__', {}).get(test_name, "")
            if messages:
                report.append('\n### Messages:')
                report.append(messages)
di68kap's avatar
di68kap committed
275
276
277
278
            error = tests.get('__err__', {}).get(test_name, "")
            if error:
                report.append('\n### Error:')
                report.append(error)
279
280
281
    return '\n'.join(report)


282
def grammar_unit(test_unit, parser_factory, transformer_factory, report=True, verbose=False):
283
284
    """
    Unit tests for a grammar-parser and ast transformations.
285
    """
286
287
288
289
290
291
292
293
294
    output = []

    def write(s):
        nonlocal output
        """Append string `s` to output. The purpose is to defer printing to
        stdout in order to avoid muddled output when several unit tests run
        at the same time."""
        output.append(s)

di68kap's avatar
di68kap committed
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
    def clean_key(k):
        try:
            return k.replace('*', '')
        except AttributeError:
            return k

    def get(tests, category, key):
        try:
            value = tests[category][key] if key in tests[category] \
                else tests[category][clean_key(key)]
        except KeyError:
            raise AssertionError('%s-test %s for parser %s missing !?'
                                 % (category, test_name, parser_name))
        return value

310
    if isinstance(test_unit, str):
311
        _, unit_name = os.path.split(os.path.splitext(test_unit)[0])
312
        test_unit = unit_from_file(test_unit)
313
    else:
314
        unit_name = 'unit_test_' + str(id(test_unit))
315
    if verbose:
316
        write("\nGRAMMAR TEST UNIT: " + unit_name)
317
318
319
    errata = []
    parser = parser_factory()
    transform = transformer_factory()
320

321
    is_lookahead = set()    # type: Set[str]  # Dictionary of parser names
322
    with_lookahead = set()  # type: Set[Optional[Parser]]
323
    lookahead_flag = False  # type: bool
324
325
326
327

    def find_lookahead(p: Parser):
        """Raises a StopIterationError if parser `p` is or contains
        a Lookahead-parser."""
328
329
330
331
332
333
334
335
        nonlocal is_lookahead, with_lookahead, lookahead_flag
        if p in with_lookahead:
            lookahead_flag = True
        else:
            if isinstance(p, Lookahead):
                is_lookahead.add(p.tag_name)
                with_lookahead.add(p)
                lookahead_flag = True
336
337
338
339
            else:
                if any(child for child in (getattr(p, 'parsers', [])
                       or [getattr(p, 'parser', None)]) if child in with_lookahead):
                    with_lookahead.add(p)
340
341
342

    def has_lookahead(parser_name: str):
        """Returns `True`, if given parser is or contains a Lookahead-parser."""
343
        nonlocal is_lookahead, with_lookahead, lookahead_flag, parser
344
345
346
        p = parser[parser_name]
        if p in with_lookahead:
            return True
347
348
349
        lookahead_flag = False
        p.apply(find_lookahead)
        if lookahead_flag:
350
            with_lookahead.add(p)
351
352
353
            return True
        return False

354
    def lookahead_artifact(parser, raw_errors):
di68kap's avatar
di68kap committed
355
        """
356
        Returns True, if the error merely occurred, because the parser
eckhart's avatar
eckhart committed
357
        stopped in front of a sequence that was captured by a lookahead
358
359
360
        operator or if a mandatory lookahead failed at the end of data.
        This is required for testing of parsers that put a lookahead
        operator at the end. See test_testing.TestLookahead.
di68kap's avatar
di68kap committed
361
        """
362
        nonlocal is_lookahead
363
364
365
366
367
368
        return ((len(raw_errors) == 2  # case 1:  superfluous data for lookahead
                 and raw_errors[-1].code == Error.PARSER_LOOKAHEAD_MATCH_ONLY
                 and raw_errors[-2].code == Error.PARSER_STOPPED_BEFORE_END)
                #  case 2:  mandatory lookahead failure at end of text
                or (len(raw_errors) == 1
                    and raw_errors[-1].code == Error.MANDATORY_CONTINUATION_AT_EOF)
369
                    and any(tn in is_lookahead for tn in parser.history__[-1].call_stack))
di68kap's avatar
di68kap committed
370

371
    for parser_name, tests in test_unit.items():
372
        assert parser_name, "Missing parser name in test %s!" % unit_name
eckhart's avatar
eckhart committed
373
        assert not any(test_type in RESULT_STAGES for test_type in tests), \
374
375
376
377
378
            ("Test %s in %s already has results. Use reset_unit() before running again!"
             % (parser_name, unit_name))
        assert set(tests.keys()).issubset(UNIT_STAGES), \
            'Unknown test-types: %s ! Must be one of %s' \
            % (set(tests.keys()) - UNIT_STAGES, UNIT_STAGES)
379
        if verbose:
380
            write('  Match-Tests for parser "' + parser_name + '"')
381
        match_tests = set(tests['match'].keys()) if 'match' in tests else set()
382
383
        if 'ast' in tests:
            ast_tests = set(tests['ast'].keys())
di68kap's avatar
di68kap committed
384
385
386
            if not {clean_key(k) for k in ast_tests} <= {clean_key(k) for k in match_tests}:
                raise AssertionError('AST-Tests %s for parser %s lack corresponding match-tests!'
                                     % (str(ast_tests - match_tests), parser_name))
387
388
        if 'cst' in tests:
            cst_tests = set(tests['cst'].keys())
di68kap's avatar
di68kap committed
389
            if not {clean_key(k) for k in cst_tests} <= {clean_key(k) for k in match_tests}:
390
391
                raise AssertionError('CST-Tests %s lack corresponding match-tests!'
                                     % str(cst_tests - match_tests))
392
393
394

        # run match tests

395
        for test_name, test_code in tests.get('match', dict()).items():
Eckhart Arnold's avatar
Eckhart Arnold committed
396
            errflag = 0
397
398
399
            if verbose:
                infostr = '    match-test "' + test_name + '" ... '
                errflag = len(errata)
400
            try:
401
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
402
            except UnknownParserError as upe:
403
                cst = RootNode()
404
                cst = cst.new_error(Node(ZOMBIE_TAG, "").init_pos(0), str(upe))
eckhart's avatar
eckhart committed
405
            clean_test_name = str(test_name).replace('*', '')
eckhart's avatar
eckhart committed
406
            # log_ST(cst, "match_%s_%s.cst" % (parser_name, clean_test_name))
407
            tests.setdefault('__cst__', {})[test_name] = cst
408
            if "ast" in tests or report:
409
410
411
                ast = copy.deepcopy(cst)
                transform(ast)
                tests.setdefault('__ast__', {})[test_name] = ast
eckhart's avatar
eckhart committed
412
                # log_ST(ast, "match_%s_%s.ast" % (parser_name, clean_test_name))
di68kap's avatar
di68kap committed
413
            raw_errors = cst.collect_errors()
414
            if is_error(cst.error_flag) and not lookahead_artifact(parser, raw_errors):
di68kap's avatar
di68kap committed
415
                errors = adjust_error_locations(raw_errors, test_code)
Eckhart Arnold's avatar
Eckhart Arnold committed
416
                errata.append('Match test "%s" for parser "%s" failed:\n\tExpr.:  %s\n\n\t%s\n\n' %
417
                              (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
418
                               '\n\t'.join(str(m).replace('\n', '\n\t\t') for m in errors)))
di68kap's avatar
di68kap committed
419
                # tests.setdefault('__err__', {})[test_name] = errata[-1]
420
                # write parsing-history log only in case of failure!
421
                if is_logging():
di68kap's avatar
di68kap committed
422
                    log_parsing_history(parser, "match_%s_%s.log" % (parser_name, clean_test_name))
di68kap's avatar
di68kap committed
423
            elif "cst" in tests and parse_sxpr(get(tests, "cst", test_name)) != cst:
eckhart's avatar
eckhart committed
424
425
                errata.append('Concrete syntax tree test "%s" for parser "%s" failed:\n%s' %
                              (test_name, parser_name, cst.as_sxpr()))
426
            elif "ast" in tests:
di68kap's avatar
di68kap committed
427
                compare = parse_sxpr(get(tests, "ast", test_name))
428
429
430
431
                if compare != ast:
                    errata.append('Abstract syntax tree test "%s" for parser "%s" failed:'
                                  '\n\tExpr.:     %s\n\tExpected:  %s\n\tReceived:  %s'
                                  % (test_name, parser_name, '\n\t'.join(test_code.split('\n')),
Eckhart Arnold's avatar
Eckhart Arnold committed
432
433
                                     flatten_sxpr(compare.as_sxpr()),
                                     flatten_sxpr(ast.as_sxpr())))
di68kap's avatar
di68kap committed
434
435
            if errata:
                tests.setdefault('__err__', {})[test_name] = errata[-1]
436
            if verbose:
437
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))
438

439
        if verbose and 'fail' in tests:
440
            write('  Fail-Tests for parser "' + parser_name + '"')
441
442
443

        # run fail tests

444
        for test_name, test_code in tests.get('fail', dict()).items():
Eckhart Arnold's avatar
Eckhart Arnold committed
445
            errflag = 0
446
447
448
            if verbose:
                infostr = '    fail-test  "' + test_name + '" ... '
                errflag = len(errata)
449
450
            # cst = parser(test_code, parser_name)
            try:
451
                cst = parser(test_code, parser_name, track_history=has_lookahead(parser_name))
452
            except UnknownParserError as upe:
453
                node = Node(ZOMBIE_TAG, "").init_pos(0)
eckhart's avatar
eckhart committed
454
                cst = RootNode(node).new_error(node, str(upe))
455
                errata.append('Unknown parser "{}" in fail test "{}"!'.format(parser_name, test_name))
456
                tests.setdefault('__err__', {})[test_name] = errata[-1]
457
            if not is_error(cst.error_flag) and not lookahead_artifact(parser, cst.collect_errors()):
458
459
                errata.append('Fail test "%s" for parser "%s" yields match instead of '
                              'expected failure!' % (test_name, parser_name))
460
                tests.setdefault('__err__', {})[test_name] = errata[-1]
461
                # write parsing-history log only in case of test-failure
462
                if is_logging():
463
                    log_parsing_history(parser, "fail_%s_%s.log" % (parser_name, test_name))
464
465
466
            if cst.error_flag:
                tests.setdefault('__msg__', {})[test_name] = \
                    "\n".join(str(e) for e in cst.collect_errors())
467
            if verbose:
468
                write(infostr + ("OK" if len(errata) == errflag else "FAIL"))
469

470
471
    # write test-report
    if report:
472
        report_dir = "REPORT"
473
474
        if not os.path.exists(report_dir):
            os.mkdir(report_dir)
di68kap's avatar
di68kap committed
475
        with open(os.path.join(report_dir, unit_name + '.md'), 'w', encoding='utf8') as f:
476
            f.write(get_report(test_unit))
477

478
    print('\n'.join(output))
479
480
481
    return errata


482
def reset_unit(test_unit):
eckhart's avatar
eckhart committed
483
484
485
486
    """
    Resets the tests in ``test_unit`` by removing all results and error
    messages.
    """
487
488
489
490
491
492
493
494
    for parser, tests in test_unit.items():
        for key in list(tests.keys()):
            if key not in UNIT_STAGES:
                if key not in RESULT_STAGES:
                    print('Removing unknown component %s from test %s' % (key, parser))
                del tests[key]


Eckhart Arnold's avatar
Eckhart Arnold committed
495
496
497
498
def grammar_suite(directory, parser_factory, transformer_factory,
                  fn_patterns=['*test*'],
                  ignore_unknown_filetypes=False,
                  report=True, verbose=True):
499
500
    """
    Runs all grammar unit tests in a directory. A file is considered a test
501
502
    unit, if it has the word "test" in its name.
    """
503
    if not isinstance(fn_patterns, collections.abc.Iterable):
Eckhart Arnold's avatar
Eckhart Arnold committed
504
        fn_patterns = [fn_patterns]
505
    all_errors = collections.OrderedDict()
506
507
    if verbose:
        print("\nScanning test-directory: " + directory)
508
509
    save_cwd = os.getcwd()
    os.chdir(directory)
eckhart's avatar
eckhart committed
510
511
    if is_logging():
        clear_logs()
512
513
514
515
516
517
518
    with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
        errata_futures = []
        for filename in sorted(os.listdir()):
            if any(fnmatch.fnmatch(filename, pattern) for pattern in fn_patterns):
                parameters = filename, parser_factory, transformer_factory, report, verbose
                errata_futures.append((filename, pool.submit(grammar_unit, *parameters)))
        for filename, err_future in errata_futures:
519
            try:
520
                errata = err_future.result()
521
522
523
                if errata:
                    all_errors[filename] = errata
            except ValueError as e:
524
                if not ignore_unknown_filetypes or str(e).find("Unknown") < 0:
525
                    raise e
526
    os.chdir(save_cwd)
eckhart's avatar
eckhart committed
527
528
    error_report = []
    err_N = 0
529
530
    if all_errors:
        for filename in all_errors:
di68kap's avatar
di68kap committed
531
            error_report.append('Errors found by unit test "%s":\n' % filename)
di68kap's avatar
di68kap committed
532
            err_N += len(all_errors[filename])
533
534
535
            for error in all_errors[filename]:
                error_report.append('\t' + '\n\t'.join(error.split('\n')))
    if error_report:
di68kap's avatar
di68kap committed
536
537
538
539
        # if verbose:
        #     print("\nFAILURE! %i error%s found!\n" % (err_N, 's' if err_N > 1 else ''))
        return ('Test suite "%s" revealed %s error%s:\n\n'
                % (directory, err_N, 's' if err_N > 1 else '') + '\n'.join(error_report))
eckhart's avatar
eckhart committed
540
541
    if verbose:
        print("\nSUCCESS! All tests passed :-)\n")
542
543
544
    return ''


545
546
547
548
549
550
#######################################################################
#
#  general unit testing support
#
#######################################################################

551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580

def run_tests_in_class(test, namespace):
    """
    Runs all tests in test-class `test` in the given namespace.
    """
    def instantiate(cls_name, namespace):
        exec("obj = " + cls_name + "()", namespace)
        obj = namespace["obj"]
        if "setup" in dir(obj):
            obj.setup()
        return obj

    obj = None
    try:
        if test.find('.') >= 0:
            cls_name, method_name = test.split('.')
            obj = instantiate(cls_name, namespace)
            print("Running " + cls_name + "." + method_name)
            exec('obj.' + method_name + '()')
        else:
            obj = instantiate(test, namespace)
            for name in dir(obj):
                if name.lower().startswith("test"):
                    print("Running " + test + "." + name)
                    exec('obj.' + name + '()')
    finally:
        if "teardown" in dir(obj):
            obj.teardown()


581
def run_test_function(func_name, namespace):
582
583
584
    """
    Run the test-function `test` in the given namespace.
    """
585
586
    print("Running test-function: " + func_name)
    exec(func_name + '()', namespace)
587
588


eckhart's avatar
eckhart committed
589
def runner(tests, namespace):
590
591
    """
    Runs all or some selected Python unit tests found in the
eckhart's avatar
eckhart committed
592
    namespace. To run all tests in a module, call
593
    ``runner("", globals())`` from within that module.
594

595
596
597
598
    Unit-Tests are either classes, the name of which starts with
    "Test" and methods, the name of which starts with "test" contained
    in such classes or functions, the name of which starts with "test".

599
    Args:
eckhart's avatar
eckhart committed
600
601
602
603
604
605
        tests: String or list of strings with the names of tests to
            run. If empty, runner searches by itself all objects the
            of which starts with 'test' and runs it (if its a function)
            or all of its methods that start with "test" if its a class
            plus the "setup" and "teardown" methods if they exist.

eckhart's avatar
eckhart committed
606
        namespace: The namespace for running the test, usually
607
            ``globals()`` should be used.
eckhart's avatar
eckhart committed
608

609
610
611
612
613
614
615
616
    Example:
        class TestSomething()
            def setup(self):
                pass
            def teardown(self):
                pass
            def test_something(self):
                pass
eckhart's avatar
eckhart committed
617

618
        if __name__ == "__main__":
di68kap's avatar
di68kap committed
619
            from DHParser.testing import runner
eckhart's avatar
eckhart committed
620
            runner("", globals())
621
    """
eckhart's avatar
eckhart committed
622
623
    test_classes = []
    test_functions = []
624

eckhart's avatar
eckhart committed
625
626
627
628
    if tests:
        if isinstance(tests, str):
            tests = tests.split(' ')
        assert all(test.lower().startswith('test') for test in tests)
629
    else:
eckhart's avatar
eckhart committed
630
631
632
633
634
635
636
637
        tests = namespace.keys()

    for name in tests:
        if name.lower().startswith('test'):
            if inspect.isclass(namespace[name]):
                test_classes.append(name)
            elif inspect.isfunction(namespace[name]):
                test_functions.append(name)
638
639

    for test in test_classes:
640
        run_tests_in_class(test, namespace)
641
642

    for test in test_functions:
643
644
645
        run_test_function(test, namespace)


646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
def run_file(fname):
    if fname.lower().startswith('test_') and fname.endswith('.py'):
        # print('\nRUNNING UNIT TESTS IN: ' + fname)
        exec('import ' + fname[:-3])
        runner('', eval(fname[:-3]).__dict__)


def run_path(path):
    """Runs all unit tests in `path`"""
    if os.path.isdir(path):
        sys.path.append(path)
        files = os.listdir(path)
        result_futures = []
        with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as pool:
            for f in files:
                result_futures.append(pool.submit(run_file, f))
                # run_file(f)  # for testing!
            for r in result_futures:
                try:
                    _ = r.result()
                except AssertionError as failure:
                    print(failure)
    else:
        path, fname = os.path.split(path)
        sys.path.append(path)
        run_file(fname)
    sys.path.pop()