Commit 7f34f386 authored by eckhart's avatar eckhart
Browse files

Bereinigung von Typenfehlern

parent 3478a3af
No preview for this file type
......@@ -41,7 +41,7 @@ from typing import Any, Optional, Tuple, List, Set, Union, Callable, cast
from DHParser.configuration import get_config_value
from DHParser.preprocess import with_source_mapping, PreprocessorFunc, SourceMapFunc
from DHParser.syntaxtree import Node, RootNode, ZOMBIE_TAG, StrictResultType
from DHParser.syntaxtree import Node, RootNode
from DHParser.transform import TransformationFunc
from DHParser.parse import Grammar
from DHParser.error import adjust_error_locations, is_error, is_fatal, Error
......@@ -411,5 +411,6 @@ def process_tree(tp: TreeProcessor, tree: RootNode) -> RootNode:
return tree
# TODO: Verify compiler against grammar, i.e. make sure that for all on_X()-methods, `X` is the name of a parser
# TODO: Verify compiler against grammar,
# i.e. make sure that for all on_X()-methods, `X` is the name of a parser
# TODO: AST validation against an ASDSL-Specification
......@@ -297,7 +297,7 @@ def load_compiler_suite(compiler_suite: str) -> \
RX_SECTION_MARKER.split(source)
except ValueError:
raise ValueError('File "' + compiler_suite + '" seems to be corrupted. '
'Please delete or repair file manually.')
'Please delete or repair file manually.')
# TODO: Compile in one step and pick parts from namespace later ?
preprocessor = compile_python_object(imports + preprocessor_py,
r'get_(?:\w+_)?preprocessor$')
......@@ -412,17 +412,17 @@ def compile_on_disk(source_file: str, compiler_suite="", extension=".xml") -> It
sfactory, pfactory, tfactory, cfactory = load_compiler_suite(compiler_suite)
compiler1 = cfactory()
else:
sfactory = get_ebnf_preprocessor # type: PreprocessorFactoryFunc
pfactory = get_ebnf_grammar # type: ParserFactoryFunc
tfactory = get_ebnf_transformer # type: TransformerFactoryFunc
cfactory = get_ebnf_compiler # type: CompilerFactoryFunc
compiler1 = cfactory() # type: Compiler
sfactory = get_ebnf_preprocessor # PreprocessorFactoryFunc
pfactory = get_ebnf_grammar # ParserFactoryFunc
tfactory = get_ebnf_transformer # TransformerFactoryFunc
cfactory = get_ebnf_compiler # CompilerFactoryFunc
compiler1 = cfactory() # Compiler
is_ebnf_compiler = False # type: bool
if isinstance(compiler1, EBNFCompiler):
is_ebnf_compiler = True
compiler1.set_grammar_name(compiler_name, source_file)
result, messages, _ = compile_source(source, sfactory(), pfactory(), tfactory(), compiler1)
if has_errors(messages):
......
......@@ -780,7 +780,8 @@ class EBNFCompiler(Compiler):
expression. Makes sure that multi-line regular expressions are
prepended by the multi-line-flag. Returns the regular expression string.
"""
# TODO: Support atomic grouping: https://stackoverflow.com/questions/13577372/do-python-regular-expressions-have-an-equivalent-to-rubys-atomic-grouping
# TODO: Support atomic grouping: https://stackoverflow.com/questions/13577372/
# do-python-regular-expressions-have-an-equivalent-to-rubys-atomic-grouping
flags = self.re_flags | {'x'} if rx.find('\n') >= 0 else self.re_flags
if flags:
rx = "(?%s)%s" % ("".join(flags), rx)
......
......@@ -63,7 +63,8 @@ from DHParser.stringview import StringView
from DHParser.syntaxtree import Node, FrozenNode, ZOMBIE_TAG, EMPTY_PTYPE
from DHParser.toolkit import escape_control_characters, abbreviate_middle
__all__ = ('start_logging',
__all__ = ('CallItem',
'start_logging',
'suspend_logging',
'resume_logging',
'log_dir',
......@@ -79,6 +80,9 @@ __all__ = ('start_logging',
'log_parsing_history')
CallItem = Tuple[str, int] # call stack item: (tag_name, location)
#######################################################################
#
# basic logging functionality
......@@ -247,9 +251,9 @@ NONE_TAG = ":None"
NONE_NODE = FrozenNode(NONE_TAG, '')
def freeze_callstack(call_stack: List[Tuple[str, int]]) -> Tuple[Tuple[str, int], ...]:
def freeze_callstack(call_stack: List[CallItem]) -> Tuple[CallItem, ...]:
"""Returns a frozen copy of the call stack."""
return tuple((tn, pos) for tn, pos in call_stack if tn != ":Forward")
return tuple((tn, pos) for tn, pos in call_stack if tn != ":Forward")
class HistoryRecord:
......@@ -298,15 +302,15 @@ class HistoryRecord:
'</style>\n</head>\n<body>\n')
HTML_LEAD_OUT = '\n</body>\n</html>\n'
def __init__(self, call_stack: Union[List[Tuple[str, int]], Tuple[Tuple[str, int], ...]],
def __init__(self, call_stack: Union[List[CallItem], Tuple[CallItem, ...]],
node: Optional[Node],
text: StringView,
line_col: Tuple[int, int],
errors: List[Error] = []) -> None:
# copy call stack, dropping uninformative Forward-Parsers
# self.call_stack = call_stack # type: Tuple[Tuple[str, int],...]
# self.call_stack = call_stack # type: Tuple[CallItem,...]
if isinstance(call_stack, tuple):
self.call_stack = call_stack # type: Tuple[Tuple[str, int],...]
self.call_stack = call_stack # type: Tuple[CallItem,...]
else:
self.call_stack = freeze_callstack(call_stack)
self.node = NONE_NODE if node is None else node # type: Node
......
......@@ -37,7 +37,7 @@ from typing import Callable, cast, List, Tuple, Set, Dict, \
from DHParser.configuration import get_config_value
from DHParser.error import Error, linebreaks, line_col
from DHParser.log import HistoryRecord
from DHParser.log import CallItem, HistoryRecord
from DHParser.preprocess import BEGIN_TOKEN, END_TOKEN, RX_TOKEN_NAME
from DHParser.stringview import StringView, EMPTY_STRING_VIEW
from DHParser.syntaxtree import ChildrenType, Node, RootNode, WHITESPACE_PTYPE, \
......@@ -117,6 +117,7 @@ class ParserError(Exception):
self.rest = rest # type: StringView
self.error = error # type: Error
self.first_throw = first_throw # type: bool
self.frozen_callstack = tuple() # type: Tuple[CallItem, ...] # tag_name, location
def __str__(self):
return "%i: %s %s" % (self.node.pos, str(self.rest[:25]), repr(self.node))
......@@ -511,8 +512,8 @@ class Parser:
if proxy is None:
self._parse_proxy = self._parse
else:
if type(proxy) != type(self._parse):
# assume that proxy is a function
if not isinstance(proxy, type(self._parse)):
# assume that proxy is a function and bind it to self
proxy = proxy.__get__(self, type(self))
else:
# if proxy is a method it must be a method of self
......@@ -536,8 +537,8 @@ class Parser:
self._grammar = grammar
# self._grammar_assigned_notifier()
elif self._grammar != grammar:
raise AssertionError("Parser has already been assigned"
"to a different Grammar object!")
raise AssertionError("Parser has already been assigned"
"to a different Grammar object!")
except AttributeError:
pass # ignore setting of grammar attribute for placeholder parser
except NameError: # Cython: No access to GRAMMA_PLACEHOLDER, yet :-(
......@@ -549,8 +550,8 @@ class Parser:
"""
return tuple()
def _apply(self, func: Callable[['Parser'], None],
flip: Callable[[Callable, Set[Callable]], bool]) -> bool:
def _apply(self, func: Callable[['Parser'], None],
flip: Callable[[Callable, Set[Callable]], bool]) -> bool:
"""
Applies function `func(parser)` recursively to this parser and all
descendant parsers, if any exist.
......@@ -622,6 +623,11 @@ def Drop(parser: Parser) -> Parser:
PARSER_PLACEHOLDER = Parser()
def is_parser_placeholder(parser: Optional[Parser]) -> bool:
"""Returns True, if `parser` is `None` or merely a placeholder for a parser."""
return not parser or parser.ptype == ":Parser"
########################################################################
#
# Grammar class, central administration of all parser of a grammar
......@@ -1070,7 +1076,7 @@ class Grammar:
self.rollback__ = [] # type: List[Tuple[int, Callable]]
self.last_rb__loc__ = -1 # type: int
# support for call stack tracing
self.call_stack__ = [] # type: List[Tuple[str, int]] # tag_name, location
self.call_stack__ = [] # type: List[CallItem] # tag_name, location
# snapshots of call stacks
self.history__ = [] # type: List[HistoryRecord]
# also needed for call stack tracing
......@@ -1990,7 +1996,6 @@ class Series(MandatoryNary):
text_, isinstance(parser, Lookahead), parser.repr, reloc)
# check if parsing of the series can be resumed somewhere
if reloc >= 0:
rest = text_
nd, text_ = parser(text_) # try current parser again
if nd is not None:
results += (node,)
......@@ -2017,7 +2022,7 @@ class Series(MandatoryNary):
# `RE('\d+') + Optional(RE('\.\d+)` instead of `Series(RE('\d+'), Optional(RE('\.\d+))`
@staticmethod
def combined_mandatory(left: 'Series', right: 'Series'):
def combined_mandatory(left: Parser, right: Parser) -> int:
"""
Returns the position of the first mandatory element (if any) when
parsers `left` and `right` are joined to a sequence.
......@@ -2630,7 +2635,7 @@ class Forward(Parser):
def __init__(self):
super(Forward, self).__init__()
self.parser = None # type: Optional[Parser]
self.parser = PARSER_PLACEHOLDER # type: Parser
self.cycle_reached = False
def __deepcopy__(self, memo):
......@@ -2641,8 +2646,7 @@ class Forward(Parser):
memo[id(self)] = duplicate
parser = copy.deepcopy(self.parser, memo)
duplicate.parser = parser
if parser is not None:
duplicate.drop_content = parser.drop_content
duplicate.drop_content = parser.drop_content
return duplicate
def __call__(self, text: StringView) -> Tuple[Optional[Node], StringView]:
......@@ -2696,6 +2700,6 @@ class Forward(Parser):
self.drop_content = parser.drop_content
def sub_parsers(self) -> Tuple[Parser, ...]:
if self.parser is not None:
return (self.parser,)
return tuple()
if is_parser_placeholder(self.parser):
return tuple()
return (self.parser,)
......@@ -42,7 +42,7 @@ For JSON see:
"""
import asyncio
from concurrent.futures import Future, Executor, ThreadPoolExecutor, ProcessPoolExecutor
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from functools import partial
import json
......@@ -53,7 +53,7 @@ import subprocess
import sys
import time
from typing import Callable, Coroutine, Optional, Union, Dict, List, Tuple, Sequence, Set, \
Iterator, Iterable, Any, cast
Iterator, Iterable, Any, cast, Type
from DHParser.configuration import access_thread_locals, get_config_value
from DHParser.syntaxtree import DHParser_JSONEncoder
......@@ -66,6 +66,7 @@ __all__ = ('RPC_Table',
'RPC_Type',
'RPC_Error_Type',
'JSON_Type',
'JSON_Dict',
'ConnectionCallback',
'SERVER_ERROR',
'SERVER_OFFLINE',
......@@ -95,6 +96,7 @@ RPC_Table = Dict[str, Callable]
RPC_Type = Union[RPC_Table, List[Callable], Callable]
RPC_Error_Type = Optional[Tuple[int, str]]
JSON_Type = Union[Dict, Sequence, str, int, None]
JSON_Dict = Dict[str, JSON_Type]
BytesType = Union[bytes, bytearray]
ConnectionCallback = Callable[['Connection'], None]
......@@ -131,7 +133,7 @@ X-Pad: avoid browser bug
'''
JSONRPC_HEADER = '''Content-Length: {length}\r\n\r\n'''
JSONRPC_HEADER = b'''Content-Length: %i\r\n\r\n'''
ONELINER_HTML = '''<!DOCTYPE html>
<html lang="en" xml:lang="en">
......@@ -228,7 +230,7 @@ async def asyncio_connect(
delay = retry_timeout / 1.5**12 if retry_timeout > 0.0 else retry_timeout - 0.001
connected = False
reader, writer = None, None
save_error = ConnectionError
save_error = ConnectionError # type: Union[Type[ConnectionError], ConnectionRefusedError]
while delay < retry_timeout:
try:
reader, writer = await asyncio.open_connection(host, port)
......@@ -367,7 +369,7 @@ class Connection:
# async-task support
def create_task(self, json_id: int, coroutine: Coroutine) -> Future:
def create_task(self, json_id: int, coroutine: Coroutine) -> asyncio.futures.Future:
assert json_id not in self.active_tasks, \
"JSON-id {} already used!".format(json_id)
task = asyncio.ensure_future(coroutine)
......@@ -399,6 +401,7 @@ class Connection:
coroutine."""
if self.log_file:
self.log('RESULT: ', json.dumps(json_obj))
assert self.response_queue is not None
self.response_queue.put_nowait(json_obj)
async def server_call(self, json_obj: JSON_Type):
......@@ -407,7 +410,7 @@ class Connection:
if self.log_file:
self.log('CALL: ', json_str, '\n\n')
request = json_str.encode()
request = JSONRPC_HEADER.format(length=len(request)) + request
request = JSONRPC_HEADER % len(request) + request
self.writer.write(request)
await self.writer.drain()
......@@ -416,6 +419,7 @@ class Connection:
with the id `call_id`."""
pending = self.pending_responses.get(call_id, [])
while not pending:
assert self.response_queue is not None
response = await self.response_queue.get()
self.pending_responses.setdefault(call_id, []).insert(0, response)
pending = self.pending_responses.get(call_id, [])
......@@ -684,7 +688,7 @@ class Server:
has_kw_params = isinstance(params, Dict)
if not (has_kw_params or isinstance(params, Sequence)):
rpc_error = -32040, "Invalid parameter type %s for %s. Must be Dict or Sequence" \
% (str(type(params)), str(params))
% (str(type(params)), str(params))
return result, rpc_error
try:
# print(executor, method, params)
......@@ -694,8 +698,11 @@ class Server:
else:
result = method(**params) if has_kw_params else method(*params)
elif has_kw_params:
# if self.loop is None: raise AssertionError('No task loop!?')
assert self.loop, "No task loop!?"
result = await self.loop.run_in_executor(executor, partial(method, **params))
else:
assert self.loop, "No task loop!?"
result = await self.loop.run_in_executor(executor, partial(method, *params))
except TypeError as e:
rpc_error = -32602, "Invalid Params: " + str(e)
......@@ -725,6 +732,7 @@ class Server:
result, rpc_error = await self.execute(executor, method, params)
if rpc_error is not None and rpc_error[0] == -32050:
# if process pool is broken, try again:
assert self.process_executor
self.process_executor.shutdown(wait=True)
self.process_executor = ProcessPoolExecutor()
result, rpc_error = await self.execute(self.process_executor, method, params)
......@@ -743,7 +751,7 @@ class Server:
'Only bytes and str allowed!'
% (str(type(response)), str(response))).encode()
if self.use_jsonrpc_header and response.startswith(b'{'):
response = JSONRPC_HEADER.format(length=len(response)).encode() + response
response = JSONRPC_HEADER % len(response) + response
if self.log_file: # avoid data decoding if logging is off
self.log('RESPONSE: ', response.decode(), '\n\n')
# print('returned: ', response)
......@@ -752,10 +760,11 @@ class Server:
await writer.drain()
except ConnectionError as err:
self.log('ERROR when writing data: ', str(err), '\n')
self.connection.alive = False
if self.connection:
self.connection.alive = False
def amend_service_call(self, func_name: str, func: Callable, argument: Union[Tuple, Dict],
err_func: Callable)-> Tuple[Callable, Union[Tuple, Dict]]:
err_func: Callable) -> Tuple[Callable, Union[Tuple, Dict]]:
if argument is None:
argument = ()
if getattr(func, '__self__', None) == self:
......@@ -787,7 +796,8 @@ class Server:
func_name = m.group(1).decode()
argstr = m.group(2).decode()
if argstr:
argument = tuple(convert_argstr(s) for s in argstr.split(','))
argument = tuple(convert_argstr(s)
for s in argstr.split(',')) # type: Union[Tuple, Dict]
else:
argument = ()
else:
......@@ -796,7 +806,7 @@ class Server:
err_func = lambda *args, **kwargs: \
'No function named "%s" known to server %s !' % (func_name, self.server_name)
func = self.rpc_table.get(func_name, err_func)
func = self.rpc_table.get(func_name, err_func) # type: Callable
if service_call:
func, argument = self.amend_service_call(func_name, func, argument, err_func)
result, rpc_error = await self.run(func_name, func, argument)
......@@ -810,6 +820,7 @@ class Server:
await self.respond(writer, str(err))
else:
await self.respond(writer, rpc_error[1])
assert self.connection
self.connection.task_done(task_id)
async def handle_http_request(self, task_id: int,
......@@ -850,6 +861,7 @@ class Server:
await self.respond(writer, http_response(str(err)))
else:
await self.respond(writer, http_response(rpc_error[1]))
assert self.connection
self.connection.task_done(task_id)
async def handle_jsonrpc_request(self, json_id: int,
......@@ -877,11 +889,13 @@ class Server:
method = self.rpc_table[method_name]
params = json_obj['params'] if 'params' in json_obj else {}
if service_call:
err_func = lambda *args, **kwargs: {"error": {"code": -32601,
"message": "%s is not a service function" % method_name}}
err_func = lambda *args, **kwargs: {
"error": {"code": -32601,
"message": "%s is not a service function" % method_name}}
method, params = self.amend_service_call(method_name, method, params, err_func)
result, rpc_error = await self.run(method_name, method, params)
if method_name == 'exit':
assert self.connection
self.connection.alive = False
reader.feed_eof()
......@@ -917,6 +931,7 @@ class Server:
if result is not None or rpc_error is not None:
await writer.drain()
assert self.connection
self.connection.task_done(json_id)
async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
......@@ -931,12 +946,13 @@ class Server:
def connection_alive() -> bool:
"""-> `False` if connection is dead or shall be shut down."""
assert self.connection
return not self.kill_switch and self.connection.alive and not reader.at_eof()
buffer = bytearray() # type: bytearray
while connection_alive():
# reset the data variable
data = bytearray() # type: bytearray
data = bytearray() # type: bytearray
# reset the content length
content_length = 0 # type: int
# reset the length of the header, represented by the variable `k`
......@@ -1045,7 +1061,7 @@ class Server:
# (put calls to execute in asyncio tasks, use asyncio.gather)
json_id = 0 # type: int
raw = None # type: Optional[JSON_Type]
json_obj = {} # type: JSON_Type
json_obj = {} # type: JSON_Dict
rpc_error = None # type: Optional[RPC_Error_Type]
# see: https://microsoft.github.io/language-server-protocol/specification#header-part
# i = max(data.find(b'\n\n'), data.find(b'\r\n\r\n')) + 2
......@@ -1066,8 +1082,9 @@ class Server:
if rpc_error is None:
if isinstance(raw, Dict):
json_obj = cast(Dict, raw)
json_id = json_obj.get('id', gen_task_id())
json_obj = cast(JSON_Dict, raw)
raw_id = cast(Union[str, int], json_obj.get('id', gen_task_id()))
json_id = int(raw_id)
else:
rpc_error = -32700, 'Parse error: JSON-package does not appear '\
'to ba an RPC-call or -response!?'
......@@ -1076,13 +1093,14 @@ class Server:
method = json_obj.get('method', '')
response = json_obj.get('result', None) or json_obj.get('error', None)
if method:
assert isinstance(method, str)
if id_connection or method != 'initialize':
rpc_error = self.connection.verify_initialization(
method, self.strict_lsp and id_connection)
method, self.strict_lsp and bool(id_connection))
if rpc_error is None:
task = self.connection.create_task(
json_id, self.handle_jsonrpc_request(
json_id, reader, writer, json_obj, not id_connection))
json_id, reader, writer, json_obj, not bool(id_connection)))
else:
rpc_error = -32002, 'server is already connected to another client'
elif response is not None:
......@@ -1128,12 +1146,13 @@ class Server:
self.stage.value = SERVER_TERMINATING
if sys.version_info >= (3, 7):
await writer.wait_closed()
assert self.serving_task
self.serving_task.cancel()
else:
self.server.close() # break self.server.serve_forever()
if sys.version_info < (3, 7) and self.loop is not None:
self.loop.stop()
self.log('SERVER MESSAGE: Stopping server: %i.\n\n'.format(id_connection))
self.log('SERVER MESSAGE: Stopping server: {}.\n\n'.format(id_connection))
self.kill_switch = False # reset flag
async def serve(self, host: str = USE_DEFAULT_HOST, port: int = USE_DEFAULT_PORT):
......@@ -1184,6 +1203,7 @@ class Server:
asyncio.set_event_loop(self.loop)
else:
self.loop = loop
assert self.loop is not None
self.server = cast(
asyncio.base_events.Server,
self.loop.run_until_complete(
......@@ -1399,7 +1419,7 @@ def stop_server(host: str = USE_DEFAULT_HOST, port: int = USE_DEFAULT_PORT,
#######################################################################
def lsp_candidates(cls: Any, prefix: str='lsp_') -> Iterator[str]:
def lsp_candidates(cls: Any, prefix: str = 'lsp_') -> Iterator[str]:
"""Returns an iterator over all method names from a class that either
have a certain prefix or, if no prefix was given, all non-special and
non-private-methods of the class."""
......@@ -1416,7 +1436,7 @@ def lsp_candidates(cls: Any, prefix: str='lsp_') -> Iterator[str]:
yield fn
def gen_lsp_name(func_name: str, prefix: str= 'lsp_') -> str:
def gen_lsp_name(func_name: str, prefix: str = 'lsp_') -> str:
"""Generates the name of an lsp-method from a function name,
e.g. "lsp_S_cacelRequest" -> "$/cancelRequest" """
assert func_name.startswith(prefix)
......@@ -1456,4 +1476,3 @@ def gen_lsp_table(lsp_funcs_or_instance: Union[Iterable[Callable], Any],
cls = lsp_funcs_or_instance
rpc_table = {gen_lsp_name(fn, prefix): getattr(cls, fn) for fn in lsp_candidates(cls, prefix)}
return rpc_table
......@@ -32,7 +32,7 @@ speedup. The modules comes with a ``stringview.pxd`` that contains some type
declarations to more fully exploit the benefits of the Cython-compiler.
"""
from typing import Optional, Union, Iterable, Tuple, List
from typing import Optional, Union, Iterable, Tuple, List, Sequence
try:
import cython
......@@ -340,7 +340,7 @@ class StringView: # collections.abc.Sized
return self if end == self._len else self[:end]
@cython.locals(length=cython.int, i=cython.int, k=cython.int)
def split(self, sep=None) -> List[Union['StringView', str]]:
def split(self, sep=None) -> Sequence[Union['StringView', str]]:
"""Returns a list of the words in `self`, using `sep` as the
delimiter string. If `sep` is not specified or is None, any
whitespace string is a separator and empty strings are
......
......@@ -258,8 +258,8 @@ class Node: # (collections.abc.Sized): Base class omitted for cython-compatibil
__slots__ = '_result', 'children', '_pos', 'tag_name', '_xml_attr'
def __init__(self, tag_name: str,
result: Union[Tuple['Node', ...], 'Node', StringView, str],
def __init__(self, tag_name: str,
result: Union[Tuple['Node', ...], 'Node', StringView, str],
leafhint: bool = False) -> None:
"""
Initializes the ``Node``-object with the ``Parser``-Instance
......@@ -367,7 +367,7 @@ class Node: # (collections.abc.Sized): Base class omitted for cython-compatibil
:param result: the new result of the note
"""
if isinstance(result, Node):
self.children = (result,) # type: Tuple[Node, ...]
self.children = (result,)
self._result = self.children
else:
if isinstance(result, tuple):
......@@ -1569,7 +1569,8 @@ def parse_sxpr(sxpr: Union[str, StringView]) -> Node:
attributes[attr] = str(value)
sxpr = sxpr[k + 1:].strip()
if sxpr[0] == '(':
result = tuple(inner_parser(block) for block in next_block(sxpr)) # type: Union[Tuple[Node, ...], str]
result = tuple(inner_parser(block)
for block in next_block(sxpr)) # type: Union[Tuple[Node, ...], str]
else:
lines = []
while sxpr and sxpr[0:1] != ')':
......@@ -1688,7 +1689,7 @@ def parse_xml(xml: Union[str, StringView], ignore_pos: bool = False) -> Node:
if len(res) == 1 and res[0].tag_name == TOKEN_PTYPE:
result = res[0].result # type: Union[Tuple[Node, ...], StringView, str]
else:
result = tuple(res) # type:Union[Tuple[Node, ...], StringView, str]
result = tuple(res)
node = Node(name or ':' + class_name, result)
if not ignore_pos and '_pos' in attrs:
......
......@@ -370,7 +370,7 @@ def compile_python_object(python_src: str, catch_obj_regex="") -> Any:
if isinstance(catch_obj_regex, str):
catch_obj_regex = re.compile(catch_obj_regex)
code = compile(python_src, '<string>', 'exec')
namespace = {}
namespace = {} # type: Dict[str, Any]
exec(code, namespace) # safety risk?
if catch_obj_regex:
matches = [key for key in namespace if catch_obj_regex.match(key)]
......
......@@ -48,9 +48,9 @@ def trace_history(self: Parser, text: StringView) -> Tuple[Optional[Node], Strin
errors = [pe.error]
# ignore inflated length due to gap jumping (see parse.Parser.__call__)
l = sum(len(nd) for nd in pe.node.select_if(lambda n: True, include_root=True)
if not nd.children and nd.tag_name != ZOMBIE_TAG)
text_ = pe.rest[l:]
loc = sum(len(nd) for nd in pe.node.select_if(lambda n: True, include_root=True)
if not nd.children and nd.tag_name != ZOMBIE_TAG)
text_ = pe.rest[loc:]
lc = line_col(grammar.document_lbreaks__, pe.error.pos)
target = text
if len(target) >= 10:
......@@ -83,8 +83,8 @@ def trace_history(self: Parser, text: StringView) -> Tuple[Optional[Node], Strin
if pe.first_throw:
pe.frozen_callstack = freeze_callstack(grammar.call_stack__)
grammar.most_recent_error__ = pe
if self == grammar.start_parser__:
fe = grammar.most_recent_error__
if self == grammar.start_parser__ and grammar.most_recent_error__:
fe = grammar.most_recent_error__ # type: ParserError
lc = line_col(grammar.document_lbreaks__, fe.error.pos)
# TODO: get the call stack from when the error occured, here
nd = fe.node
......@@ -101,9 +101,9 @@ def trace_history(self: Parser, text: StringView) -> Tuple[Optional[Node], Strin
# record history
# TODO: Make dropping insignificant whitespace from history configurable
delta = text._len - rest._len
nd = Node(node.tag_name, text[:delta]).with_pos(location) if node else None
hnd = Node(node.tag_name, text[:delta]).with_pos(location) if node else None
lc = line_col(grammar.document_lbreaks__, location)
record = HistoryRecord(grammar.call_stack__, nd, rest, lc, [])
record = HistoryRecord(grammar.call_stack__, hnd, rest, lc, [])
cs_len = len(record.call_stack)
if (not grammar.history__ or lc != grammar.history__[-1].line_col
or record.call_stack != grammar.history__[-1].call_stack[:cs_len]
......