Commit fd8055c8 authored by eckhart's avatar eckhart
Browse files

stringview.py: class TextBuffer added: supports the tracking of incremental changes to a text

parent 52a16461
......@@ -25,3 +25,6 @@ cdef class StringView:
cdef readonly int _len
cdef str _fullstring
# class TextBuffer
......@@ -32,7 +32,7 @@ speedup. The modules comes with a ``stringview.pxd`` that contains some type
declarations to more fully exploit the benefits of the Cython-compiler.
"""
from typing import Optional, Union, Iterable, Tuple, List, Sequence
from typing import Optional, Union, Iterable, Tuple, List, Sequence, cast
try:
import cython
......@@ -319,7 +319,7 @@ class StringView: # collections.abc.Sized
return regex.finditer(self._text, pos=self._begin, endpos=self._end)
@cython.locals(begin=cython.int, end=cython.int)
def strip(self, chars: str = ' \n\t') -> 'StringView':
def strip(self, chars: str = ' \n\r\t') -> 'StringView':
"""Returns a copy of the StringView `self` with leading and trailing
whitespace removed.
"""
......@@ -353,11 +353,16 @@ class StringView: # collections.abc.Sized
length = len(sep)
k = 0
i = self.find(sep, k)
# while i >= 0:
# pieces.append(self._text[self._begin + k: self._begin + i])
# k = i + length
# i = self.find(sep, k)
# pieces.append(self._text[self._begin + k: self._end])
while i >= 0:
pieces.append(self._text[self._begin + k: self._begin + i])
pieces.append(self[k:i])
k = i + length
i = self.find(sep, k)
pieces.append(self._text[self._begin + k: self._end])
pieces.append(self[k:])
return pieces
def replace(self, old, new) -> str:
......@@ -366,3 +371,82 @@ class StringView: # collections.abc.Sized
EMPTY_STRING_VIEW = StringView('')
class TextBuffer:
"""TextBuffer class manages a copy of an edited text for a language
server. The text can be changed via incremental edits. TextBuffer
keeps track of the state of the complete text at any point in time.
It works line oriented and lines of text can be retrieved via
indexing or slicing.
"""
def __init__(self, text: Union[str, StringView], version: int = 0):
self._text = text # type: Union[str, StringView]
self._buffer = [] # type: List[Union[str, StringView]]
self.version = version # type: int
def _lazy_init(self):
self._buffer = [line.strip('\r') for line in self._text.split('\n')]
def __getitem__(self, index: Union[slice, int]) -> List[Union[str, StringView]]:
if not self._buffer:
self._lazy_init()
return self._buffer.__getitem__(index)
def __str__(self) -> str:
if self._text:
return self._text
return self.snapshot('\n')
def update(self, l1: int, c1: int, l2: int, c2: int, replacement: Union[str, StringView]):
"""Replaces the text-range from line and column (l1, c1) to
line and columnt (l2, c2) with the replacement-string.
"""
if not self._buffer:
self._lazy_init()
lines = [line.strip('\r') for line in replacement.split('\n')]
head = self._buffer[l1][:c1]
tail = self._buffer[l2][c2:]
lines[0] = head + lines[0]
lines[-1] += tail
self._buffer[l1:l2 + 1] = lines
self._text = '' # invalidate sinlge-string copy
self.version += 1
def textEdits(self, edits: Union[list, dict], version: int = -1):
"""Incorporates the one or more text-edits or change-events into the text.
A Text-Edit is a dictionary of this form:
{"range": {"start": {"line": 0, "character": 0 },
"end": {"line": 0, "character": 0} },
"newText": "..."}
In case of a change-event, the key "newText" is replaced by "text".
"""
def edit(ed: dict):
"""Weaves a single edit into the text-buffer."""
range = ed["range"]
start = range["start"]
end = range["end"]
try:
replacement = ed['text']
except KeyError:
replacement = ed['newText']
self.update(start["line"], start["character"], end["line"], end["character"], replacement)
if isinstance(edits, list):
for ed in edits:
edit(ed)
else:
edit(cast(dict, edits))
if version >= 0:
self.version = version
def snapshot(self, eol: str = '\n') -> str:
"""Returns the current state of the entire text, using the given
end of line marker ('\n' or '\r\n')"""
if self._text:
return self._text
self._text = eol.join(str(line) for line in self._buffer)
return self._text
......@@ -21,7 +21,6 @@ limitations under the License.
"""
import asyncio
from itertools import chain
import os
import sys
......@@ -199,7 +198,7 @@ class EBNFLanguageServerProtocol:
self.lsp_fulltable.update(self.blocking.lsp_table)
self.pending_changes = dict() # uri -> text
self.current_text = dict() # uri -> text
self.current_text = dict() # uri -> TextBuffer
# self.completionItems = [{k: v for k, v in chain(zip(self.completion_fields, item),
# [['kind', 2]])}
......@@ -225,6 +224,10 @@ class EBNFLanguageServerProtocol:
return {}
def lsp_textDocument_didOpen(self, textDocument):
from DHParser.stringview import TextBuffer
uri = textDocument['uri']
text = textDocument['text']
self.current_text[uri] = TextBuffer(text, int(textDocument.get('version', 0)))
return None
def lsp_textDocument_didSave(self, **kwargs):
......@@ -235,17 +238,27 @@ class EBNFLanguageServerProtocol:
async def lsp_textDocument_didChange(self, textDocument: dict, contentChanges: list):
uri = textDocument['uri']
text = contentChanges[0]['text']
self.current_text[uri] = text
self.pending_changes[uri] = text
await asyncio.sleep(3)
text = self.pending_changes.get(uri, None)
if text:
exenv = self.connection.exec
del self.pending_changes[uri]
result, rpc_error = await exenv.execute(exenv.process_executor,
self.cpu_bound.compile_EBNF,
(text,))
version = int(textDocument['version'])
if uri not in self.current_text:
return {}, (-32602, "Invalid uri: " + uri)
if contentChanges:
from DHParser.stringview import TextBuffer
if 'range' in contentChanges[0]:
text = self.current_text[uri]
text.textEdits(contentChanges, version)
self.pending_changes[uri] = text
else:
text = TextBuffer(contentChanges[0]['text'], version)
self.current_text[uri] = text
await asyncio.sleep(3)
text = self.pending_changes.get(uri, None)
if text:
exenv = self.connection.exec
del self.pending_changes[uri]
result, rpc_error = await exenv.execute(exenv.process_executor,
self.cpu_bound.compile_EBNF,
(str(text),))
return None
def lsp_textDocument_completion(self, textDocument: dict, position: dict, context: dict):
......
......@@ -26,7 +26,7 @@ scriptpath = os.path.dirname(__file__) or '.'
sys.path.append(os.path.abspath(os.path.join(scriptpath, '..')))
from DHParser.toolkit import re
from DHParser.stringview import StringView, EMPTY_STRING_VIEW, real_indices
from DHParser.stringview import StringView, TextBuffer, EMPTY_STRING_VIEW, real_indices
class TestStringView:
......@@ -150,6 +150,26 @@ class TestStringView:
assert s.split(',') == ['1', '2', '3', '4', '5']
class TestTextBuffer:
def setup(self):
self.test_text = "\n".join([
"To be or not to be",
"that is the question.",
"Whether it is nobler in mind to suffer",
"the slings and arrows of misfortune,",
"or, by opposing them, end them."
])
def test_update(self):
t = TextBuffer(self.test_text)
# single line
t.update(1, 8, 1, 11, 'a')
assert str(t).startswith('To be or not to be\nthat is a question.\nWhether ')
# several lines
t.update(1, 10, 2, 7, 'question;\nwhether')
assert str(t).startswith("To be or not to be\nthat is a question;\nwhether ")
if __name__ == "__main__":
from DHParser.testing import runner
runner("", globals())
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment