# server.py - an asynchronous tcp-server to compile sources with DHParser # # Copyright 2019 by Eckhart Arnold (arnold@badw.de) # Bavarian Academy of Sciences an Humanities (badw.de) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. """ Module `server` contains an asynchronous tcp-server that receives compilation requests, runs custom compilation functions in a multiprocessing.Pool. This allows to start a DHParser-compilation environment just once and save the startup time of DHParser for each subsequent compilation. In particular, with a just-in-time-compiler like PyPy (https://pypy.org) setting up a compilation-server is highly recommended, because jit-compilers typically sacrifice startup-speed for running-speed. It is up to the compilation function to either return the result of the compilation in serialized form, or just save the compilation results on the file system an merely return an success or failure message. Module `server` does not define any of these message. This is completely up to the clients of module `server`, i.e. the compilation-modules, to decide. The communication, i.e. requests and responses, follows the json-rpc protocol: https://www.jsonrpc.org/specification For JSON see: https://json.org/ """ import asyncio import json from multiprocessing import Process, Value, Queue from typing import Callable, Optional, Union, Dict, List, Sequence, cast from DHParser.toolkit import get_config_value, re __all__ = ('RPC_Table', 'RPC_Type', 'JSON_Type', 'SERVER_ERROR', 'SERVER_OFFLINE', 'SERVER_STARTING', 'SERVER_ONLINE', 'SERVER_TERMINATE', 'Server') RPC_Table = Dict[str, Callable] RPC_Type = Union[RPC_Table, List[Callable], Callable] JSON_Type = Union[Dict, Sequence, str, int, None] RX_IS_JSON = re.compile(r'\s*(?:{|\[|"|\d|true|false|null)') SERVER_ERROR = "COMPILER-SERVER-ERROR" SERVER_OFFLINE = 0 SERVER_STARTING = 1 SERVER_ONLINE = 2 SERVER_TERMINATE = 3 response_test = b'''HTTP/1.1 200 OK Date: Sun, 18 Oct 2009 08:56:53 GMT Server: Apache/2.2.14 (Win32) Last-Modified: Sat, 20 Nov 2004 07:16:26 GMT ETag: "10000000565a5-2c-3e94b66c2e680" Accept-Ranges: bytes Content-Length: 60 Connection: close Content-Type: text/html X-Pad: avoid browser bug


''' class Server: def __init__(self, rpc_functions: RPC_Type): if isinstance(rpc_functions, Dict): self.rpc_table = cast(RPC_Table, rpc_functions) # type: RPC_Table elif isinstance(rpc_functions, List): self.rpc_table = {} for func in cast(List, rpc_functions): self.rpc_table[func.__name__] = func else: assert isinstance(rpc_functions, Callable) func = cast(Callable, rpc_functions) self.rpc_table = {func.__name__: func} self.max_source_size = get_config_value('max_rpc_size') #type: int self.stage = Value('b', SERVER_OFFLINE) # type: Value self.server = None # type: Optional[asyncio.AbstractServer] self.server_messages = Queue() # type: Queue self.server_process = None # type: Optional[Process] async def handle_compilation_request(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): rpc_error = None # type: Optional[Tuple[int, str]] json_id = 'null' # type: Tuple[int, str] obj = {} # type: Dict result = None # type: JSON_Type raw = None # type: JSON_Type data = await reader.read(self.max_source_size + 1) if len(data) > self.max_source_size: rpc_error = -32600, "Invaild Request: Source code too large! Only %i MB allowed" \ % (self.max_source_size // (1024**2)) if rpc_error is None: try: raw = json.loads(data) except json.decoder.JSONDecodeError as e: rpc_error = -32700, "JSONDecodeError: " + str(e) if rpc_error is None: if isinstance(raw, Dict): obj = cast(Dict, raw) json_id = obj.get('id', 'null') else: rpc_error = -32700, 'Parse error: Request does not appear to be an RPC-call!?' if rpc_error is None: if obj.get('jsonrpc', 'unknown') != '2.0': rpc_error = -32600, 'Invalid Request: jsonrpc version 2.0 needed, version "' \ ' "%s" found.' % obj.get('jsonrpc', b'unknown') elif 'method' not in obj: rpc_error = -32600, 'Invalid Request: No method specified.' elif obj['method'] not in self.rpc_table: rpc_error = -32601, 'Method not found: ' + str(obj['method']) else: method = self.rpc_table[obj['method']] params = obj['params'] if 'params' in obj else () try: if isinstance(params, Sequence): result = method(*params) elif isinstance(params, Dict): result = method(**params) except Exception as e: rpc_error = -32602, "Invalid Params: " + str(e) if rpc_error is None: json_result = {"jsonrpc": "2.0", "result": result, "id": json_id} json.dump(writer, json_result) else: writer.write(('{"jsonrpc": "2.0", "error": {"code": %i, "message": "%s"}, "id": %s}' % (rpc_error[0], rpc_error[1], json_id)).encode()) await writer.drain() writer.close() # TODO: add these lines in case a terminate signal is received, i.e. exit server coroutine # gracefully. # self.server.cancel() async def serve(self, address: str = '', port: int = 8888): self.server = cast(asyncio.base_events.Server, await asyncio.start_server(self.handle_compilation_request, address, port)) async with self.server: self.stage.value = SERVER_ONLINE self.server_messages.put(SERVER_ONLINE) await self.server.serve_forever() # self.server.wait_until_closed() def run_server(self, address: str = '', port: int = 8888): self.stage.value = SERVER_STARTING # loop = asyncio.get_event_loop() # try: # loop.run_until_complete(self.serve(address, port)) # finally: # print(type(self.server)) # # self.server.cancel() # loop.close() asyncio.run(self.serve(address, port)) def wait_until_server_online(self): if self.stage.value != SERVER_ONLINE: if self.server_messages.get() != SERVER_ONLINE: raise AssertionError('could not start server!?') assert self.stage.value == SERVER_ONLINE def run_as_process(self): self.server_process = Process(target=self.run_server) self.server_process.start() self.wait_until_server_online() def terminate_server_process(self): self.server_process.terminate() def wait_for_termination_request(self): assert self.server_process # self.wait_until_server_online() while self.server_messages.get() != SERVER_TERMINATE: pass self.terminate_server_process() self.server_process = None