toolkit.py 13.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# toolkit.py - utility functions for DHParser
#
# Copyright 2016  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences and Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.
17 18


19 20 21 22 23 24
"""
Module ``toolkit`` contains utility functions that are needed across
several of the other DHParser-Modules or that are just very generic
so that they are best defined in a toolkit-module.
"""

di68kap's avatar
di68kap committed
25
import codecs
26
import hashlib
di68kap's avatar
di68kap committed
27
import io
28
import multiprocessing
29
import parser
30
import threading
31

di68kap's avatar
di68kap committed
32 33 34 35
try:
    import regex as re
except ImportError:
    import re
36
import sys
37

38
try:
39
    import typing
40
except ImportError:
41
    import DHParser.foreign_typing as typing
di68kap's avatar
di68kap committed
42
    sys.modules['typing'] = typing  # make it possible to import from typing
43

di68kap's avatar
di68kap committed
44
from typing import Any, Iterable, Sequence, Set, Union, Dict, Hashable, cast
45

eckhart's avatar
eckhart committed
46 47 48
try:
    import cython
    cython_optimized = cython.compiled  # type: bool
eckhart's avatar
eckhart committed
49 50
    if cython_optimized:
        import DHParser.shadow_cython as cython
eckhart's avatar
eckhart committed
51 52 53 54
except ImportError:
    cython_optimized = False            # type: bool
    import DHParser.shadow_cython as cython

55

eckhart's avatar
eckhart committed
56 57 58 59
# Names exported by a star-import of this module. Every public helper
# defined in this module is listed, including `has_fenced_code`.
__all__ = ('typing',
           'cython',
           'cython_optimized',
           'escape_re',
           'escape_control_characters',
           'is_filename',
           'concurrent_ident',
           'unrepr',
           'lstrip_docstring',
           'issubtype',
           'isgenerictype',
           'load_if_file',
           'is_python_code',
           'has_fenced_code',
           'md5',
           'expand_table',
           'compile_python_object',
           'smart_list',
           'sane_parser_name',
           'GLOBALS',
           'CONFIG_PRESET',
           'get_config_value',
           'set_config_value')
78 79 80 81


#######################################################################
#
82
# Thread local globals and configuration
83 84
#
#######################################################################
85

86
# Container for per-thread state: attributes set on it are only visible
# to the thread that set them.
GLOBALS = threading.local()
# Default configuration values; consulted by `get_config_value` when a
# thread has no value of its own for a key.
CONFIG_PRESET = {}  # type: Dict[Hashable, Any]
88 89


eckhart's avatar
eckhart committed
90
def get_config_value(key: Hashable) -> Any:
    """
    Looks up a configuration value in the configuration dictionary of
    the current thread. If the thread has no value for ``key`` yet, the
    value is taken from ``CONFIG_PRESET`` and stored in the thread's
    configuration before it is returned.
    :param key:  the key (an immutable, usually a string)
    :return:     the value
    :raises KeyError:  if the key is neither set for the current thread
        nor contained in ``CONFIG_PRESET``
    """
    # create the configuration dictionary of this thread on first access
    if not hasattr(GLOBALS, 'config'):
        GLOBALS.config = dict()
    thread_config = GLOBALS.config
    try:
        return thread_config[key]
    except KeyError:
        preset = CONFIG_PRESET[key]
        thread_config[key] = preset
        return preset


eckhart's avatar
eckhart committed
109
def set_config_value(key: Hashable, value: Any):
    """
    Stores a configuration value in the configuration dictionary of the
    current thread. Other threads are not affected. In order to set
    configuration values for any new thread, add the key and value
    to CONFIG_PRESET, before the thread is started.
    :param key:    the key (an immutable, usually a string)
    :param value:  the value
    """
    # create the configuration dictionary of this thread on first access
    if not hasattr(GLOBALS, 'config'):
        GLOBALS.config = dict()
    GLOBALS.config[key] = value


#######################################################################
#
# miscellaneous (generic)
#
#######################################################################
130 131


132
def escape_re(strg: str) -> str:
    """
    Returns a copy of ``strg`` in which every character that has a
    special meaning in regular expressions is preceded by a backslash.
    """
    special_chars = r"\.^$*+?{}[]()#<>=|!"
    # map each special character to its backslash-escaped form and
    # substitute all of them in one pass over the string
    escape_table = {ord(ch): '\\' + ch for ch in special_chars}
    return strg.translate(escape_table)
142 143


144
def escape_control_characters(strg: str) -> str:
    r"""
    Replace all control characters (e.g. \n \t) in a string by their
    backslashed representation.

    Quotation marks and backslashes are copied unchanged. All other
    characters are rendered the way ``repr()`` renders them inside a
    string literal, i.e. non-printable characters become escape
    sequences like ``\n``, ``\t`` or ``\x00``.

    :param strg:  the string to be escaped
    :return:      the string with control characters made visible
    """
    # Escaping character by character avoids a flaw of applying repr() to
    # the whole string: repr() puts a backslash in front of single quotes
    # if the string contains both kinds of quotation marks.
    verbatim = '\'"\\'
    return ''.join(ch if ch in verbatim else repr(ch)[1:-1] for ch in strg)


def lstrip_docstring(docstring: str) -> str:
    """
    Strips leading whitespace from a docstring.

    Tabs are expanded to four spaces. The smallest indentation found
    among the non-empty lines after the first line is then removed from
    every line after the first. The first line is left as it is.

    :param docstring:  the text of the docstring
    :return:           the docstring without its common indentation
    """
    lines = docstring.replace('\t', '    ').split('\n')
    # indentation widths of all non-blank lines except the first one
    indents = [len(line) - len(line.lstrip()) for line in lines[1:] if line.strip()]
    # no arbitrary upper bound: any common indentation is removed
    indent = min(indents, default=0)
    return '\n'.join(lines[:1] + [line[indent:] for line in lines[1:]])


168
def is_filename(strg: str) -> bool:
    """
    Guesses whether ``strg`` is a file name: It must consist of a single
    line, must neither begin nor end with a blank and must not contain
    any of the characters ``*?"<>|``.
    """
    if '\n' in strg:
        return False
    if strg[:1] == " " or strg[-1:] == " ":
        return False
    return not any(ch in strg for ch in '*?"<>|')
Eckhart Arnold's avatar
Eckhart Arnold committed
176 177


178 179 180 181 182 183 184
def concurrent_ident() -> str:
    """
    Returns an identifier for the current process and thread, made up of
    the name of the current process and the ident-number of the current
    thread, joined by an underscore.
    """
    process_name = multiprocessing.current_process().name
    thread_id = str(threading.get_ident())
    return '_'.join((process_name, thread_id))


185 186 187 188 189 190 191 192 193 194 195 196 197
class unrepr:
    """
    unrepr encapsulates a string representing a python function in such
    a way that the representation of the string yields the function call
    itself rather than a string representing the function call and delimited
    by quotation marks.

    unrepr objects compare equal to other unrepr objects and to plain
    strings with the same content and hash like the wrapped string.

    Example:
        >>> "re.compile(r'abc+')"
        "re.compile(r'abc+')"
        >>> unrepr("re.compile(r'abc+')")
        re.compile(r'abc+')
    """
    def __init__(self, s: str):
        self.s = s  # type: str

    def __eq__(self, other: Union['unrepr', str]):
        """
        Compares the wrapped string with another unrepr object or string.
        :raises TypeError:  if `other` is neither an unrepr object nor a string
        """
        if isinstance(other, unrepr):
            return self.s == other.s
        elif isinstance(other, str):
            return self.s == other
        else:
            raise TypeError('unrepr objects can only be compared with '
                            'other unrepr objects or strings!')

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable. Hashing the
        # wrapped string keeps the hash consistent with equality to strings.
        # The hash changes if `self.s` is reassigned.
        return hash(self.s)

    def __str__(self):
        return self.s

    def __repr__(self):
        return self.s


217 218 219 220 221 222 223 224
#######################################################################
#
# type system support
#
#######################################################################


def issubtype(sub_type, base_type):
    """
    Returns True, if ``sub_type`` is a subclass of ``base_type``. For
    either argument that has an ``__origin__`` attribute which is not
    None, the value of that attribute is used for the comparison instead
    of the argument itself.
    """
    def unwrap(tp):
        # fall back to the type itself if it has no (non-None) origin
        underlying = getattr(tp, '__origin__', None)
        return tp if underlying is None else underlying
    return issubclass(unwrap(sub_type), unwrap(base_type))
232 233 234 235 236 237


def isgenerictype(t):
    """
    Returns True, if the string representation of ``t`` ends with a
    closing square bracket, as is the case for parametrized types
    like ``List[int]``.
    """
    return str(t)[-1:] == ']'


238 239 240 241 242 243 244
#######################################################################
#
# loading and compiling
#
#######################################################################


245
def load_if_file(text_or_file) -> str:
    """
    Reads and returns content of a text-file if parameter
    `text_or_file` is a file name (i.e. a single line string),
    otherwise (i.e. if `text_or_file` is a multi-line string)
    `text_or_file` is returned.

    :param text_or_file:  a file name or the text itself
    :return:  the content of the file or `text_or_file` itself
    :raises FileNotFoundError:  if `text_or_file` looks like a file path
        (only word characters, blanks and ``/ : . \\``), but no such
        file exists
    """
    if not is_filename(text_or_file):
        return text_or_file

    import errno  # only needed here, for recognizing over-long "file names"

    try:
        with open(text_or_file, encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        if re.fullmatch(r'[\w/:. \\]+', text_or_file):
            raise FileNotFoundError('Not a valid file: ' + text_or_file + '!\n(Add "\\n" '
                                    'to distinguish source data from a file name.)')
        return text_or_file
    except OSError as error:
        # A long single-line text cannot be a file name: the operating
        # system rejects it with ENAMETOOLONG rather than "file not found".
        # Such a string is source data. All other I/O errors are passed on.
        if error.errno == errno.ENAMETOOLONG:
            return text_or_file
        raise


268
def is_python_code(text_or_file: str) -> bool:
    """
    Checks whether 'text_or_file' is python code or the name of a file that
    contains python code.

    A file name is judged by its extension (".py", case-insensitive) only;
    the file is not opened. Any other string counts as python code if it
    can be parsed as a python module.

    :param text_or_file:  a file name or source text
    :return:  True, if the argument is (the name of a file with) python code
    """
    if is_filename(text_or_file):
        return text_or_file[-3:].lower() == '.py'

    # `ast.parse` is used instead of `parser.suite`, because the `parser`
    # module is deprecated since Python 3.9 and was removed in Python 3.10.
    # Like `parser.suite`, it only parses the text and does not execute it.
    import ast

    try:
        ast.parse(text_or_file)
    except (SyntaxError, ValueError, OverflowError):
        return False
    return True


285
def has_fenced_code(text_or_file: str, info_strings=('ebnf', 'test')) -> bool:
    """
    Checks whether `text_or_file` contains fenced code blocks, which are
    marked by one of the given info strings.
    See http://spec.commonmark.org/0.28/#fenced-code-blocks for more
    information on fenced code blocks in common mark documents.

    :param text_or_file:  markdown text or the name of a markdown file
    :param info_strings:  a single info string or a sequence of info
        strings. They are inserted into a regular expression without
        being escaped and are matched case-insensitively.
    :return:  True, if the first opening fence that carries one of the
        info strings is followed by a matching closing fence
    """
    if is_filename(text_or_file):
        with open(text_or_file, 'r', encoding='utf-8') as f:
            markdown = f.read()
    else:
        markdown = text_or_file

    # cheap pre-check: no fence at the beginning of any line
    if markdown.find('\n~~~') < 0 and markdown.find('\n```') < 0:
        return False

    if isinstance(info_strings, str):
        info_strings = (info_strings,)
    # Both alternatives accept further text after the info string up to the
    # end of the line. (The tilde alternative used to accept nothing but
    # newlines there, so that e.g. "~~~ ebnf numbered" was never found.)
    fence_tmpl = (r'\n(?:(?:``[`]*[ ]*(?:%s)(?=[ .\-:\n])[^`\n]*\n)'
                  r'|(?:~~[~]*[ ]*(?:%s)(?=[ .\-:\n])[^\n]*\n))')
    label_re = '|'.join('(?:%s)' % info for info in info_strings)
    rx_fence = re.compile(fence_tmpl % (label_re, label_re), flags=re.IGNORECASE)

    # only the first opening fence is examined
    opening = rx_fence.search(markdown)
    if opening is None:
        return False
    fence = re.match(r'(?:\n`+)|(?:\n~+)', opening.group(0)).group(0)
    # The match includes the newline that ends the opening line. The search
    # for the closing fence starts at that newline, so that a closing
    # fence on the very next line (empty code block) is found as well.
    return markdown.find(fence, opening.end() - 1) >= 0
316 317


318
def md5(*txt):
319 320
    """
    Returns the md5-checksum for `txt`. This can be used to test if
321 322
    some piece of text, for example a grammar source file, has changed.
    """
323

324 325 326 327 328 329
    md5_hash = hashlib.md5()
    for t in txt:
        md5_hash.update(t.encode('utf8'))
    return md5_hash.hexdigest()


330
def compile_python_object(python_src, catch_obj_regex=""):
    """
    Compiles the python source code and returns the (first) object
    the name of which is matched by ``catch_obj_regex``. If catch_obj
    is the empty string, the namespace dictionary will be returned.

    :param python_src:  the python source code to be compiled and executed
    :param catch_obj_regex:  a regular expression (string or compiled
        pattern) that is matched against the beginning of the names
        defined by the source code
    :return:  the matching object or, if no regular expression was given,
        the namespace dictionary that results from executing the code
    :raises ValueError:  if no name or more than one name matches
    """
    if isinstance(catch_obj_regex, str):
        catch_obj_regex = re.compile(catch_obj_regex)
    code = compile(python_src, '<string>', 'exec')
    namespace = {}
    # NOTE: executes arbitrary code - never pass untrusted source code!
    exec(code, namespace)
    # A compiled pattern is always truthy. Therefore, the pattern string
    # must be examined in order to recognize "no regular expression given".
    if catch_obj_regex is None or not catch_obj_regex.pattern:
        return namespace
    # '__builtins__' is added by exec(), not defined by the source code
    matches = [key for key in namespace
               if key != '__builtins__' and catch_obj_regex.match(key)]
    if len(matches) < 1:
        raise ValueError("No object matching /%s/ defined in source code." %
                         catch_obj_regex.pattern)
    elif len(matches) > 1:
        raise ValueError("Ambiguous matches for %s : %s" %
                         (str(catch_obj_regex), str(matches)))
    return namespace[matches[0]]


#######################################################################
#
# smart lists and multi-keyword tables
#
#######################################################################


Eckhart Arnold's avatar
Eckhart Arnold committed
362
# def smart_list(arg: Union[str, Iterable[T]]) -> Union[Sequence[str], Sequence[T]]:
363
def smart_list(arg: Union[str, Iterable, Any]) -> Union[Sequence, Set]:
    """
    Turns the argument into a list-like object, depending on its type
    and content.

    A string is split into a list of stripped items. The delimiters
    ';', ',' and ' ' are tried in this order and the first one that
    occurs in the string is used, e.g.
    >>> smart_list('1; 2, 3; 4')
    ['1', '2, 3', '4']
    >>> smart_list('2, 3')
    ['2', '3']
    >>> smart_list('a b cd')
    ['a', 'b', 'cd']

    A sequence or set other than a string is returned unchanged, e.g.
    >>> smart_list((1, 2, 3))
    (1, 2, 3)
    >>> smart_list({1, 2, 3})
    {1, 2, 3}

    Any other iterable is converted into a list, e.g.
    >>> smart_list(i for i in {1,2,3})
    [1, 2, 3]

    Anything else is wrapped in a list with a single item, e.g.
    >>> smart_list(125)
    [125]
    """
    if isinstance(arg, str):
        for delimiter in ';,':
            if delimiter in arg:
                return [item.strip() for item in arg.split(delimiter)]
        # neither ';' nor ',' present: split at single blanks
        return [item.strip() for item in arg.strip().split(' ')]
    if isinstance(arg, (Sequence, Set)):
        return arg
    if isinstance(arg, Iterable):
        return list(arg)
    return [arg]


409
def expand_table(compact_table: Dict) -> Dict:
    """
    Expands a table by separating keywords that are tuples or strings
    containing comma separated words into single keyword entries with
    the same values. Returns the expanded table.
    Example:
    >>> expand_table({"a, b": 1, ('d','e','f'):5, "c":3})
    {'a': 1, 'b': 1, 'd': 5, 'e': 5, 'f': 5, 'c': 3}

    :param compact_table:  a dictionary, the keys of which may stand for
        several keywords at once
    :return:  a new dictionary with one entry for every single keyword
    :raises KeyError:  if the same keyword occurs more than once
    """
    expanded_table = {}  # type: Dict
    for key, value in compact_table.items():
        for k in smart_list(key):
            if k in expanded_table:
                # `key` is wrapped in a one-element tuple, because a key that
                # is itself a tuple would otherwise be unpacked by the
                # %-operator, which raises a TypeError
                raise KeyError('Key "%s" used more than once in compact table!' % (key,))
            expanded_table[k] = value
    return expanded_table
428 429


430 431
#######################################################################
#
432
# miscellaneous (DHParser-specific)
433 434 435 436
#
#######################################################################


437
def sane_parser_name(name) -> bool:
    """
    Checks whether given name is an acceptable parser name. Parser names
    must not be preceded or succeeded by a double underscore '__'!

    :param name:  the name to be checked
    :return:  True, if the name is not empty and neither starts nor ends
        with a double underscore, False otherwise
    """
    # bool() makes sure that an empty name yields False and not the
    # (falsy) name itself
    return bool(name) and name[:2] != '__' and name[-2:] != '__'


446 447 448 449 450 451 452
#######################################################################
#
# initialization
#
#######################################################################


di68kap's avatar
di68kap committed
453 454 455 456 457 458 459 460 461 462
# Module-level side effect, executed on import: if the encoding of the
# standard output is not UTF-8, `sys.stdout` is replaced by a new text
# wrapper around the same underlying binary stream. The new wrapper keeps
# the encoding, but uses the error handler 'replace', so that characters
# which cannot be encoded are substituted instead of raising an error.
try:
    if sys.stdout.encoding.upper() != "UTF-8":  # and  platform.system() == "Windows":
        # make sure that `print()` does not raise an error on
        # non-ASCII characters:
        # (earlier variant that switched the output to utf-8, kept for reference)
        # sys.stdout = cast(io.TextIOWrapper, codecs.getwriter("utf-8")(cast(
        #     io.BytesIO, cast(io.TextIOWrapper, sys.stdout).detach())))
        # NOTE: `sys.stdout.encoding` is read after `detach()` has been called,
        # because arguments are evaluated from left to right
        sys.stdout = io.TextIOWrapper(sys.stdout.detach(), sys.stdout.encoding, 'replace')
except AttributeError:
    # somebody has already taken care of this !?
    # (presumably `sys.stdout` has been replaced by an object without
    # `encoding` or `detach`, or its encoding is None - TODO confirm)
    pass