Commit 7f66e761 authored by eckhart's avatar eckhart
Browse files

- In-Series-parser-recovery added and tested (not really powerful, unfortunately!)

parent 5a5efa7b
......@@ -41,7 +41,7 @@ from DHParser.transform import TransformationFunc, traverse, remove_brackets, \
reduce_single_child, replace_by_single_child, remove_expendables, \
remove_tokens, flatten, forbid, assert_content
from DHParser.versionnumber import __version__
from typing import Callable, Dict, List, Set, Tuple, Union, Optional, Any
from typing import Callable, Dict, List, Set, Tuple, Sequence, Union, Optional, Any
__all__ = ('get_ebnf_preprocessor',
......@@ -345,13 +345,21 @@ class EBNFDirectives:
always matches, so in case of multiple error messages,
this condition should be placed at the end.
resume: mapping of symbols to a list of search conditions. A
search condition can be either a string or a regular
expression. The closest match from all search conditions
is the point of reentry for the parser after a parser
error has occurred.
skip: mapping of symbols to a list of search expressions. A
search expression can be either a string or a regular
expression. The closest match is the point of reentry
for the series-parser when a mandatory item failed to
match the following text.
resume: mapping of symbols to a list of search expressions. A
search expression can be either a string or a regular
expression. The closest match is the point of reentry
after a parsing error has occurred. Unlike the skip
field, this configures resuming after the failing
parser has returned.
"""
__slots__ = ['whitespace', 'comment', 'literalws', 'tokens', 'filter', 'error', 'resume']
__slots__ = ['whitespace', 'comment', 'literalws', 'tokens', 'filter', 'error', 'skip',
'resume']
def __init__(self):
self.whitespace = WHITESPACE_TYPES['vertical'] # type: str
......@@ -360,6 +368,7 @@ class EBNFDirectives:
self.tokens = set() # type: Collection[str]
self.filter = dict() # type: Dict[str, str]
self.error = dict() # type: Dict[str, List[Tuple[ReprType, ReprType]]]
self.skip = dict() # type: Dict[str, List[Union[unrepr, str]]]
self.resume = dict() # type: Dict[str, List[Union[unrepr, str]]]
def __getitem__(self, key):
......@@ -445,7 +454,10 @@ class EBNFCompiler(Compiler):
allows adding a compiler error in those cases where (i) an
error message has been defined but will never be used or (ii)
an error message is accidentally used twice. For examples, see
`test_ebnf.TestErrorCustomization`
`test_ebnf.TestErrorCustomization`.
consumed_skip_rules: The same as `consumed_custom_errors` only for
in-series-resume-rules (aka 'skip-rules') for Series-parsers.
re_flags: A set of regular expression flags to be added to all
regular expressions found in the current parsing process
......@@ -463,6 +475,7 @@ class EBNFCompiler(Compiler):
RAW_WS_KEYWORD = "WHITESPACE__"
WHITESPACE_PARSER_KEYWORD = "wsp__"
RESUME_RULES_KEYWORD = "resume_rules__"
SKIP_RULES_SUFFIX = '_skip__'
ERR_MSG_SUFFIX = '_err_msg__'
RESERVED_SYMBOLS = {WHITESPACE_KEYWORD, RAW_WS_KEYWORD, COMMENT_KEYWORD,
RESUME_RULES_KEYWORD, ERR_MSG_SUFFIX}
......@@ -496,6 +509,7 @@ class EBNFCompiler(Compiler):
self.directives = EBNFDirectives() # type: EBNFDirectives
self.defined_directives = set() # type: Set[str]
self.consumed_custom_errors = set() # type: Set[str]
self.consumed_skip_rules = set() # type: Set[str]
self.grammar_id += 1
......@@ -653,6 +667,13 @@ class EBNFCompiler(Compiler):
return s.strip('"') if s[0] == '"' else s.strip("'")
return ''
def _gen_search_list(self, nodes: Sequence[Node]) -> List[Union[unrepr, str]]:
    """Translate a sequence of nodes into a list of search expressions.

    Every node is first handed to ``self._gen_search_rule``. If that
    yields a falsy result, the node's stripped content wrapped in
    ``unrepr`` is used in its place.
    """
    return [self._gen_search_rule(nd) or unrepr(nd.content.strip()) for nd in nodes]
def assemble_parser(self, definitions: List[Tuple[str, str]], root_node: Node) -> str:
"""
......@@ -726,7 +747,29 @@ class EBNFCompiler(Compiler):
self.tree.new_error(
def_node, 'Customized error message for symbol "{}" will never be used, '
'because the mandatory marker "§" appears nowhere in its definiendum!'
.format(symbol), Error.UNUSED_ERROR_MSG_WARNING)
.format(symbol), Error.UNUSED_ERROR_HANDLING_WARNING)
# prepare and add skip-rules: every symbol with a skip directive gets a
# class-level definition named `<symbol>` + SKIP_RULES_SUFFIX that holds
# the repr of its list of search expressions
for symbol, skip in self.directives.skip.items():
    skip_rules = []  # type: List[Tuple[ReprType, ReprType]]
    for search in skip:
        if isinstance(search, unrepr) and search.s.isidentifier():
            # a bare identifier names another rule: derive the search
            # expression from the second child of that rule's first node
            # (presumably its definiens -- TODO confirm)
            try:
                nd = self.rules[search.s][0].children[1]
                search = self._gen_search_rule(nd)
            except IndexError:
                search = ''
        skip_rules.append(search)
    definitions.append((symbol + self.SKIP_RULES_SUFFIX, repr(skip_rules)))
# Warn about skip-rules that were never attached to a series. This must
# iterate over the *skip* directives; iterating over the error directives
# (as before) flagged symbols that merely define a custom error message
# and missed unused skip-rules of symbols without one.
for symbol in self.directives.skip:
    if symbol not in self.consumed_skip_rules:
        def_node = self.rules[symbol][0]
        self.tree.new_error(
            def_node, '"Skip-rules" for symbol "{}" will never be used, '
            'because the mandatory marker "§" appears nowhere in its definiendum!'
            .format(symbol), Error.UNUSED_ERROR_HANDLING_WARNING)
# prepare parser class header and docstring and
# add EBNF grammar to the doc string of the parser class
......@@ -941,17 +984,23 @@ class EBNFCompiler(Compiler):
self.tree.new_error(node, 'Directive "%s" allows at most two parameters' % key)
self.directives.error[symbol] = error_msgs
elif key.endswith('_skip'):
symbol = key[:-5]
if symbol in self.directives.skip:
self.tree.new_error(node, 'In-series resuming for "%s" has already been defined'
' earlier!' % symbol)
if symbol in self.rules:
self.tree.new_error(node, 'Skip list for resuming in series for symbol "{}"'
'must be defined before the symbol!'.format(symbol))
self.directives.skip[symbol] = self._gen_search_list(node.children[1:])
elif key.endswith('_resume'):
symbol = key[:-7]
if symbol in self.directives.resume:
self.tree.new_error(node, 'Reentry conditions for "%s" have already been defined'
' earlier!' % symbol)
else:
reentry_conditions = [] # type: List[Union[unrepr, str]]
for child in node.children[1:]:
rule = self._gen_search_rule(child)
reentry_conditions.append(rule if rule else unrepr(child.content.strip()))
self.directives.resume[symbol] = reentry_conditions
self.directives.resume[symbol] = self._gen_search_list(node.children[1:])
else:
self.tree.new_error(node, 'Unknown directive %s ! (Known ones are %s .)' %
......@@ -992,9 +1041,6 @@ class EBNFCompiler(Compiler):
for nd in node.children:
if nd.parser.ptype == TOKEN_PTYPE and nd.content == "§":
mandatory_marker.append(len(filtered_children))
# if len(filtered_children) == 0:
# self.tree.new_error(nd.pos, 'First item of a series should not be mandatory.',
# Error.WARNING)
if len(mandatory_marker) > 1:
self.tree.new_error(nd, 'One mandatory marker (§) sufficient to declare '
'the rest of the series as mandatory.', Error.WARNING)
......@@ -1002,23 +1048,34 @@ class EBNFCompiler(Compiler):
filtered_children.append(nd)
saved_result = node.result
node.result = tuple(filtered_children)
if len(filtered_children) == 1:
compiled = self.non_terminal(node, 'Required')
else:
custom_args = ['mandatory=%i' % mandatory_marker[0]] if mandatory_marker else []
# add custom error message if it has been declared for the currend definition
# add custom error message if it has been declared for the current definition
if custom_args:
current_symbol = next(reversed(self.rules.keys()))
# add customized error messages, if defined
if current_symbol in self.directives.error:
if current_symbol in self.consumed_custom_errors:
self.tree.new_error(
node, "Cannot apply customized error messages unambigiously, because "
"symbol {} contains more than one series with a mandatory marker '§' "
"in its definiens.".format(current_symbol), Error.AMBIGUOUS_ERROR_MSG)
"in its definiens.".format(current_symbol),
Error.AMBIGUOUS_ERROR_HANDLING)
else:
# use class field instead or direct representation of error messages!
custom_args.append('err_msgs=' + current_symbol + self.ERR_MSG_SUFFIX)
self.consumed_custom_errors.add(current_symbol)
# add skip-rules to resume parsing of a series, if rules have been declared
if current_symbol in self.directives.skip:
if current_symbol in self.consumed_skip_rules:
self.tree.new_error(
node, "Cannot apply 'skip-rules' unambigiously, because symbol "
"{} contains more than one series with a mandatory marker '§' "
"in its definiens.".format(current_symbol),
Error.AMBIGUOUS_ERROR_HANDLING)
else:
# use class field instead or direct representation of error messages!
custom_args.append('skip=' + current_symbol + self.SKIP_RULES_SUFFIX)
self.consumed_skip_rules.add(current_symbol)
compiled = self.non_terminal(node, 'Series', custom_args)
node.result = saved_result
return compiled
......
......@@ -73,7 +73,7 @@ class Error:
REDEFINED_DIRECTIVE_WARNING = ErrorCode(110)
REDECLARED_TOKEN_WARNING = ErrorCode(120)
UNUSED_ERROR_MSG_WARNING = ErrorCode(130)
UNUSED_ERROR_HANDLING_WARNING = ErrorCode(130)
UNDEFINED_SYMBOL_IN_TRANSTABLE_WARNING = ErrorCode(610)
......@@ -86,7 +86,7 @@ class Error:
PARSER_STOPPED_BEFORE_END = ErrorCode(1040)
CAPTURE_STACK_NOT_EMPTY = ErrorCode(1050)
MALFORMED_ERROR_STRING = ErrorCode(1060)
AMBIGUOUS_ERROR_MSG = ErrorCode(1070)
AMBIGUOUS_ERROR_HANDLING = ErrorCode(1070)
def __init__(self, message: str, pos, code: ErrorCode = ERROR,
orig_pos: int = -1, line: int = -1, column: int = -1) -> None:
......
......@@ -1377,24 +1377,31 @@ class Series(NaryOperator):
RX_ARGUMENT = re.compile(r'\s(\S)')
NOPE = 1000
MessageType = List[Tuple[Union[str, Any], str]]
MessagesType = List[Tuple[Union[str, Any], str]]
def __init__(self, *parsers: Parser, mandatory: int = NOPE, err_msgs: MessageType=[]) -> None:
def __init__(self, *parsers: Parser,
mandatory: int = NOPE,
err_msgs: MessagesType=[],
skip: ResumeList = []) -> None:
super().__init__(*parsers)
assert not (mandatory == Series.NOPE and err_msgs), \
'Custom error messages only make sense if parameter "mandatory" is set!'
'Custom error messages require that parameter "mandatory" is set!'
assert not (mandatory == Series.NOPE and skip), \
'Search expressions for skipping text require that parameter "mandatory" is set!'
length = len(self.parsers)
assert 1 <= length < Series.NOPE, \
'Length %i of series exceeds maximum length of %i' % (length, Series.NOPE)
if mandatory < 0:
mandatory += length
assert 0 <= mandatory < length or mandatory == Series.NOPE
self.mandatory = mandatory
self.err_msgs = err_msgs
self.mandatory = mandatory # type: int
self.err_msgs = err_msgs # type: Series.MessagesType
self.skip = skip # type: ResumeList
def __deepcopy__(self, memo):
parsers = copy.deepcopy(self.parsers, memo)
duplicate = self.__class__(*parsers, mandatory=self.mandatory, err_msgs=self.err_msgs)
duplicate = self.__class__(*parsers, mandatory=self.mandatory,
err_msgs=self.err_msgs, skip=self.skip)
duplicate.name = self.name
duplicate.ptype = self.ptype
return duplicate
......@@ -1409,7 +1416,8 @@ class Series(NaryOperator):
if pos < self.mandatory:
return None, text
else:
i = 0
k = reentry_point(text_, self.skip) if self.skip else -1
i = k if k >= 0 else 0
location = self.grammar.document_length__ - len(text_)
node = Node(None, text_[:i]).init_pos(location)
found = text_[:10].replace('\n', '\\n ')
......@@ -1431,17 +1439,21 @@ class Series(NaryOperator):
else Error.MANDATORY_CONTINUATION_AT_EOF)
self.grammar.tree__.add_error(node, mandatory_violation)
text_ = text_[i:]
# check if parsing of the series can be resumed somewhere
if k >= 0:
nd, text_ = parser(text_) # try current parser again
if nd:
results += (node,)
node = nd
else:
results += (node,)
# TODO: Add queue-jumping here (XXX_skip = Regex, Regex, Regex...)
break
results += (node,)
# if node.error_flag: # break on first error
# break
assert len(results) <= len(self.parsers)
assert len(results) <= len(self.parsers) \
or len(self.parsers) >= len([p for p in results if p.parser != ZOMBIE_PARSER])
node = Node(self, results)
if mandatory_violation:
raise ParserError(node, text, first_throw=True)
# self.grammar.tree__.add_error(node, mandatory_violation)
return node, text_
def __repr__(self):
......
......@@ -179,6 +179,16 @@ class TestParserNameOverwriteBug:
messages = st.collect_errors()
assert not has_errors(messages), str(messages)
def test_single_mandatory_bug(self):
    """A series consisting of a single mandatory item must not be
    compiled to 'Required', and the resulting parser must accept 'B'."""
    grammar_src = """series = § /B/"""
    compiled, _messages, _ast = compile_ebnf(grammar_src)
    assert compiled.find('Required') == -1
    grammar = grammar_provider(grammar_src)()
    syntax_tree = grammar('B')
    assert not syntax_tree.error_flag
class TestSemanticValidation:
def check(self, minilang, bool_filter=lambda x: x):
......@@ -496,7 +506,7 @@ class TestErrorCustomization:
assert False, "CompilationError because of ambiguous error message exptected!"
except CompilationError as compilation_error:
err = next(compilation_error.errors)
assert err.code == Error.AMBIGUOUS_ERROR_MSG, str(compilation_error)
assert err.code == Error.AMBIGUOUS_ERROR_HANDLING, str(compilation_error)
def test_unsed_error_customization(self):
lang = """
......@@ -506,7 +516,7 @@ class TestErrorCustomization:
other = "X" | "Y" | "Z"
"""
result, messages, ast = compile_ebnf(lang)
assert messages[0].code == Error.UNUSED_ERROR_MSG_WARNING
assert messages[0].code == Error.UNUSED_ERROR_HANDLING_WARNING
class TestCustomizedResumeParsing:
......@@ -562,6 +572,48 @@ class TestCustomizedResumeParsing:
assert len(cst.collect_errors()) == 1
class TestInSeriesResume:
    """Checks error recovery inside a series that is configured with a
    ``@series_skip`` directive."""

    def setup(self):
        grammar_src = """
document = series
@series_skip = /B/, /C/, /D/, /E/, /F/, /G/
series = "A" §"B" "C" "D" "E" "F" "G"
"""
        try:
            # the grammar is compiled once directly and once through the
            # grammar provider; a CompilationError is only printed
            _result, _, _ = compile_ebnf(grammar_src)
            self.gr = grammar_provider(grammar_src)()
        except CompilationError as compilation_error:
            print(compilation_error)

    def _errors_for(self, text):
        """Parse `text` with the test grammar and return the collected errors."""
        return self.gr(text).collect_errors()

    @staticmethod
    def _only_mandatory_continuation(errors):
        """True, if every error carries the code MANDATORY_CONTINUATION."""
        return all(err.code == Error.MANDATORY_CONTINUATION for err in errors)

    def test_garbage_in_series(self):
        assert not self.gr('ABCDEFG').error_flag

        errors = self._errors_for('AB XYZ CDEFG')
        assert len(errors) == 1
        assert errors[0].code == Error.MANDATORY_CONTINUATION

        errors = self._errors_for('AB XYZ CDE XYZ FG')
        assert len(errors) == 2
        assert self._only_mandatory_continuation(errors)

        # fails to resume parsing
        errors = self._errors_for('AB XYZ CDE XNZ FG')
        assert len(errors) >= 1
        assert errors[0].code == Error.MANDATORY_CONTINUATION

    def test_series_gap(self):
        errors = self._errors_for('ABDEFG')
        assert len(errors) == 1
        assert errors[0].code == Error.MANDATORY_CONTINUATION

        # two missing, one wrong element added
        errors = self._errors_for('ABXEFG')
        assert len(errors) == 2
        assert self._only_mandatory_continuation(errors)

        errors = self._errors_for('AB_DE_G')
        assert len(errors) == 2
        assert self._only_mandatory_continuation(errors)

    def test_series_permutation(self):
        # cannot really recover from permutation errors
        errors = self._errors_for('ABEDFG')
        assert len(errors) >= 1
if __name__ == "__main__":
from DHParser.testing import runner
......
......@@ -639,7 +639,6 @@ class TestReentryAfterError:
self.gr = grammar_provider(lang)()
def test_no_resume_rules(self):
# 1. no resume rules
gr = self.gr; gr.resume_rules = dict()
content = 'ALPHA acb BETA bac GAMMA cab .'
cst = gr(content)
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment