# server.py - an asynchronous tcp-server to compile sources with DHParser
#
# Copyright 2019  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences and Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.


"""
Module `server` contains an asynchronous tcp-server that receives compilation
requests and runs custom compilation functions in a multiprocessing.Pool.

This makes it possible to start a DHParser-compilation environment just once
and save the startup time of DHParser for each subsequent compilation. In
particular, with a just-in-time-compiler like PyPy (https://pypy.org) setting
up a compilation-server is highly recommended, because jit-compilers typically
sacrifice startup-speed for running-speed.

It is up to the compilation function to either return the result of the
compilation in serialized form, or just save the compilation results on the
file system and merely return a success or failure message. Module `server`
does not define any of these messages. This is completely up to the clients
of module `server`, i.e. the compilation-modules, to decide.

The communication, i.e. requests and responses, follows the json-rpc protocol:

    https://www.jsonrpc.org/specification

For JSON see:

    https://json.org/
"""


import asyncio
import json
from multiprocessing import Process, Value, Queue
from typing import Callable, Optional, Union, Dict, List, Sequence, Tuple, cast

from DHParser.toolkit import get_config_value, re
51

eckhart's avatar
eckhart committed
52
53
54
55
56
57
58
59
60
61
62
# Public names of this module, i.e. what a star-import exposes.
__all__ = (
    'RPC_Table',
    'RPC_Type',
    'JSON_Type',
    'SERVER_ERROR',
    'SERVER_OFFLINE',
    'SERVER_STARTING',
    'SERVER_ONLINE',
    'SERVER_TERMINATE',
    'Server',
)


63
64
# Table that maps the name of an rpc-method to the callable implementing it.
RPC_Table = Dict[str, Callable]
# The forms in which rpc-functions may be handed to `Server`: a ready-made
# table, a list of functions, or a single function.
RPC_Type = Union[RPC_Table, List[Callable], Callable]
# Python counterparts of the JSON value types. `float` is listed explicitly,
# because JSON numbers need not be integers (`bool` is covered by `int`).
JSON_Type = Union[Dict, Sequence, str, int, float, None]

# Matches the beginning of any JSON value. The optional minus sign is needed
# for negative numbers like `-1`, which are valid JSON as well.
RX_IS_JSON = re.compile(r'\s*(?:{|\[|"|-?\d|true|false|null)')

SERVER_ERROR = "COMPILER-SERVER-ERROR"

# Stages in the life of a server; stored in `Server.stage` and passed through
# `Server.server_messages`.
SERVER_OFFLINE = 0
SERVER_STARTING = 1
SERVER_ONLINE = 2
SERVER_TERMINATE = 3
eckhart's avatar
eckhart committed
75

eckhart's avatar
eckhart committed
76
# Canned HTTP response: status line, headers, blank line and a minimal HTML
# page. The header lines must directly follow the status line; nothing else
# may appear inside the literal.
# NOTE(review): not referenced by the code visible in this module -
# presumably a fixture for manual testing; confirm before removing.
response_test = b'''HTTP/1.1 200 OK
Date: Sun, 18 Oct 2009 08:56:53 GMT
Server: Apache/2.2.14 (Win32)
Last-Modified: Sat, 20 Nov 2004 07:16:26 GMT
ETag: "10000000565a5-2c-3e94b66c2e680"
Accept-Ranges: bytes
Content-Length: 60
Connection: close
Content-Type: text/html
X-Pad: avoid browser bug

<html>
<head>
</head>
<body>
<h1>Test</h1>
</body>
</html>
'''


97
98
99
100
101
102
103
104
105
106
107
class Server:
    """Asynchronous tcp-server that answers JSON-RPC 2.0 requests.

    Each connection is served by `handle_compilation_request`: one request
    is read, the requested function is looked up in `rpc_table` and called
    with the supplied parameters, one response is written and the connection
    is closed.

    Attributes:
        rpc_table:  Maps the names of the rpc-methods to the callables that
            implement them.
        max_source_size:  Maximum accepted size of a request in bytes, taken
            from the configuration value 'max_rpc_size'.
        stage:  Shared `multiprocessing.Value` holding one of the
            `SERVER_...` stage constants.
        server:  The asyncio server object; `None` until `serve` has
            created it.
        server_messages:  Queue on which stage changes are announced
            (`SERVER_ONLINE` is put by `serve`; `SERVER_TERMINATE` is
            awaited by `wait_for_termination_request`).
        server_process:  The process started by `run_as_process`, if any.
    """

    def __init__(self, rpc_functions: 'RPC_Type'):
        """Builds the rpc-table from `rpc_functions`.

        :param rpc_functions: either a dictionary mapping method names to
            functions, or a list of functions, or a single function. In the
            latter two cases the functions are registered under their
            `__name__`.
        """
        if isinstance(rpc_functions, dict):
            self.rpc_table = cast(RPC_Table, rpc_functions)  # type: RPC_Table
        elif isinstance(rpc_functions, list):
            self.rpc_table = {func.__name__: func for func in rpc_functions}
        else:
            assert callable(rpc_functions)
            func = cast(Callable, rpc_functions)
            self.rpc_table = {func.__name__: func}

        self.max_source_size = get_config_value('max_rpc_size')  # type: int
        self.stage = Value('b', SERVER_OFFLINE)  # type: Value
        self.server = None  # type: Optional[asyncio.AbstractServer]
        self.server_messages = Queue()  # type: Queue
        self.server_process = None  # type: Optional[Process]

    @staticmethod
    def _error_response(code: int, message: str, json_id: object) -> bytes:
        """Returns the encoded JSON-RPC error object for `code` and `message`."""
        # json.dumps takes care of quoting and escaping, so that messages
        # containing quotation marks and string-ids yield valid JSON.
        return json.dumps({"jsonrpc": "2.0",
                           "error": {"code": code, "message": message},
                           "id": json_id}).encode()

    def _call_method(self, obj: Dict) -> Tuple[object, Optional[Tuple[int, str]]]:
        """Validates the request object `obj` and calls the requested method.

        :returns: a tuple (result, error). `error` is `None` if the call
            succeeded, otherwise a tuple (error code, error message), in
            which case `result` is `None`.
        """
        version = obj.get('jsonrpc', 'unknown')
        if version != '2.0':
            return None, (-32600, 'Invalid Request: jsonrpc version 2.0 needed, '
                                  'version "%s" found.' % (version,))
        if 'method' not in obj:
            return None, (-32600, 'Invalid Request: No method specified.')
        name = obj['method']
        # The isinstance-check keeps unhashable values (e.g. a list) from
        # raising a TypeError in the dictionary lookup.
        if not isinstance(name, str) or name not in self.rpc_table:
            return None, (-32601, 'Method not found: ' + str(name))

        method = self.rpc_table[name]
        params = obj.get('params', ())
        try:
            if isinstance(params, (list, tuple)):
                return method(*params), None
            if isinstance(params, dict):
                return method(**params), None
        except Exception as e:
            # Any failure of the called function is reported to the client
            # rather than taking the connection handler down.
            return None, (-32602, "Invalid Params: " + str(e))
        return None, (-32602, 'Invalid Params: list or dict expected, but %s found.'
                              % type(params).__name__)

    def _respond(self, data: bytes) -> bytes:
        """Processes the raw request `data` and returns the encoded response,
        which is either a JSON-RPC result object or a JSON-RPC error object.
        """
        if len(data) > self.max_source_size:
            return self._error_response(
                -32600, "Invalid Request: Source code too large! Only %i MB allowed"
                % (self.max_source_size // (1024 ** 2)), None)

        try:
            raw = json.loads(data)
        except ValueError as e:
            # ValueError covers json.JSONDecodeError as well as the
            # UnicodeDecodeError raised for undecodable bytes.
            return self._error_response(-32700, type(e).__name__ + ": " + str(e), None)
        if not isinstance(raw, dict):
            return self._error_response(
                -32700, 'Parse error: Request does not appear to be an RPC-call!?', None)

        json_id = raw.get('id')  # a missing id is answered with "id": null
        result, rpc_error = self._call_method(raw)
        if rpc_error is None:
            try:
                return json.dumps({"jsonrpc": "2.0", "result": result,
                                   "id": json_id}).encode()
            except (TypeError, ValueError) as e:
                rpc_error = -32603, "Internal error: Result cannot be serialized: " + str(e)
        return self._error_response(rpc_error[0], rpc_error[1], json_id)

    async def handle_compilation_request(self,
                                         reader: asyncio.StreamReader,
                                         writer: asyncio.StreamWriter):
        """Connection callback: reads one request from `reader`, writes the
        response to `writer` and closes the connection.

        Errors in the request are not raised, but reported to the client as
        JSON-RPC error objects.
        """
        # One byte more than allowed is requested, so that oversized requests
        # can be told apart from requests of exactly the maximum size.
        # NOTE(review): read(n) returns *up to* n bytes; a large request may
        # arrive in several chunks - confirm that a single read suffices.
        data = await reader.read(self.max_source_size + 1)
        try:
            writer.write(self._respond(data))
            await writer.drain()
        finally:
            # close the connection even if sending the response failed
            writer.close()
        # TODO: exit the server coroutine gracefully in case a terminate
        #  signal is received.

    async def serve(self, address: str = '127.0.0.1', port: int = 8888):
        """Starts the tcp-server on `address`:`port`, announces
        `SERVER_ONLINE` via `stage` and `server_messages`, and then serves
        requests until the server is closed or the coroutine is cancelled.
        """
        self.server = cast(asyncio.base_events.Server,
                           await asyncio.start_server(self.handle_compilation_request,
                                                      address, port))
        async with self.server:
            self.stage.value = SERVER_ONLINE
            self.server_messages.put(SERVER_ONLINE)
            await self.server.serve_forever()

    def run_server(self, address: str = '127.0.0.1', port: int = 8888):
        """Sets the stage to `SERVER_STARTING` and runs `serve` in a new
        event loop. Blocks for as long as the server is running."""
        self.stage.value = SERVER_STARTING
        asyncio.run(self.serve(address, port))

    def wait_until_server_online(self):
        """Blocks until the server has announced that it is online.

        :raises AssertionError: if any message other than `SERVER_ONLINE` is
            received while waiting.
        """
        if self.stage.value == SERVER_ONLINE:
            return
        if self.server_messages.get() != SERVER_ONLINE:
            raise AssertionError('could not start server!?')
        assert self.stage.value == SERVER_ONLINE

    def run_as_process(self):
        """Starts `run_server` (with its default address and port) in a
        separate process and waits until the server is online."""
        self.server_process = Process(target=self.run_server)
        self.server_process.start()
        self.wait_until_server_online()

    def terminate_server_process(self):
        """Terminates the server process. Does nothing if no server process
        has been started."""
        if self.server_process is not None:
            self.server_process.terminate()

    def wait_for_termination_request(self):
        """Blocks until `SERVER_TERMINATE` is received on `server_messages`
        and then terminates the server process. All other messages received
        in the meantime are discarded."""
        assert self.server_process
        while self.server_messages.get() != SERVER_TERMINATE:
            pass
        self.terminate_server_process()
        self.server_process = None