# server.py - an asynchronous tcp-server to compile sources with DHParser
#
# Copyright 2019  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences and Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.


"""
Module `server` contains an asynchronous tcp-server that receives compilation
requests and runs custom compilation functions, either directly or - if so
configured - in a thread pool or a process pool.

This allows starting a DHParser-compilation environment just once and saving
the startup time of DHParser for each subsequent compilation. In particular,
with a just-in-time-compiler like PyPy (https://pypy.org) setting up a
compilation-server is highly recommended, because jit-compilers typically
sacrifice startup-speed for running-speed.

It is up to the compilation function to either return the result of the
compilation in serialized form, or just save the compilation results on the
file system and merely return a success or failure message. Module `server`
does not define any of these messages. This is completely up to the clients
of module `server`, i.e. the compilation-modules, to decide.

The communication, i.e. requests and responses, follows the json-rpc protocol:

    https://www.jsonrpc.org/specification

For JSON see:

    https://json.org/
"""


import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import functools
import json
from multiprocessing import Process, Value, Queue
import sys
from typing import Callable, Optional, Union, Dict, List, Tuple, Sequence, Set, cast
from urllib.parse import urlparse, parse_qs

from DHParser.toolkit import get_config_value, is_filename, load_if_file, re


__all__ = ('RPC_Table',
           'RPC_Type',
           'JSON_Type',
           'SERVER_ERROR',
           'SERVER_OFFLINE',
           'SERVER_STARTING',
           'SERVER_ONLINE',
           'SERVER_TERMINATE',
           'Server')


66
67
RPC_Table = Dict[str, Callable]
RPC_Type = Union[RPC_Table, List[Callable], Callable]
eckhart's avatar
eckhart committed
68
JSON_Type = Union[Dict, Sequence, str, int, None]
69

eckhart's avatar
eckhart committed
70
RX_IS_JSON = re.compile(r'\s*(?:{|\[|"|\d|true|false|null)')
71
RX_GREP_URL = re.compile(r'GET ([^ ]+) HTTP')
eckhart's avatar
eckhart committed
72

73
74
SERVER_ERROR = "COMPILER-SERVER-ERROR"

eckhart's avatar
eckhart committed
75
76
77
78
SERVER_OFFLINE = 0
SERVER_STARTING = 1
SERVER_ONLINE = 2
SERVER_TERMINATE = 3
eckhart's avatar
eckhart committed
79

eckhart's avatar
eckhart committed
80
test_response_test = b'''HTTP/1.1 200 OK
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
Date: Sun, 18 Oct 2009 08:56:53 GMT
Server: Apache/2.2.14 (Win32)
Last-Modified: Sat, 20 Nov 2004 07:16:26 GMT
ETag: "10000000565a5-2c-3e94b66c2e680"
Accept-Ranges: bytes
Content-Length: 60
Connection: close
Content-Type: text/html
X-Pad: avoid browser bug

<html>
<head>
</head>
<body>
<h1>Test</h1>
</body>
</html>
'''

eckhart's avatar
eckhart committed
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
test_get = b'''GET /method/object HTTP/1.1
Host: 127.0.0.1:8888
User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: de,en-US;q=0.7,en;q=0.3
Accept-Encoding: gzip, deflate
DNT: 1
Connection: keep-alive
Upgrade-Insecure-Requests: 1


'''


# CompilationItem = NamedTuple('CompilationItem',
#                              [('uri', str),
#                               ('hash', int),
#                               ('errors', str),
#                               ('preview', str)])

120
121
ALL_RPCs = frozenset('*')  # Magic value denoting all remove procedures

122

123
class Server:
    """An asynchronous tcp-server that answers JSON-RPC 2.0 requests.

    Remote procedures are looked up by name in `self.rpc_table`. Depending on
    how they have been registered, they are run directly in the event loop,
    in a thread pool (procedures listed in `blocking`) or in a process pool
    (procedures listed in `cpu_bound`). The pools exist only while `serve()`
    is running.

    :param rpc_functions: a dictionary mapping method names to callables, a
        list of callables (registered under their `__name__`) or a single
        callable.
    :param cpu_bound: names of the procedures to be run in the process pool.
        The magic value `ALL_RPCs` (the default) selects all procedures.
    :param blocking: names of the procedures to be run in the thread pool.
        `ALL_RPCs` selects all procedures. Procedures that are also listed in
        `cpu_bound` are run in the process pool.
    """

    def __init__(self, rpc_functions: RPC_Type,
                 cpu_bound: Set[str] = ALL_RPCs,
                 blocking: Set[str] = frozenset()):
        if isinstance(rpc_functions, dict):
            self.rpc_table = cast(RPC_Table, rpc_functions)  # type: RPC_Table
        elif isinstance(rpc_functions, list):
            self.rpc_table = {func.__name__: func for func in rpc_functions}
        else:
            assert callable(rpc_functions)
            func = cast(Callable, rpc_functions)
            self.rpc_table = {func.__name__: func}

        # see: https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools
        self.cpu_bound = frozenset(self.rpc_table.keys()) if cpu_bound == ALL_RPCs else cpu_bound
        self.blocking = frozenset(self.rpc_table.keys()) if blocking == ALL_RPCs else blocking
        self.blocking = self.blocking - self.cpu_bound  # cpu_bound property takes precedence

        assert not (self.cpu_bound - self.rpc_table.keys())
        assert not (self.blocking - self.rpc_table.keys())

        # maximum size of a request in bytes
        self.max_source_size = get_config_value('max_rpc_size')  # type: int
        # one of SERVER_OFFLINE, SERVER_STARTING, SERVER_ONLINE, SERVER_TERMINATE;
        # a multiprocessing.Value so that it is shared with the server process
        self.stage = Value('b', SERVER_OFFLINE)  # type: Value
        # messages from the server process (e.g. SERVER_ONLINE)
        self.server_messages = Queue()  # type: Queue
        self.server_process = None  # type: Optional[Process]

        # if the server is run in a separate process, the following variables
        # should only be accessed from the server process
        self.server = None  # type: Optional[asyncio.AbstractServer]
        self.pp_executor = None  # type: Optional[ProcessPoolExecutor]
        self.tp_executor = None  # type: Optional[ThreadPoolExecutor]

    async def _execute(self, method_name: str, params: Union[Dict, List]) -> JSON_Type:
        """Call the registered procedure `method_name` with `params`.

        The procedure is run a) directly, b) in the thread pool if it is listed
        in `self.blocking`, or c) in the process pool if it is listed in
        `self.cpu_bound`. If the respective pool has not been set up (i.e.
        outside of `serve()`), the procedure is run directly.

        :param method_name: a key of `self.rpc_table`
        :param params: positional (list) or keyword (dict) arguments
        :returns: the return value of the procedure
        :raises: whatever the procedure (or binding its arguments) raises
        """
        method = self.rpc_table[method_name]
        # loop.run_in_executor() does not accept keyword arguments, therefore
        # the arguments are bound with functools.partial beforehand
        if isinstance(params, dict):
            call = functools.partial(method, **params)
        else:
            call = functools.partial(method, *params)
        if method_name in self.cpu_bound:
            executor = self.pp_executor
        elif method_name in self.blocking:
            executor = self.tp_executor
        else:
            executor = None
        if executor is None:
            return call()
        return await asyncio.get_running_loop().run_in_executor(executor, call)

    async def handle_request(self,
                             reader: asyncio.StreamReader,
                             writer: asyncio.StreamWriter):
        """Read a single JSON-RPC request from `reader`, execute it, write the
        JSON-RPC response to `writer` and close the connection.

        Error codes follow the JSON-RPC 2.0 specification: -32700 (parse
        error), -32600 (invalid request), -32601 (method not found), -32602
        (invalid params) and -32000 (server error, e.g. an exception raised
        by the called procedure).
        """
        rpc_error = None  # type: Optional[Tuple[int, str]]
        json_id = None    # type: JSON_Type
        obj = {}          # type: Dict
        result = None     # type: JSON_Type
        raw = None        # type: JSON_Type

        # read one byte more than allowed so that oversized requests are detected
        data = await reader.read(self.max_source_size + 1)

        if len(data) > self.max_source_size:
            rpc_error = -32600, "Invalid Request: Source code too large! Only %i MB allowed" \
                                % (self.max_source_size // (1024**2))

        # The head must be decoded (str() of bytes yields "b'...'"), otherwise
        # the regular expressions below can never match.
        head = data[:4096].decode('utf-8', errors='replace')
        if data.startswith(b'GET'):
            m = RX_GREP_URL.match(head)
            if m:
                # TODO: serve http request, e.g. based on urlparse(m.group(1))
                pass
        elif not RX_IS_JSON.match(head):
            # TODO: compile file or data
            pass

        if rpc_error is None:
            try:
                raw = json.loads(data)
            except (json.decoder.JSONDecodeError, UnicodeDecodeError) as e:
                rpc_error = -32700, "JSONDecodeError: " + str(e) + str(data)

        if rpc_error is None:
            if isinstance(raw, dict):
                obj = raw
                json_id = obj.get('id', None)
            else:
                rpc_error = -32700, 'Parse error: Request does not appear to be an RPC-call!?'

        if rpc_error is None:
            version = obj.get('jsonrpc', 'unknown')
            method_name = obj.get('method', None)
            params = obj.get('params', [])
            if version != '2.0':
                rpc_error = -32600, 'Invalid Request: jsonrpc version 2.0 needed, ' \
                                    'version "%s" found.' % str(version)
            elif 'method' not in obj:
                rpc_error = -32600, 'Invalid Request: No method specified.'
            elif not isinstance(method_name, str):
                rpc_error = -32600, 'Invalid Request: Method name must be a string.'
            elif method_name not in self.rpc_table:
                rpc_error = -32601, 'Method not found: ' + method_name
            elif not isinstance(params, (dict, list)):
                rpc_error = -32602, 'Invalid Params: params must be an array or an object.'
            else:
                try:
                    result = await self._execute(method_name, params)
                except TypeError as e:
                    rpc_error = -32602, "Invalid Params: " + str(e)
                except NameError as e:
                    rpc_error = -32601, "Method not found: " + str(e)
                except Exception as e:
                    rpc_error = -32000, "Server Error: " + str(e)

        response = ''
        if rpc_error is None:
            try:
                response = json.dumps({"jsonrpc": "2.0", "result": result, "id": json_id})
            except (TypeError, ValueError) as e:
                rpc_error = -32000, "Server Error: Result is not JSON-serializable: " + str(e)
        if rpc_error is not None:
            # json.dumps escapes quotes etc. in the message and serializes the id properly
            response = json.dumps({"jsonrpc": "2.0",
                                   "error": {"code": rpc_error[0], "message": rpc_error[1]},
                                   "id": json_id})

        try:
            writer.write(response.encode())
            await writer.drain()
        finally:
            writer.close()

    async def serve(self, address: str = '127.0.0.1', port: int = 8888):
        """Listen on `address`:`port` and serve requests until the server is
        closed or this coroutine is cancelled.

        A process pool and a thread pool are created for the lifetime of the
        server. As soon as the server listens, `self.stage` is set to
        SERVER_ONLINE and SERVER_ONLINE is put on `self.server_messages`.
        """
        with ProcessPoolExecutor() as p, ThreadPoolExecutor() as t:
            # the executors must be stored, otherwise _execute() would run
            # every procedure directly in the event loop
            self.pp_executor, self.tp_executor = p, t
            try:
                self.server = cast(asyncio.base_events.Server,
                                   await asyncio.start_server(self.handle_request, address, port))
                async with self.server:
                    self.stage.value = SERVER_ONLINE
                    self.server_messages.put(SERVER_ONLINE)
                    await self.server.serve_forever()
            finally:
                self.pp_executor = self.tp_executor = None

    def run_server(self, address: str = '127.0.0.1', port: int = 8888):
        """Set the stage to SERVER_STARTING and run `serve()` in a new event
        loop, blocking until it returns."""
        self.stage.value = SERVER_STARTING
        if sys.version_info >= (3, 7):
            asyncio.run(self.serve(address, port))
        else:
            loop = asyncio.get_event_loop()
            try:
                loop.run_until_complete(self.serve(address, port))
            finally:
                loop.close()

    def wait_until_server_online(self):
        """Block until the server reports SERVER_ONLINE.

        :raises AssertionError: if the first message received from the server
            is not SERVER_ONLINE.
        """
        if self.stage.value != SERVER_ONLINE:
            if self.server_messages.get() != SERVER_ONLINE:
                raise AssertionError('could not start server!?')
            assert self.stage.value == SERVER_ONLINE

    def run_as_process(self):
        """Run the server in a separate process and block until it is online."""
        self.server_process = Process(target=self.run_server)
        self.server_process.start()
        self.wait_until_server_online()

    def terminate_server_process(self):
        """Terminate the server process, if one has been started with
        `run_as_process()`, and set the stage to SERVER_OFFLINE."""
        if self.server_process:
            self.stage.value = SERVER_TERMINATE
            self.server_process.terminate()
            # reap the terminated process, so that it does not linger as a zombie
            self.server_process.join()
            self.server_process = None
        self.stage.value = SERVER_OFFLINE

    def wait_for_termination_request(self):
        """Block until SERVER_TERMINATE is received on `self.server_messages`
        and then terminate the server process.

        NOTE(review): the process is only terminated if `self.stage` has been
        set to SERVER_TERMINATE by whoever sent the message - confirm that
        this is intended.
        """
        if self.server_process:
            if self.stage.value in (SERVER_STARTING, SERVER_ONLINE):
                while self.server_messages.get() != SERVER_TERMINATE:
                    pass
            if self.stage.value == SERVER_TERMINATE:
                self.terminate_server_process()