Commit 58817ac8 authored by Eckhart Arnold's avatar Eckhart Arnold
Browse files

- sever.py: termination of servers tested (all three variants)

parent 48bfd942
......@@ -43,12 +43,12 @@ For JSON see:
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, CancelledError
import json
from multiprocessing import Process, Value, Queue
import sys
import time
from typing import Callable, Optional, Union, Dict, List, Tuple, Sequence, Set, cast
from typing import Callable, Coroutine, Optional, Union, Dict, List, Tuple, Sequence, Set, cast
from DHParser.toolkit import get_config_value, is_filename, load_if_file, re
......@@ -67,8 +67,8 @@ RPC_Table = Dict[str, Callable]
RPC_Type = Union[RPC_Table, List[Callable], Callable]
JSON_Type = Union[Dict, Sequence, str, int, None]
RX_IS_JSON = re.compile(b'\s*(?:{|\[|"|\d|true|false|null)')
RX_GREP_URL = re.compile(b'GET ([^ ]+) HTTP')
RE_IS_JSON = b'\s*(?:{|\[|"|\d|true|false|null)'
RE_GREP_URL = b'GET ([^ \n]+) HTTP'
SERVER_ERROR = "COMPILER-SERVER-ERROR"
......@@ -77,7 +77,7 @@ SERVER_STARTING = 1
SERVER_ONLINE = 2
SERVER_TERMINATE = 3
RESPONSE_HEADER = b'''HTTP/1.1 200 OK
RESPONSE_HEADER = '''HTTP/1.1 200 OK
Date: {date}
Server: DHParser
Accept-Ranges: none
......@@ -87,35 +87,57 @@ Content-Type: text/html; charset=utf-8
X-Pad: avoid browser bug
'''
UNKNOWN_FUNC_HTML = """<!DOCTYPE html>
ONELINER_HTML = """<!DOCTYPE html>
<html lang="en" xml:lang="en">
<head>
<meta charset="UTF-8" />
</head>
<body>
<h1>DHParser Error: Function {func} unknown or not registered!</h1>
<h1>{line}</h1>
</body>
</html>
"""
UNKNOWN_FUNC_HTML = ONELINER_HTML.format(
line="DHParser Error: Function {func} unknown or not registered!")
STOP_SERVER_REQUEST = b"__STOP_SERVER__"
__test_get = b'''GET /method/object HTTP/1.1
Host: 127.0.0.1:8888
User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: de,en-US;q=0.7,en;q=0.3
Accept-Encoding: gzip, deflate
DNT: 1
Connection: keep-alive
Upgrade-Insecure-Requests: 1
# __test_get = b'''GET /method/object HTTP/1.1
# Host: 127.0.0.1:8888
# User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0
# Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
# Accept-Language: de,en-US;q=0.7,en;q=0.3
# Accept-Encoding: gzip, deflate
# DNT: 1
# Connection: keep-alive
# Upgrade-Insecure-Requests: 1
#
#
# '''
'''
def json_rpc(func: Callable,
params: Union[List[JSON_Type], Dict[str, JSON_Type]],
ID: Optional[int]=None) -> str:
"""Generates a JSON-RPC-call for `func` with parameters `params`"""
return json.dumps({"jsonrpc": "2.0", "method": func.__name__, "params": params, "id": ID})
def asyncio_run(coroutine: Coroutine):
"""Backward compatible version of Pyhon 3.7's `asyncio.run()`"""
if sys.version_info >= (3, 7):
asyncio.run(coroutine)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(coroutine)
finally:
loop.close()
def gmtstamp() -> str:
def GMT_timestamp() -> str:
return time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
# CompilationItem = NamedTuple('CompilationItem',
......@@ -166,44 +188,48 @@ class Server:
async def handle_request(self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
rpc_error = None # type: Optional[Tuple[int, str]]
json_id = 'null' # type: Tuple[int, str]
obj = {} # type: Dict
result = None # type: JSON_Type
raw = None # type: JSON_Type
killswitch = False # type: bool
rpc_error = None # type: Optional[Tuple[int, str]]
json_id = 'null' # type: Tuple[int, str]
obj = {} # type: Dict
result = None # type: JSON_Type
raw = None # type: JSON_Type
kill_switch = False # type: bool
data = await reader.read(self.max_source_size + 1)
oversized = len(data) > self.max_source_size
def http_response(html: str) -> bytes:
gmt = GMT_timestamp()
encoded_html = html.encode()
response = RESPONSE_HEADER.format(date=gmt, length=len(encoded_html))
return response.encode() + encoded_html
if data.startswith(b'GET'):
# HTTP request
m = RX_GREP_URL(data)
m = re.match(RE_GREP_URL, data)
# m = RX_GREP_URL(data.decode())
if m:
func_name, argument = m.group(1).decode().strip('/').split('/', 1) + [None]
if func_name.encode() == STOP_SERVER_REQUEST:
writer.write(self.stop_response.encode())
killswitch = True
writer.write(http_response(ONELINER_HTML.format(line=self.stop_response)))
kill_switch = True
else:
func = self.rpc_table.get(func_name,
lambda _: UNKNOWN_FUNC_HTML.format(func=func_name))
result = func(argument) if argument is not None else func()
if isinstance(result, str):
gmt = gmtstamp.encode()
response = RESPONSE_HEADER.format(date=gmt, length=str(len(res)).encode) \
+ result.encode()
writer.write(response)
writer.write(http_response(result))
else:
writer.write(json.dumps(result).encode())
writer.write(http_response(json.dumps(result, indent=2)))
elif not RX_IS_JSON.match(data):
elif not re.match(RE_IS_JSON, data):
# plain data
if oversized:
writer.write("Source code too large! Only %i MB allowed" \
% (self.max_source_size // (1024 ** 2)))
elif data == STOP_SERVER_REQUEST:
writer.write(self.stop_response.encode())
killswitch = True
kill_switch = True
else:
if len(self.rpc_table) == 1:
func = self.rpc_table[tuple(self.rpc_table.keys())[0]]
......@@ -243,7 +269,7 @@ class Server:
rpc_error = -32600, 'Invalid Request: No method specified.'
elif obj['method'] == STOP_SERVER_REQUEST.decode():
result = self.stop_response
killswitch = True
kill_switch = True
elif obj['method'] not in self.rpc_table:
rpc_error = -32601, 'Method not found: ' + str(obj['method'])
else:
......@@ -276,17 +302,20 @@ class Server:
if rpc_error is None:
json_result = {"jsonrpc": "2.0", "result": result, "id": json_id}
json.dump(writer, json_result)
writer.write(json.dumps(json_result).encode())
else:
writer.write(('{"jsonrpc": "2.0", "error": {"code": %i, "message": "%s"}, "id": %s}'
% (rpc_error[0], rpc_error[1], json_id)).encode())
await writer.drain()
writer.close()
if killswitch:
self.server.cancel()
if kill_switch:
# TODO: terminate processes and threads!
self.stage.value = SERVER_TERMINATE
self.server.close()
async def serve(self, host: str = '127.0.0.1', port: int = 8888):
with ProcessPoolExecutor() as p, ThreadPoolExecutor() as t:
self.pp_executor = p
self.tp_executor = t
self.stop_response = "DHParser server at {}:{} stopped!".format(host, port)
self.server = cast(asyncio.base_events.Server,
await asyncio.start_server(self.handle_request, host, port))
......@@ -294,43 +323,71 @@ class Server:
self.stage.value = SERVER_ONLINE
self.server_messages.put(SERVER_ONLINE)
await self.server.serve_forever()
# self.server.wait_until_closed()
def run_server(self, address: str = '127.0.0.1', port: int = 8888):
def _empty_message_queue(self):
while not self.server_messages.empty():
self.server_messages.get()
def run_server(self, host: str = '127.0.0.1', port: int = 8888):
"""
Starts a DHParser-Server. This function will not return until the
DHParser-Server ist stopped by sending a STOP_SERVER_REQUEST.
"""
assert self.stage.value == SERVER_OFFLINE
self.stage.value = SERVER_STARTING
if sys.version_info >= (3, 7):
asyncio.run(self.serve(address, port))
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(self.serve(address, port))
finally:
# self.server.cancel()
loop.close()
self._empty_message_queue()
try:
asyncio_run(self.serve(host, port))
except CancelledError:
self.pp_executor = None
self.tt_exectuor = None
self.server_messages.put(SERVER_OFFLINE)
self.stage.value = SERVER_OFFLINE
def wait_until_server_online(self):
if self.stage.value != SERVER_ONLINE:
if self.server_messages.get() != SERVER_ONLINE:
message = self.server_messages.get()
if message != SERVER_ONLINE:
raise AssertionError('could not start server!?')
assert self.stage.value == SERVER_ONLINE
def run_as_process(self):
self.server_process = Process(target=self.run_server)
def spawn_server(self, host: str = '127.0.0.1', port: int = 8888):
"""
Start DHParser-Server in a separate process and return.
"""
if self.server_process:
assert not self.server_process.is_alive()
self.server_process.close()
self.server_process = None
self._empty_message_queue()
self.server_process = Process(
target=self.run_server, args=(host, port), name="DHParser-Server")
self.server_process.start()
self.wait_until_server_online()
def terminate_server_process(self):
if self.server_process:
self.stage.value = SERVER_TERMINATE
self.server_process.terminate()
self.server_process = None
self.stage.value = SERVER_OFFLINE
def wait_for_termination_request(self):
if self.server_process:
if self.stage.value in (SERVER_STARTING, SERVER_ONLINE):
while self.server_messages.get() != SERVER_TERMINATE:
pass
if self.stage.value == SERVER_TERMINATE:
self.terminate_server_process()
"""
Terminates the server process.
"""
try:
if self.server_process and self.server_process.is_alive():
if self.stage.value in (SERVER_STARTING, SERVER_ONLINE):
self.stage.value = SERVER_TERMINATE
self.server_process.terminate()
self.server_process.join()
self.server_process.close()
self.server_process = None
self.stage.value = SERVER_OFFLINE
except AssertionError as debugger_err:
print('If this message appears out of debugging mode, it is an error!')
def wait_for_termination(self):
"""
Waits for the termination of the server-process. Termination of the
server-process can be triggered by sending a STOP_SERVER_REQUEST to the
server.
"""
if self.stage.value in (SERVER_STARTING, SERVER_ONLINE, SERVER_TERMINATE):
while self.server_messages.get() != SERVER_OFFLINE:
pass
self.terminate_server_process()
......@@ -21,6 +21,7 @@ limitations under the License.
"""
import asyncio
import json
import os
from multiprocessing import Process
import sys
......@@ -28,14 +29,14 @@ from typing import Tuple
sys.path.extend(['../', './'])
from DHParser.server import Server
from DHParser.server import Server, STOP_SERVER_REQUEST, SERVER_OFFLINE
scriptdir = os.path.dirname(os.path.realpath(__file__))
def compiler_dummy(src: str, log_dir: str='') -> Tuple[str, str]:
return src
# def compiler_dummy(src: str, log_dir: str='') -> Tuple[str, str]:
# return src
class TestServer:
......@@ -43,20 +44,61 @@ class TestServer:
# cs = Server(compiler_dummy)
# cs.run_server()
def test_server_proces(self):
cs = Server(compiler_dummy, cpu_bound=set())
cs.run_as_process()
def compiler_dummy(self, src: str, log_dir: str = '') -> Tuple[str, str]:
return src
async def compile(src, log_dir):
def test_server_proces(self):
"""Basic Test of server module."""
async def compile(src):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
writer.write(src.encode())
data = await reader.read(500)
# print(f'Received: {data.decode()!r}')
writer.close()
assert data.decode() == "Test"
try:
cs = Server(self.compiler_dummy, cpu_bound=set())
cs.spawn_server('127.0.0.1', 8888)
asyncio.run(compile('Test'))
finally:
cs.terminate_server_process()
cs.wait_for_termination()
def test_terminate(self):
async def terminate_server(termination_request, expected_response):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
writer.write(termination_request)
data = await reader.read(500)
writer.close()
# print(data)
assert data.find(expected_response) >= 0, str(data)
try:
cs = Server(self.compiler_dummy, cpu_bound=set())
cs.spawn_server('127.0.0.1', 8888)
asyncio.run(terminate_server(STOP_SERVER_REQUEST,
b'DHParser server at 127.0.0.1:8888 stopped!'))
cs.wait_for_termination()
assert cs.stage.value == SERVER_OFFLINE
cs.spawn_server('127.0.0.1', 8888)
asyncio.run(terminate_server(b'GET ' + STOP_SERVER_REQUEST + b' HTTP',
b'DHParser server at 127.0.0.1:8888 stopped!'))
cs.wait_for_termination()
assert cs.stage.value == SERVER_OFFLINE
cs.spawn_server('127.0.0.1', 8888)
jsonrpc = json.dumps({"jsonrpc": "2.0", "method": STOP_SERVER_REQUEST.decode()})
asyncio.run(terminate_server(jsonrpc.encode(),
b'DHParser server at 127.0.0.1:8888 stopped!'))
cs.wait_for_termination()
assert cs.stage.value == SERVER_OFFLINE
finally:
cs.terminate_server_process()
cs.wait_for_termination()
asyncio.run(compile('Test', ''))
cs.terminate_server_process()
cs.wait_for_termination_request()
if __name__ == "__main__":
from DHParser.testing import runner
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment