2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit 21e5ad3f authored by Eckhart Arnold's avatar Eckhart Arnold
Browse files

- server.py: added logic for long running and blocking tasks

parent 6438ee95
......@@ -43,10 +43,11 @@ For JSON see:
import asyncio
import concurrent.futures
import json
from multiprocessing import Process, Value, Queue
import sys
from typing import Callable, Optional, Union, Dict, List, Tuple, NamedTuple, Sequence, cast
from typing import Callable, Optional, Union, Dict, List, Tuple, NamedTuple, Sequence, Set, cast
from DHParser.toolkit import get_config_value, is_filename, load_if_file, re
......@@ -114,9 +115,13 @@ Upgrade-Insecure-Requests: 1
# ('errors', str),
# ('preview', str)])
ALL_RPCs = frozenset('*') # Magic value denoting all remove procedures
class Server:
def __init__(self, rpc_functions: RPC_Type):
def __init__(self, rpc_functions: RPC_Type,
cpu_bound: Set[str] = ALL_RPCs,
blocking: Set[str] = frozenset()):
if isinstance(rpc_functions, Dict):
self.rpc_table = cast(RPC_Table, rpc_functions) # type: RPC_Table
elif isinstance(rpc_functions, List):
......@@ -128,13 +133,25 @@ class Server:
func = cast(Callable, rpc_functions)
self.rpc_table = {func.__name__: func}
# see: https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools
self.cpu_bound = frozenset(self.rpc_table.keys()) if cpu_bound == ALL_RPCs else cpu_bound
self.blocking = frozenset(self.rpc_table.keys()) if blocking == ALL_RPCs else blocking
self.blocking = self.blocking - self.cpu_bound # cpu_bound property takes precedence
assert not (self.cpu_bound - self.rpc_table.keys())
assert not (self.blocking - self.rpc_table.keys())
self.max_source_size = get_config_value('max_rpc_size') #type: int
self.stage = Value('b', SERVER_OFFLINE) # type: Value
self.server = None # type: Optional[asyncio.AbstractServer]
self.server_messages = Queue() # type: Queue
self.server_process = None # type: Optional[Process]
# self.registry = {} # type: Dict[str, ]
# if the server is run in a separate process, the following variables
# should only be accessed from the server process
self.server = None # type: Optional[asyncio.AbstractServer]
self.pp_executor = None # type: Optional[concurrent.futures.ProcessPoolExecutor]
self.tp_executor = None # type: Optional[concurrent.futures.ThreadPoolExecutor]
async def handle_request(self,
reader: asyncio.StreamReader,
......@@ -173,15 +190,32 @@ class Server:
elif obj['method'] not in self.rpc_table:
rpc_error = -32601, 'Method not found: ' + str(obj['method'])
else:
method = self.rpc_table[obj['method']]
method_name = obj['method']
method = self.rpc_table[method_name]
params = obj['params'] if 'params' in obj else ()
try:
if isinstance(params, Sequence):
result = method(*params)
elif isinstance(params, Dict):
result = method(**params)
except Exception as e:
# run method either a) directly if it is short running or
# b) in a thread pool if it contains blocking io or
# c) in a process pool if it is cpu bound
# see: https://docs.python.org/3/library/asyncio-eventloop.html
# #executing-code-in-thread-or-process-pools
has_kw_params = isinstance(params, Dict)
assert has_kw_params or isinstance(params, Sequence)
loop = asyncio.get_running_loop()
pool = pp_pool if method_name in self.cpu_bound else \
tp_pool if method_name in self.blocking else None
if pool is None:
result = method(**params) if has_kw_params else method(*params)
elif has_kw_params:
result = await loop.run_in_executor(pool, method, **params)
else:
result = await loop.run_in_executor(pool, method, *params)
except TypeError as e:
rpc_error = -32602, "Invalid Params: " + str(e)
except NameError as e:
rpc_error = -32601, "Method not found: " + str(e)
except Exception as e:
rpc_error = -32000, "Server Error: " + str(e)
if rpc_error is None:
json_result = {"jsonrpc": "2.0", "result": result, "id": json_id}
......@@ -196,12 +230,14 @@ class Server:
# self.server.cancel()
async def serve(self, address: str = '127.0.0.1', port: int = 8888):
self.server = cast(asyncio.base_events.Server,
await asyncio.start_server(self.handle_request, address, port))
async with self.server:
self.stage.value = SERVER_ONLINE
self.server_messages.put(SERVER_ONLINE)
await self.server.serve_forever()
with concurrent.futures.ProcessPoolExecutor() as p, \
concurrent.futures.ThreadPoolExecutor() as t:
self.server = cast(asyncio.base_events.Server,
await asyncio.start_server(self.handle_request, address, port))
async with self.server:
self.stage.value = SERVER_ONLINE
self.server_messages.put(SERVER_ONLINE)
await self.server.serve_forever()
# self.server.wait_until_closed()
def run_server(self, address: str = '127.0.0.1', port: int = 8888):
......@@ -216,7 +252,6 @@ class Server:
# self.server.cancel()
loop.close()
def wait_until_server_online(self):
if self.stage.value != SERVER_ONLINE:
if self.server_messages.get() != SERVER_ONLINE:
......
......@@ -39,8 +39,12 @@ def compiler_dummy(src: str, log_dir: str) -> Tuple[str, str]:
class TestServer:
def test_server(self):
cs = Server(compiler_dummy)
# def test_server(self):
# cs = Server(compiler_dummy)
# cs.run_server()
def test_server_proces(self):
cs = Server(compiler_dummy, cpu_bound=set())
cs.run_as_process()
async def compile(src, log_dir):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment