Commit e1da7f7f authored by eckhart

refactoring of history recording

parent d622d9c7
......@@ -281,6 +281,7 @@ class HistoryRecord:
'td.line, td.column {color:grey}\n'
'.text{color:blue}\n'
'.failtext {font-weight:normal; color:grey}\n'
'.errortext {font-weight:normal; color:darkred}\n'
'.unmatched {font-weight:normal; color:lightgrey}\n'
'.fail {font-weight:bold; color:darkgrey}\n'
'.error {font-weight:bold; color:red}\n'
......@@ -361,6 +362,7 @@ class HistoryRecord:
classes[idx['text']] = 'failtext'
else: # ERROR
stack += '<br/>\n"%s"' % self.err_msg()
classes[idx['text']] = 'errortext'
tpl = self.Snapshot(str(self.line_col[0]), str(self.line_col[1]),
stack, status, excerpt) # type: Tuple[str, str, str, str, str]
return ''.join(['<tr>'] + [('<td class="%s">%s</td>' % (cls, item))
......@@ -387,7 +389,7 @@ class HistoryRecord:
def status(self) -> str:
if self.errors:
return self.ERROR + ": " + ', '.join(str(e.code) for e in self.errors)
elif self.node is None or self.node.tag_name in (ZOMBIE_TAG, NONE_TAG):
elif self.node.tag_name in (NONE_TAG, ZOMBIE_TAG):
return self.FAIL
elif self.node.tag_name == EMPTY_PTYPE:
return self.DROP
......@@ -398,7 +400,7 @@ class HistoryRecord:
@property
def excerpt(self):
if self.node:
if self.node.tag_name not in (NONE_TAG, ZOMBIE_TAG) and not self.errors:
excerpt = abbreviate_middle(str(self.node), 40)
else:
s = self.text
......@@ -413,7 +415,7 @@ class HistoryRecord:
@property
def remaining(self) -> int:
return len(self.text) - (len(self.node) if self.node else 0)
return len(self.text) - len(self.node)
@staticmethod
def last_match(history: List['HistoryRecord']) -> Union['HistoryRecord', None]:
......@@ -497,7 +499,7 @@ def log_parsing_history(grammar, log_file_name: str = '', html: bool = True) ->
otherwise as plain text. (Browsers might take a few seconds or
minutes to display the table for long histories.)
"""
def write_log(history, log_name):
def write_log(history: List[str], log_name: str) -> None:
htm = '.html' if html else ''
path = os.path.join(log_dir() or '', log_name + "_parser.log" + htm)
if os.path.exists(path):
......@@ -512,7 +514,7 @@ def log_parsing_history(grammar, log_file_name: str = '', html: bool = True) ->
else:
f.write("\n".join(history))
def append_line(log, line):
def append_line(log: List[str], line: str) -> None:
"""Appends a line to a list of HTML table rows. Starts a new
table every 100 rows to allow browser to speed up rendering.
Does this really work...?"""
......
This diff is collapsed.
......@@ -1393,10 +1393,10 @@ class RootNode(Node):
"""
Adds an Error object to the tree, locating it at a specific node.
"""
assert isinstance(node, Node)
if not node:
node = Node(ZOMBIE_TAG, '').with_pos(error.pos)
else:
assert isinstance(node, Node)
assert isinstance(node, FrozenNode) or node.pos <= error.pos, \
"%i <= %i <= %i ?" % (node.pos, error.pos, node.pos + max(1, len(node) - 1))
# assert node.pos == error.pos or isinstance(node, FrozenNode)
......
......@@ -36,24 +36,12 @@ __all__ = ('trace_history', 'all_descendants', 'set_tracer',
'resume_notices_on')
def add_resume_notice(parser, rest: StringView, err_node: Node) -> None:
"""Adds a resume notice to the error node with information about
the reentry point and the parser."""
if parser == parser._grammar.start_parser__:
return
call_stack = parser._grammar.call_stack__
if len(call_stack) >= 2:
i, N = -2, -len(call_stack)
while i >= N and call_stack[i][0][0:1] in (':', '/', '"', "'", "`"):
i -= 1
if i >= N and i != -2:
parent_info = "{}->{}".format(call_stack[i][0], call_stack[-2][0])
else:
parent_info = call_stack[-2][0]
else:
parent_info = "?"
notice = Error('Resuming from parser {} with parser {} at point: {}'
.format(parser.pname or parser.ptype, parent_info, repr(rest[:10])),
notice = Error('Resuming from {} with parser {} at point: {}'
.format(err_node.tag_name, parser.tag_name, repr(rest[:10])),
parser._grammar.document_length__ - len(rest), Error.RESUME_NOTICE)
parser._grammar.tree__.add_error(err_node, notice)
......@@ -64,42 +52,66 @@ def trace_history(self: Parser, text: StringView) -> Tuple[Optional[Node], Strin
grammar.call_stack__.append(
((self.repr if self.tag_name in (REGEXP_PTYPE, TOKEN_PTYPE)
else (self.pname or self.tag_name)), location))
# TODO: Record history on turning points here? i.e. when moving_forward is False
grammar.moving_forward__ = True
if grammar.most_recent_error__:
save_error = grammar.most_recent_error__
grammar.most_recent_error__ = None
else:
save_error = None
try:
node, rest = self._parse(text)
except ParserError as pe:
grammar.call_stack__.pop()
if self == grammar.start_parser__:
if pe.first_throw:
grammar.most_recent_error__ = pe
lc = line_col(grammar.document_lbreaks__, pe.error.pos)
# TODO: get the call stack from when the error occurred, here
nd = pe.node
grammar.history__.append(
HistoryRecord(grammar.call_stack__, pe.node, pe.rest, lc, [pe.error]))
HistoryRecord(grammar.call_stack__, nd, pe.rest[len(nd):], lc, [pe.error]))
# if self == grammar.start_parser__:
# lc = line_col(grammar.document_lbreaks__, pe.error.pos)
# # TODO: get the call stack from when the error occurred, here
# nd = pe.node
# grammar.history__.append(
# HistoryRecord(grammar.call_stack__, nd, pe.rest[len(nd):], lc, [pe.error]))
raise pe
# Mind that memoized parser calls will not appear in the history record!
# Don't track returning parsers except in case an error has occurred!
# TODO: Try recording all named parsers on the way back?
delta = text._len - rest._len
parser_error = grammar.most_recent_error__
if ((grammar.moving_forward__ or parser_error or (node and not self.anonymous))
pe = grammar.most_recent_error__
if ((grammar.moving_forward__ or pe or (node and not self.anonymous))
and (self.tag_name != WHITESPACE_PTYPE)):
# TODO: Make dropping insignificant whitespace from history configurable
errors = [parser_error.error] if parser_error else [] # type: List[Error]
line_col = grammar.line_col__(text)
errors = [pe.error] if pe else [] # type: List[Error]
nd = Node(node.tag_name, text[:delta]).with_pos(location) if node else None
record = HistoryRecord(grammar.call_stack__, nd, rest, line_col, errors)
if (not grammar.history__ or line_col != grammar.history__[-1].line_col
or record.call_stack != grammar.history__[-1].call_stack[:len(record.call_stack)]):
lc = line_col(grammar.document_lbreaks__, location)
record = HistoryRecord(grammar.call_stack__, nd, pe.rest if pe else rest, lc, errors)
cs_len = len(record.call_stack)
if (not grammar.history__ or lc != grammar.history__[-1].line_col
or record.call_stack != grammar.history__[-1].call_stack[:cs_len]):
grammar.history__.append(record)
if parser_error:
if grammar.resume_notices__:
add_resume_notice(self, rest, parser_error.node)
grammar.most_recent_error__ = None
if pe:
grammar.most_recent_error__ = None
if grammar.resume_notices__:
# add_resume_notice(self, pe.rest[len(pe.node):], pe.node)
text_ = pe.rest[len(pe.node):]
target = text_
if len(target) >= 10:
target = target[:7] + '...'
notice = Error('Resuming from {} with parser {} at point: {}'
.format(pe.node.tag_name, self.tag_name, repr(target)),
self._grammar.document_length__ - len(text_), Error.RESUME_NOTICE)
self._grammar.tree__.add_error(pe.node, notice)
if save_error:
grammar.most_recent_error__ = save_error
grammar.moving_forward__ = False
grammar.call_stack__.pop()
return node, rest
......
......@@ -531,6 +531,7 @@ class TestErrorRecovery:
resume_notices_on(parser)
st = parser('AB_D')
assert len(st.errors) == 2 and any(err.code == Error.RESUME_NOTICE for err in st.errors)
assert 'Skipping' in str(st.errors_sorted[1])
def test_AllOf_skip(self):
......@@ -824,7 +825,7 @@ class TestReentryAfterError:
assert cst.content == content
# assert cst.pick('alpha').content.startswith('ALPHA')
def test_severl_reentry_points(self):
def test_several_reentry_points(self):
gr = self.gr; gr.resume_rules = dict()
gr.resume_rules__['alpha'] = [re.compile(r'(?=BETA)'), re.compile(r'(?=GAMMA)')]
content = 'ALPHA acb BETA bac GAMMA cab .'
......@@ -847,7 +848,6 @@ class TestReentryAfterError:
assert len(cst.errors_sorted) == 1
resume_notices_on(gr)
cst = gr(content)
# print(cst.errors)
assert len(cst.errors) == 2 and any(err.code == Error.RESUME_NOTICE for err in cst.errors)
def test_several_resume_rules_innermost_rule_matching(self):
......
......@@ -28,13 +28,16 @@ sys.path.append(os.path.abspath(os.path.join(scriptpath, '..')))
from DHParser import grammar_provider, all_descendants, \
set_tracer, trace_history, log_parsing_history, start_logging, log_dir, \
set_config_value, resume_notices_on
set_config_value, resume_notices_on, Error
def get_history(name) -> str:
    """Return the text of the full HTML parsing-history log for `name`.

    The file name is `name` + "_full_parser.log.html", joined onto the
    value of ``log_dir()`` (or onto an empty string if that value is
    falsy). As a debugging aid the file is first opened in a web browser
    and the function waits one second before reading it.
    """
    log_path = os.path.join(log_dir() or '', name + "_full_parser.log.html")
    # just for debugging: show the log in a browser before reading it
    import time
    import webbrowser
    webbrowser.open(log_path)
    time.sleep(1)
    # ------------------
    with open(log_path, 'r', encoding='utf-8') as log_file:
        return log_file.read()
......@@ -62,9 +65,16 @@ class TestTrace:
set_tracer(all_desc, trace_history)
st = gr('2*(3+4)')
assert(str(st)) == '2*(3+4)'
history = gr.history__
for record in history:
if record.status.startswith(record.FAIL):
# check if the first failed parser yields an excerpt
assert record.excerpt
break
assert len(history) == 24
log_parsing_history(gr, 'trace_simple')
history = get_history('trace_simple')
assert history.count('<tr>') == 25
assert history.count('<tr>') == 25 # same as len(history) + 1 title row
def test_trace_stopped_early(self):
lang = """
......@@ -75,10 +85,10 @@ class TestTrace:
gr = grammar_provider(lang)()
all_desc = all_descendants(gr.root_parser__)
set_tracer(all_desc, trace_history)
st = gr('2*(3+4)...')
st = gr('2*(3+4)xxx')
# print(st.as_sxpr(compact=True))
log_parsing_history(gr, 'trace_simple')
history = get_history('trace_simple')
log_parsing_history(gr, 'trace_stopped_early')
history = get_history('trace_stopped_early')
assert history.count('<tr>') == 26
def test_trace_drop(self):
......@@ -125,7 +135,7 @@ class TestTrace:
content = 'ALPHA acb BETA bac GAMMA cab .'
cst = gr(content)
assert cst.error_flag
assert cst.content == content
assert cst.content == content, cst.as_sxpr()
assert cst.pick('alpha').content.startswith('ALPHA')
# because of resuming, there should be only one error message
assert len(cst.errors_sorted) == 1
......@@ -141,6 +151,18 @@ class TestTrace:
class TestErrorReporting:
def setup(self):
    """Create the grammar used by the tests of this class and start logging.

    The grammar object is stored in ``self.gr``.
    """
    # The grammar text is kept flush-left so that the literal is unchanged.
    grammar_src = """
document = alpha [beta] gamma "."
alpha = "ALPHA" abc
abc = §"a" "b" "c"
beta = "BETA" (bac | bca)
bac = "b" "a" §"c"
bca = "b" "c" §"a"
gamma = "GAMMA" §(cab | cba)
cab = "c" "a" §"b"
cba = "c" "b" §"a"
"""
    make_grammar = grammar_provider(grammar_src)
    self.gr = make_grammar()
    start_logging()
def teardown(self):
......@@ -150,6 +172,24 @@ class TestErrorReporting:
os.remove(os.path.join(LOG_DIR, fname))
os.rmdir(LOG_DIR)
def test_trace_noskip(self):
    """Trace parsing of 'AB_D' with a grammar that has no skip clause.

    Every history record whose status starts with ERROR must show the
    excerpt '_D'. The scan stops at the first such record whose first
    error has the code PARSER_STOPPED_BEFORE_END; if no record like that
    is found, the test fails with "Missing Error!".
    """
    # The grammar text is kept flush-left so that the literal is unchanged.
    grammar_src = """
document = series | /.*/
series = "A" "B" §"C" "D"
"""
    parser = grammar_provider(grammar_src)()
    set_tracer(all_descendants(parser.root_parser__), trace_history)
    _ = parser('AB_D')
    stopped_before_end = False
    for record in parser.history__:
        if not record.status.startswith(record.ERROR):
            continue
        assert record.excerpt == '_D'
        if record.errors[0].code == Error.PARSER_STOPPED_BEFORE_END:
            stopped_before_end = True
            break
    assert stopped_before_end, "Missing Error!"
    # Uncomment to inspect the recorded history as an HTML log:
    # log_parsing_history(parser, 'trace_noskip')
    # get_history('trace_noskip')
def test_trace_skip_clause(self):
lang = """
document = series | /.*/
......@@ -157,13 +197,33 @@ class TestErrorReporting:
series = "A" "B" §"C" "D"
"""
gr = grammar_provider(lang)()
set_tracer(all_descendants(gr.root_parser__), trace_history)
st = gr('AB_D')
print(st.errors)
resume_notices_on(gr)
_ = gr('AB_D')
for record in gr.history__:
if record.status.startswith(record.ERROR):
assert record.excerpt == '_D'
break
else:
assert False, "Missing Error!"
# log_parsing_history(gr, 'trace_skip_clause')
# get_history('trace_skip_clause')
def test_trace_resume(self):
    """Check resuming after an error in `alpha` and log the parsing history.

    A single resume rule for 'alpha' matches right before "BETA". With
    resume notices switched on, the tree's content must still equal the
    input, the 'alpha' node must start with 'ALPHA', and exactly one
    error with a code >= 1000 must be reported. Finally the parsing
    history is written and read back under this test's own log name.
    """
    gr = self.gr
    # NOTE(review): this assigns `resume_rules` (no trailing dunder) while
    # the next line fills `resume_rules__` -- confirm which one is meant.
    gr.resume_rules = dict()
    gr.resume_rules__['alpha'] = [re.compile(r'(?=BETA)')]
    resume_notices_on(gr)
    content = 'ALPHA acb BETA bac GAMMA cab .'
    cst = gr(content)
    assert cst.error_flag
    assert cst.content == content
    assert cst.pick('alpha').content.startswith('ALPHA')
    # because of resuming, there should be only one error message;
    # entries with a code below 1000 are not counted here
    assert len([err for err in cst.errors_sorted if err.code >= 1000]) == 1
    # Use this test's own log name instead of reusing 'trace_skip_clause'.
    log_parsing_history(gr, 'trace_resume')
    get_history('trace_resume')
# When executed as a script, hand this module's globals to DHParser's
# test runner; the empty string is passed as the runner's first argument.
if __name__ == "__main__":
    from DHParser.testing import runner
    runner("", globals())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment