Commit 020af6fb authored by eckhart's avatar eckhart

- testing_server.py: spawn server test (still broken)

parent 9771686c
......@@ -147,7 +147,7 @@ def asyncio_run(coroutine: Coroutine, loop=None) -> Any:
else:
if loop is None:
myloop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.set_event_loop(myloop)
else:
myloop = loop
result = myloop.run_until_complete(coroutine)
......
......@@ -38,41 +38,41 @@ class TestDHParserCommandLineTool:
def setup(self):
self.cwd = os.getcwd()
os.chdir(scriptdir)
if not os.path.exists('testdata'):
os.mkdir('testdata')
if not os.path.exists('test_dhparser_data'):
os.mkdir('test_dhparser_data')
self.nulldevice = " >/dev/null" if platform.system() != "Windows" else " > NUL"
self.python = 'python3 ' if system('python3 -V' + self.nulldevice) == 0 else 'python '
def teardown(self):
if os.path.exists('testdata/neu/neuServer.py'):
system(self.python + 'testdata/neu/neuServer.py --stopserver' + self.nulldevice)
if os.path.exists('testdata/neu') and os.path.isdir('testdata/neu'):
shutil.rmtree('testdata/neu')
if os.path.exists('testdata') and not os.listdir('testdata'):
os.rmdir('testdata')
if os.path.exists('test_dhparser_data/neu/neuServer.py'):
system(self.python + 'test_dhparser_data/neu/neuServer.py --stopserver' + self.nulldevice)
if os.path.exists('test_dhparser_data/neu') and os.path.isdir('test_dhparser_data/neu'):
shutil.rmtree('test_dhparser_data/neu')
if os.path.exists('test_dhparser_data') and not os.listdir('test_dhparser_data'):
os.rmdir('test_dhparser_data')
os.chdir(self.cwd)
def test_dhparser(self):
# test compiler creation and execution
system(self.python + '../DHParser/scripts/dhparser.py testdata/neu ' + self.nulldevice)
system(self.python + 'testdata/neu/tst_neu_grammar.py ' + self.nulldevice)
system(self.python + 'testdata/neu/neuCompiler.py testdata/neu/example.dsl '
'>testdata/neu/example.xml')
with open('testdata/neu/example.xml', 'r', encoding='utf-8') as f:
system(self.python + '../DHParser/scripts/dhparser.py test_dhparser_data/neu ' + self.nulldevice)
system(self.python + 'test_dhparser_data/neu/tst_neu_grammar.py ' + self.nulldevice)
system(self.python + 'test_dhparser_data/neu/neuCompiler.py test_dhparser_data/neu/example.dsl '
'>test_dhparser_data/neu/example.xml')
with open('test_dhparser_data/neu/example.xml', 'r', encoding='utf-8') as f:
xml = f.read()
assert xml.find('<document>') >= 0, xml
os.remove('testdata/neu/neuCompiler.py')
os.remove('testdata/neu/example.xml')
os.remove('test_dhparser_data/neu/neuCompiler.py')
os.remove('test_dhparser_data/neu/example.xml')
# test server
system(self.python + 'testdata/neu/neuServer.py --stopserver' + self.nulldevice)
system(self.python + 'testdata/neu/neuServer.py testdata/neu/example.dsl '
'>testdata/neu/example.xml')
with open('testdata/neu/example.xml', 'r', encoding='utf-8') as f:
system(self.python + 'test_dhparser_data/neu/neuServer.py --stopserver' + self.nulldevice)
system(self.python + 'test_dhparser_data/neu/neuServer.py test_dhparser_data/neu/example.dsl '
'>test_dhparser_data/neu/example.xml')
with open('test_dhparser_data/neu/example.xml', 'r', encoding='utf-8') as f:
json = f.read()
assert json.find('document') >= 0, json
system(self.python + 'testdata/neu/neuServer.py testdata/neu/example.dsl ' + self.nulldevice)
system(self.python + 'testdata/neu/neuServer.py --stopserver' + self.nulldevice)
system(self.python + 'test_dhparser_data/neu/neuServer.py test_dhparser_data/neu/example.dsl ' + self.nulldevice)
system(self.python + 'test_dhparser_data/neu/neuServer.py --stopserver' + self.nulldevice)
pass
......
......@@ -23,12 +23,14 @@ limitations under the License.
import asyncio
import json
import os
import subprocess
import sys
import time
sys.path.extend(['../', './'])
from DHParser.server import Server, STOP_SERVER_REQUEST, IDENTIFY_REQUEST, SERVER_OFFLINE, asyncio_run
from DHParser.toolkit import concurrent_ident
scriptdir = os.path.dirname(os.path.realpath(__file__))
......@@ -175,6 +177,66 @@ class TestServer:
cs.terminate_server()
RUN_SERVER_SCRIPT = """
def dummy(s: str) -> str:
return s
def run_server(host, port):
from DHParser.server import Server
print('Starting server on %s:%i' % (host, port))
server = Server(dummy)
server.run_server(host, port)
if __name__ == '__main__':
run_server('127.0.0.1', 8888)
"""
def asyncio_run(coroutine):
"""Backward compatible version of Pyhon 3.7's `asyncio.run()`"""
if sys.version_info >= (3, 7):
return asyncio.run(coroutine)
else:
loop = asyncio.get_event_loop()
return loop.run_until_complete(coroutine)
class TestSpawning:
"""Tests spawning a server by starting a script via subprocess.Popen."""
def setup(self):
self.tmpdir = 'tmp_' + concurrent_ident()
os.mkdir(self.tmpdir)
def teardown(self):
for fname in os.listdir(self.tmpdir):
os.remove(os.path.join(self.tmpdir, fname))
os.rmdir(self.tmpdir)
def test_spawn(self):
scriptname = os.path.join(self.tmpdir, 'spawn_server.py')
with open(scriptname, 'w') as f:
f.write(RUN_SERVER_SCRIPT)
subprocess.Popen(['python', scriptname])
countdown = 20
delay = 0.05
connected = False
reader, writer = None, None
while countdown > 0:
try:
reader, writer = asyncio_run(asyncio.open_connection('127.0.0.1', 8888))
countdown = 0
connected = True
except ConnectionRefusedError:
time.sleep(delay)
delay += 0.0
countdown -= 1
if connected:
writer.write(IDENTIFY_REQUEST.encode())
data = asyncio_run(reader.read(500))
writer.close()
print(data.decode())
if __name__ == "__main__":
from DHParser.testing import runner
runner("", globals())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment