Commit 6f9992cd authored by di68kap

preprocess.py: Refactoring

parent 1a2e62db
@@ -41,8 +41,7 @@ import traceback
from typing import Any, Optional, Tuple, List, Set, Union, Callable, cast
from DHParser.configuration import get_config_value
from DHParser.preprocess import with_source_mapping, PreprocessorFunc, SourceMapFunc, \
SourceLocation
from DHParser.preprocess import PreprocessorFunc
from DHParser.syntaxtree import Node, RootNode, EMPTY_PTYPE, TreeContext
from DHParser.transform import TransformationFunc
from DHParser.parse import Grammar
@@ -361,7 +360,7 @@ def compile_source(source: str,
source_mapping = gen_neutral_srcmap_func(source_text, source_name)
# lambda i: SourceLocation(source_name, 0, i) # type: SourceMapFunc
else:
source_text, source_mapping = with_source_mapping(preprocessor(original_text, source_name))
_, source_text, source_mapping = preprocessor(original_text, source_name)
# parsing
......
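The call-site change above shows the new contract: a preprocessor now returns a 3-tuple directly instead of a `Preprocessed` pair normalized through `with_source_mapping`. A minimal sketch of the new convention, using `nil_preprocessor` as updated further down in this commit:

```python
from DHParser.preprocess import nil_preprocessor

original = "print(1)\n"
# PreprocessorResult(original_text, preprocessed_text, back_mapping):
_, source_text, source_mapping = nil_preprocessor(original, "example.py")
# the back-mapping now yields (original_name, original_text, pos):
name, text, pos = source_mapping(0)
assert (text, pos) == (original, 0)
```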
@@ -1057,7 +1057,7 @@ from DHParser import start_logging, suspend_logging, resume_logging, is_filename
remove_anonymous_empty, keep_nodes, traverse_locally, strip, lstrip, rstrip, \
transform_content, replace_content_with, forbid, assert_content, remove_infix_operator, \
add_error, error_on, recompile_grammar, left_associative, lean_left, set_config_value, \
get_config_value, node_maker, access_thread_locals, access_presets, \
get_config_value, node_maker, access_thread_locals, access_presets, PreprocessorResult, \
finalize_presets, ErrorCode, RX_NEVER_MATCH, set_tracer, resume_notices_on, \
trace_history, has_descendant, neg, has_ancestor, optional_last_value, insert, \
positions_of, replace_tag_names, add_attributes, delimit_children, merge_connected, \
@@ -2262,9 +2262,12 @@ class EBNFCompiler(Compiler):
the previously compiled formal language.
"""
name = self.grammar_name + "Preprocessor"
return "def nop(pos, source_name):\n return SourceLocation(source_name, pos)\n\n\n" \
return "def nop(pos, source_name, source_text):\n"\
" return SourceLocation(source_name, source_text, pos)\n\n\n" \
"def %s(source_text, source_name):\n"\
" return source_text, partial(nop, source_name)\n" % name \
" return PreprocessorResult(\n"\
" source_text, source_text,\n"\
" partial(nop, source_name=source_name, source_text=source_text))\n" % name \
+ PREPROCESSOR_FACTORY.format(NAME=self.grammar_name)
......
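For reference, here is what the generated stub above expands to for a hypothetical grammar named `Example` (reconstructed from the string template; only the grammar name is invented):

```python
from functools import partial
from DHParser.preprocess import SourceLocation, PreprocessorResult

def nop(pos, source_name, source_text):
    # identity back-mapping: every position maps onto itself
    return SourceLocation(source_name, source_text, pos)

def ExamplePreprocessor(source_text, source_name):
    return PreprocessorResult(
        source_text, source_text,
        partial(nop, source_name=source_name, source_text=source_text))
```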
@@ -388,9 +388,11 @@ def add_source_locations(errors: List[Error], source_mapping: SourceMapFunc):
source_mapping: A function that maps error positions to their
positions in the original source file.
"""
lb_dict = {}
for err in errors:
assert err.pos >= 0
err.orig_doc, lbreaks, err.orig_pos = source_mapping(err.pos)
err.orig_doc, orig_text, err.orig_pos = source_mapping(err.pos)
lbreaks = lb_dict.setdefault(orig_text, linebreaks(orig_text))
err.line, err.column = line_col(lbreaks, err.orig_pos)
if err.orig_pos + err.length > lbreaks[-1]:
err.length = lbreaks[-1] - err.orig_pos # err.length should not exceed text length
......
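Because `SourceLocation` no longer carries precomputed line breaks, `add_source_locations` now derives them from the original text returned by the mapping and caches one table per distinct text in `lb_dict`. A sketch of that memoization pattern (`linebreaks` and `line_col` are the existing `DHParser.toolkit` helpers):

```python
from DHParser.toolkit import linebreaks, line_col

lb_dict = {}

def locate(orig_text: str, orig_pos: int):
    # compute and cache the line-break table once per original text
    lbreaks = lb_dict.setdefault(orig_text, linebreaks(orig_text))
    return line_col(lbreaks, orig_pos)
```

Note that `dict.setdefault` still evaluates `linebreaks(orig_text)` on every call; an explicit `if orig_text not in lb_dict` check would skip the recomputation on cache hits.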
@@ -34,7 +34,7 @@ import os
from typing import Union, Optional, Callable, Tuple, NamedTuple, List, Dict, Any
from DHParser.stringview import StringView
from DHParser.toolkit import re, linebreaks
from DHParser.toolkit import re
__all__ = ('RX_TOKEN_NAME',
@@ -44,8 +44,8 @@ __all__ = ('RX_TOKEN_NAME',
'SourceMap',
'SourceMapFunc',
'PreprocessorFunc',
'Preprocessed',
'PreprocessorResult',
'Tokenizer',
'make_token',
'strip_tokens',
'nil_preprocessor',
@@ -53,8 +53,7 @@ __all__ = ('RX_TOKEN_NAME',
'prettyprint_tokenized',
'gen_neutral_srcmap_func',
'tokenized_to_original_mapping',
# 'source_map',
'with_source_mapping',
'make_preprocessor',
'gen_find_include_func',
'preprocess_includes')
@@ -82,37 +81,39 @@ class IncludeInfo(NamedTuple):
class SourceMap(NamedTuple):
source_name: str # name or path or uri of the original source file
original_name: str # name or path or uri of the original source file
positions: List[int] # a list of locations
offsets: List[int] # the corresponding offsets to be added from these locations onward
file_names: List[str] # list of file_names to which the source locations relate
lbreaks_dict: Dict[str, List[int]] # line breaks of the included texts
originals_dict: Dict[str, Union[str, StringView]] # File names => (included) source texts
def has_includes(sm: SourceMap) -> bool:
return any(fname != sm.source_name for fname in sm.file_names)
return any(fname != sm.original_name for fname in sm.file_names)
class SourceLocation(NamedTuple):
source_name: str # the file name (or path or uri) of the source code
lbreaks: List[int] # positions of the line-breaks in the source file
pos: int # a position within this file
original_name: str # the file name (or path or uri) of the source code
original_text: Union[str, StringView] # the source code itself
pos: int # a position within the code
SourceMapFunc = Union[Callable[[int], SourceLocation],
functools.partial]
class Preprocessed(NamedTuple):
preprocessed_text: str
class PreprocessorResult(NamedTuple):
original_text: Union[str, StringView]
preprocessed_text: Union[str, StringView]
back_mapping: SourceMapFunc
PreprocessorResult = Union[str, Preprocessed]
FindIncludeFunc = Union[Callable[[str, int], IncludeInfo], # (document: str, start: int)
functools.partial]
PreprocessorFunc = Union[Callable[[str, str], PreprocessorResult], # text: str, filename: str
functools.partial]
Tokenizer = Union[Callable[[str], str], functools.partial]
# a function that merely adds preprocessor tokens to a source text
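These reshaped tuples are the core of the refactoring: `SourceMap` now stores the (included) source texts themselves in `originals_dict` instead of their line-break tables, `SourceLocation` carries the original text instead of `lbreaks`, and the explicit `PreprocessorResult` NamedTuple replaces both `Preprocessed` and the old `Union[str, Preprocessed]` alias. A hypothetical construction with invented file names and contents:

```python
from DHParser.preprocess import SourceMap, SourceLocation, PreprocessorResult

loc = SourceLocation("main.txt", "print(1)\n", 0)  # text, not line breaks

sm = SourceMap(
    original_name="main.txt",
    positions=[0],                    # mapped positions, ascending
    offsets=[0],                      # offset to add from each position onward
    file_names=["main.txt"],          # file each mapped range belongs to
    originals_dict={"main.txt": "print(1)\n"},  # texts, not lbreaks
)
```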
#######################################################################
@@ -122,12 +123,13 @@ PreprocessorFunc = Union[Callable[[str, str], PreprocessorResult], # text: str,
#######################################################################
def nil_preprocessor(source_text: str, source_name: str) -> Preprocessed:
def nil_preprocessor(original_text: str, original_name: str) -> PreprocessorResult:
"""
A preprocessor that does nothing, i.e. just returns the input.
"""
lbreaks = linebreaks(source_text)
return Preprocessed(source_text, lambda i: SourceLocation(source_name, lbreaks, i))
return PreprocessorResult(original_text,
original_text,
lambda i: SourceLocation(original_name, original_text, i))
def _apply_mappings(position: int, mappings: List[SourceMapFunc]) -> SourceLocation:
@@ -137,28 +139,30 @@ def _apply_mappings(position: int, mappings: List[SourceMapFunc]) -> SourceLocat
position within a preprocessed source text and mappings should therefore
be a list of reverse-mappings in reversed order.
"""
filename, lbreaks = '', []
filename, text = '', ''
for mapping in mappings:
filename, lbreaks, position = mapping(position)
return SourceLocation(filename, lbreaks, position)
filename, text, position = mapping(position)
return SourceLocation(filename, text, position)
def _apply_preprocessors(source_text: str, source_name: str,
def _apply_preprocessors(original_text: str, original_name: str,
preprocessors: Tuple[PreprocessorFunc, ...]) \
-> Preprocessed:
-> PreprocessorResult:
"""
Applies several preprocessing functions sequentially to a source text
and returns the preprocessed text as well as a function that maps text-
positions in the processed text onto the corresponding position in the
original source text.
"""
processed = source_text
processed = original_text
mapping_chain = []
for prep in preprocessors:
processed, mapping_func = with_source_mapping(prep(processed, source_name))
_, processed, mapping_func = prep(processed, original_name)
mapping_chain.append(mapping_func)
mapping_chain.reverse()
return Preprocessed(processed, functools.partial(_apply_mappings, mappings=mapping_chain))
return PreprocessorResult(original_text,
processed,
functools.partial(_apply_mappings, mappings=mapping_chain))
def chain_preprocessors(*preprocessors) -> PreprocessorFunc:
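`_apply_preprocessors` above threads the original text through unchanged and collects the back-mappings in reverse order; `chain_preprocessors` wraps this into a single `PreprocessorFunc`. A quick sanity sketch chaining two no-op preprocessors:

```python
from DHParser.preprocess import chain_preprocessors, nil_preprocessor

pchain = chain_preprocessors(nil_preprocessor, nil_preprocessor)
_, processed, mapping = pchain("print(1)\n", "example.py")
name, text, pos = mapping(0)  # walks the reversed mapping chain
assert (text, pos) == ("print(1)\n", 0)
```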
@@ -225,20 +229,22 @@ def strip_tokens(tokenized: str) -> str:
#######################################################################
def gen_neutral_srcmap_func(source_text: Union[StringView, str], source_name: str='') -> SourceMapFunc:
def gen_neutral_srcmap_func(original_text: Union[StringView, str], original_name: str = '') -> SourceMapFunc:
"""Generates a source map functions that maps positions to itself."""
line_breaks = linebreaks(source_text or ' ')
if not source_name: source_name = 'UNKNOWN_FILE'
return lambda pos: SourceLocation(source_name, line_breaks, pos)
if not original_name: original_name = 'UNKNOWN_FILE'
return lambda pos: SourceLocation(original_name, original_text, pos)
def tokenized_to_original_mapping(tokenized_text: str, source_name: str='UNKNOWN_FILE') -> SourceMap:
def tokenized_to_original_mapping(tokenized_text: str,
original_text: str,
original_name: str = 'UNKNOWN_FILE') -> SourceMap:
"""
Generates a source map for mapping positions in a text that has
been enriched with token markers to their original positions.
:param tokenized_text: the source text enriched with token markers.
:poram source_name: the name or path or uri of the original source file.
:param original_text: the original source text
:param original_name: the name or path or uri of the original source file.
:returns: a source map, i.e. a list of positions and a list of corresponding
offsets. The list of positions is ordered from smallest to highest.
An offset is valid for its associated position and all following
@@ -269,9 +275,9 @@ def tokenized_to_original_mapping(tokenized_text: str, source_name: str='UNKNOWN
# specific condition for preprocessor tokens
assert all(offsets[i] > offsets[i + 1] for i in range(len(offsets) - 2))
lbreaks = linebreaks(tokenized_text)
L = len(positions)
return SourceMap(source_name, positions, offsets, [source_name] * L, { source_name: lbreaks})
return SourceMap(
original_name, positions, offsets, [original_name] * L, {original_name: original_text})
def source_map(position: int, srcmap: SourceMap) -> SourceLocation:
@@ -286,43 +292,25 @@ def source_map(position: int, srcmap: SourceMap) -> SourceLocation:
"""
i = bisect.bisect_right(srcmap.positions, position)
if i:
source_name = srcmap.file_names[i - 1]
original_name = srcmap.file_names[i - 1]
return SourceLocation(
source_name,
srcmap.lbreaks_dict[source_name],
original_name,
srcmap.originals_dict[original_name],
min(position + srcmap.offsets[i - 1], srcmap.positions[i] + srcmap.offsets[i]))
raise ValueError
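With the new signatures, `tokenized_to_original_mapping` takes the original text explicitly and `source_map` hands it back through `originals_dict`. A self-contained sketch (the `START` token name is invented; `make_token` is DHParser's existing helper for building preprocessor tokens):

```python
from functools import partial
from DHParser.preprocess import make_token, tokenized_to_original_mapping, source_map

original = "x = 1"
tokenized = make_token('START') + original  # pure insertion before the text
srcmap = tokenized_to_original_mapping(tokenized, original, 'main.txt')
mapping = partial(source_map, srcmap=srcmap)
name, text, pos = mapping(tokenized.find('x'))
assert (name, text[pos]) == ('main.txt', 'x')
```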
def with_source_mapping(result: PreprocessorResult) -> Preprocessed:
"""
Normalizes preprocessors results, by adding a mapping if a preprocessor
only returns the transformed source code and no mapping by itself. It is
assumed that in this case the preprocessor has just enriched the source
code with tokens, so that a source mapping can be derived automatically
with :func:`tokenized_to_original_mapping` (see above).
:param result: Either a preprocessed text as atring containing
preprocessor tokens, or a tuple of a preprocessed text AND a source
mapping function. In the former case the source mapping will be
generated, in the latter it will simply be passed through.
:returns: A tuple of the preprocessed text and the source-mapping function
that returns the original text position when called with a position
in the preprocessed text.
def make_preprocessor(tokenizer: Tokenizer) -> PreprocessorFunc:
"""Generates a preprocessor function from a "naive" tokenizer, i.e.
a function that merely adds preprocessor tokens to a source text and
returns the modified source.
"""
if isinstance(result, str):
srcmap = tokenized_to_original_mapping(result)
token_mapping = functools.partial(source_map, srcmap=srcmap)
return Preprocessed(result, token_mapping)
# else: # DOES NOT WORK, because there is no way to reliably find out whether
# # token back-mapping has already been done by the provided mapping
# text, mapping = cast(Preprocessed, result)
# if not (hasattr(mapping, 'func') and mapping.func == source_map):
# srcmap = tokenized_to_original_mapping(text)
# token_mapping = functools.partial(source_map, srcmap=srcmap)
# return Preprocessed(
# text, functools.partial(_apply_mappings, mappings=[token_mapping, mapping]))
return result
def preprocessor(original_text: str, original_name: str, *args) -> PreprocessorResult:
tokenized_text = tokenizer(original_text)
srcmap = tokenized_to_original_mapping(tokenized_text, original_text, original_name)
mapping = functools.partial(source_map, srcmap=srcmap)
return PreprocessorResult(original_text, tokenized_text, mapping)
return preprocessor
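`make_preprocessor` is the replacement for the removed `with_source_mapping`: rather than normalizing a preprocessor's return value after the fact, a plain tokenizer is wrapped up front. A usage sketch with an invented tokenizer that prefixes every line with a `MARK` token:

```python
from DHParser.preprocess import make_preprocessor, make_token

def tokenize_marks(text: str) -> str:
    # invented tokenizer: insert a MARK token before each line
    return '\n'.join(make_token('MARK') + line for line in text.split('\n'))

preprocess_marks = make_preprocessor(tokenize_marks)
_, tokenized, mapping = preprocess_marks("x = 1\ny = 2", "example.py")
name, text, pos = mapping(tokenized.find('y'))
assert text[pos] == 'y'
```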
#######################################################################
@@ -366,14 +354,14 @@ def gen_find_include_func(rx: Union[str, Any],
return find_include if comment_rx is None else meta_find_include
def generate_include_map(source_name: str,
source_text: str,
def generate_include_map(original_name: str,
original_text: str,
find_next_include: FindIncludeFunc) -> Tuple[SourceMap, str]:
file_names: set = set()
def generate_map(source_name, source_text, find_next) -> Tuple[SourceMap, str]:
nonlocal file_names
map = SourceMap(source_name, [0], [0], [source_name], {source_name: linebreaks(source_text)})
map = SourceMap(source_name, [0], [0], [source_name], {source_name: source_text})
result = []
if source_name in file_names:
@@ -381,16 +369,16 @@ def generate_include_map(source_name: str,
file_names.add(source_name)
dirname = os.path.dirname(source_name)
source_pointer = 0
source_offset = 0
original_pointer = 0
original_offset = 0
result_pointer = 0
last_begin = -1
begin, length, include_name = find_next(source_text, 0)
include_name = os.path.join(dirname, include_name)
while begin >= 0:
assert begin > last_begin
source_delta = begin - source_pointer
source_pointer += source_delta
source_delta = begin - original_pointer
original_pointer += source_delta
result_pointer += source_delta
with open(include_name, 'r', encoding='utf-8') as f:
included_text = f.read()
@@ -404,32 +392,32 @@ def generate_include_map(source_name: str,
map.positions.pop()
map.offsets.pop()
else:
result.append(source_text[source_pointer - source_delta: source_pointer])
result.append(source_text[original_pointer - source_delta: original_pointer])
map.file_names.extend(inner_map.file_names[:-1])
map.positions.extend(inner_map.positions[:-1])
map.offsets.extend(inner_map.offsets[:-1])
map.lbreaks_dict.update(inner_map.lbreaks_dict)
map.originals_dict.update(inner_map.originals_dict)
result.append(inner_text)
inner_length = len(inner_text)
result_pointer += inner_length
map.file_names.append(source_name)
map.positions.append(result_pointer)
source_pointer += length
source_offset += length - inner_length
map.offsets.append(source_offset)
begin, length, include_name = find_next(source_text, source_pointer)
original_pointer += length
original_offset += length - inner_length
map.offsets.append(original_offset)
begin, length, include_name = find_next(source_text, original_pointer)
include_name = os.path.join(dirname, include_name)
rest = source_text[source_pointer:]
rest = source_text[original_pointer:]
if rest:
result.append(rest)
map.positions.append(map.positions[-1] + len(rest))
map.offsets.append(source_offset)
map.offsets.append(original_offset)
map.file_names.append(source_name)
file_names.remove(source_name)
# map.file_offsets = [-offset for offset in map.offsets] # only for debugging!
return map, ''.join(result)
return generate_map(source_name, source_text, find_next_include)
return generate_map(original_name, original_text, find_next_include)
def srcmap_includes(position: int, inclmap: SourceMap) -> SourceLocation:
@@ -438,19 +426,19 @@ def srcmap_includes(position: int, inclmap: SourceMap) -> SourceLocation:
source_name = inclmap.file_names[i - 1]
return SourceLocation(
source_name,
inclmap.lbreaks_dict[source_name],
inclmap.originals_dict[source_name],
position + inclmap.offsets[i - 1])
raise ValueError
def preprocess_includes(source_text: Optional[str],
source_name: str,
find_next_include: FindIncludeFunc) -> Preprocessed:
if not source_text:
with open(source_name, 'r', encoding='utf-8') as f:
source_text = f.read()
include_map, result = generate_include_map(source_name, source_text, find_next_include)
def preprocess_includes(original_text: Optional[str],
original_name: str,
find_next_include: FindIncludeFunc) -> PreprocessorResult:
if not original_text:
with open(original_name, 'r', encoding='utf-8') as f:
original_text = f.read()
include_map, result = generate_include_map(original_name, original_text, find_next_include)
mapping_func = functools.partial(srcmap_includes, inclmap=include_map)
return Preprocessed(result, mapping_func)
return PreprocessorResult(original_text, result, mapping_func)
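`preprocess_includes` likewise now returns the top-level original text alongside the flattened result. A self-contained usage sketch mirroring `TestIncludes` further below (file names and contents invented; writes two small files to the working directory):

```python
from DHParser.preprocess import gen_find_include_func, preprocess_includes

with open('sub.txt', 'w', encoding='utf-8') as f:
    f.write('abc')
with open('main.txt', 'w', encoding='utf-8') as f:
    f.write('x include(sub.txt) y')

find_func = gen_find_include_func(r'include\((?P<name>[^)\n]*)\)')
_, text, mapping = preprocess_includes(None, 'main.txt', find_func)
assert text == 'x abc y'
for i in range(len(text)):
    # every position maps back to the file (and text) it came from
    name, original, pos = mapping(i)
    assert original[pos] == text[i]
```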
@@ -49,7 +49,7 @@ from DHParser import start_logging, suspend_logging, resume_logging, is_filename
positions_of, replace_tag_names, add_attributes, delimit_children, merge_connected, \
has_attr, has_parent, ThreadLocalSingletonFactory, Error, canonical_error_strings, \
has_errors, apply_unless, WARNING, ERROR, FATAL, EMPTY_NODE, TreeReduction, CombinedParser, \
Preprocessed, preprocess_includes, gen_find_include_func, flatten_sxpr, \
PreprocessorResult, preprocess_includes, gen_find_include_func, flatten_sxpr, \
replace_content_with
@@ -63,7 +63,7 @@ from DHParser import start_logging, suspend_logging, resume_logging, is_filename
RX_TEX_INPUT = r'\\input{(?P<name>.*)}'
def LaTeXPreprocessor(text: str, file_name: str) -> Preprocessed:
def LaTeXPreprocessor(text: str, file_name: str) -> PreprocessorResult:
find_includes = gen_find_include_func(RX_TEX_INPUT, LaTeXGrammar.comment_rx__)
return preprocess_includes(text, file_name, find_includes)
......
@@ -35,8 +35,8 @@ from DHParser.configuration import set_config_value
from DHParser.dsl import grammar_provider
from DHParser import compile_source
from DHParser.preprocess import make_token, tokenized_to_original_mapping, source_map, \
BEGIN_TOKEN, END_TOKEN, TOKEN_DELIMITER, SourceMapFunc, SourceMap, chain_preprocessors, \
strip_tokens, gen_find_include_func, preprocess_includes, IncludeInfo
BEGIN_TOKEN, END_TOKEN, TOKEN_DELIMITER, PreprocessorResult, SourceMap, chain_preprocessors, \
strip_tokens, gen_find_include_func, preprocess_includes, IncludeInfo, make_preprocessor
from DHParser.toolkit import lstrip_docstring, typing, re
from DHParser.testing import unique_name
from typing import Tuple, Dict
@@ -69,7 +69,7 @@ class TestSourceMapping:
def test_tokenized_to_original_mapping(self):
srcmap = tokenized_to_original_mapping(self.tokenized)
srcmap = tokenized_to_original_mapping(self.tokenized, self.code)
positions, offsets = srcmap.positions, srcmap.offsets
assert len(positions) == len(offsets)
assert positions[0] == 0
@@ -81,15 +81,15 @@ class TestSourceMapping:
def test_bondary_cases(self):
# position at the end of the file
source = " "
srcmap = tokenized_to_original_mapping(source)
srcmap = tokenized_to_original_mapping(source, source)
pos = source_map(1, srcmap)
# empty file
source =""
srcmap = tokenized_to_original_mapping(source)
srcmap = tokenized_to_original_mapping(source, source)
pos = source_map(0, srcmap)
def preprocess_indentation(src: str, src_name: str) -> str:
def tokenize_indentation(src: str) -> str:
transformed = []
indent_level = 0
for line in src.split('\n'):
@@ -115,7 +115,10 @@ def preprocess_indentation(src: str, src_name: str) -> str:
return tokenized
def preprocess_comments(src: str, src_name: str) -> Tuple[str, SourceMapFunc]:
preprocess_indentation = make_preprocessor(tokenize_indentation)
def preprocess_comments(src: str, src_name: str) -> PreprocessorResult:
lines = src.split('\n')
positions, offsets = [0], [0]
pos = 0
@@ -129,9 +132,12 @@ def preprocess_comments(src: str, src_name: str) -> Tuple[str, SourceMapFunc]:
pos += len(lines[i])
positions.append(pos)
offsets.append(offsets[-1])
return '\n'.join(lines), \
partial(source_map, srcmap=SourceMap(src_name, positions, offsets, [src_name]*len(positions),
{src_name: [-1, len(src)]}))
return PreprocessorResult(src, '\n'.join(lines),
partial(source_map, srcmap=SourceMap(src_name,
positions,
offsets,
[src_name] * len(positions),
{src_name: src})))
class TestTokenParsing:
@@ -151,13 +157,13 @@ class TestTokenParsing:
print(x) # another comment
print(y)
""")
tokenized = preprocess_indentation(code, 'no_uri')
srcmap = tokenized_to_original_mapping(tokenized)
tokenized = tokenize_indentation(code)
srcmap = tokenized_to_original_mapping(tokenized, code)
def verify_mapping(self, teststr, orig_text, preprocessed_text, mapping):
mapped_pos = preprocessed_text.find(teststr)
assert mapped_pos >= 0
file_name, file_offset, original_pos = mapping(mapped_pos)
file_name, file_content, original_pos = mapping(mapped_pos)
# original_pos = source_map(mapped_pos, self.srcmap)
assert orig_text[original_pos:original_pos + len(teststr)] == teststr, \
'"%s" (%i) wrongly mapped onto "%s" (%i)' % \
@@ -188,7 +194,7 @@ class TestTokenParsing:
previous_index = index
def test_non_token_preprocessor(self):
tokenized, mapping = preprocess_comments(self.code, 'no_uri')
_, tokenized, mapping = preprocess_comments(self.code, 'no_uri')
self.verify_mapping("def func", self.code, tokenized, mapping)
self.verify_mapping("x > 0:", self.code, tokenized, mapping)
self.verify_mapping("if y > 0:", self.code, tokenized, mapping)
@@ -197,7 +203,7 @@ class TestTokenParsing:
def test_chained_preprocessors(self):
pchain = chain_preprocessors(preprocess_comments, preprocess_indentation)
tokenized, mapping = pchain(self.code, 'no_uri')
_, tokenized, mapping = pchain(self.code, 'no_uri')
self.verify_mapping("def func", self.code, tokenized, mapping)
self.verify_mapping("x > 0:", self.code, tokenized, mapping)
self.verify_mapping("if y > 0:", self.code, tokenized, mapping)
@@ -277,7 +283,7 @@ class TestIncludes:
def perform(main, sub):
self.create_files({'main.txt': main, 'sub.txt': sub})
find_func = gen_find_include_func(r'include\((?P<name>[^)\n]*)\)')
text, mapping = preprocess_includes(None, 'main.txt', find_func)
_, text, mapping = preprocess_includes(None, 'main.txt', find_func)
# print(mapping)
assert text == main.replace('include(sub.txt)', 'abc'), text
for i in range(len(text)):
@@ -300,7 +306,7 @@ class TestIncludes:
def perform(**ensemble):
self.create_files(ensemble)
find_func = gen_find_include_func(r'#include\((?P<name>[^)\n]*)\)')
text, mapping = preprocess_includes(None, 'main', find_func)
_, text, mapping = preprocess_includes(None, 'main', find_func)
substrings = {}
for k, v in reversed(list(ensemble.items())):
for name, content in substrings.items():
......