Source code for stressor.monitor.server

# (c) 2020-2024 Martin Wendt and contributors; see
# Licensed under the MIT license:
import io
import json
import logging
import os
import socketserver
import webbrowser
from http.server import HTTPStatus, SimpleHTTPRequestHandler
from threading import Thread
from urllib import parse

from stressor import __version__

# from stressor.util import logger

logger = logging.getLogger("stressor.monitor")

[docs] class Handler(SimpleHTTPRequestHandler): server_version = ( "stressor/" + __version__ + " " + SimpleHTTPRequestHandler.server_version ) # Custom attributes, set by `MonitorServer`: DIRECTORY = None run_manager = None def __init__(self, *args, **kwargs): super().__init__(*args, directory=self.DIRECTORY, **kwargs)
[docs] def log_request(self, code="-", size="-"): # Overide base implementation (writing to stderr) if isinstance(code, HTTPStatus) and not (200 <= code.value < 400): logger.warning('"%s" %s %s', self.requestline, str(code), str(size)) else: logger.debug('"%s" %s %s', self.requestline, str(code), str(size))
[docs] def log_error(self, format, *args): # Overide base implementation (writing to stderr) logger.error(format, *args)
[docs] def log_message(self, format, *args): # Overide base implementation (writing to stderr) logger.debug(format, *args)
[docs] def _return_json(self, body, status=HTTPStatus.OK): if not isinstance(body, str): body = json.dumps(body) encoded = body.encode("utf8", "surrogateescape") f = io.BytesIO() f.write(encoded) # print(encoded) self.send_response(status) self.send_header("Content-type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(encoded))) self.end_headers() try: self.copyfile(f, self.wfile) finally: f.close() return f
[docs] def on_stopManager(self, args): res = self.run_manager.stop() return self._return_json(res)
[docs] def on_getStats(self, args): res = self.run_manager.get_status_info() return self._return_json(res)
[docs] def on_getErrorInfo(self, args): res = self.run_manager.stats.get_error_info(args) return self._return_json(res)
[docs] def do_GET(self): handler_name = self.path.strip("/") handler_name, _sep, _args = handler_name.partition("?") args = dict(parse.parse_qsl(parse.urlsplit(self.path).query)) handler = getattr(self, "on_" + handler_name, None) if callable(handler): try: return handler(args) except Exception as e: logger.exception(handler_name) return self._return_json( {"fault": repr(e)}, HTTPStatus.INTERNAL_SERVER_ERROR ) return SimpleHTTPRequestHandler.do_GET(self)
[docs] class MonitorServer(Thread): """ Run a web server in a separate thread, so it does not block """ def __init__(self, run_manager, bind="", port=8081): super().__init__(name="stressor.monitor", daemon=None) Handler.DIRECTORY = os.path.join(os.path.dirname(__file__), "htdocs") Handler.run_manager = run_manager self.run_manager = run_manager self.bind = bind self.port = port self.httpd = None
[docs] def run(self): with socketserver.TCPServer((self.bind, self.port), Handler) as httpd: self.httpd = httpd "Monitor serving at http://{}:{}...".format( self.bind or "localhost", self.port ) ) httpd.serve_forever() self.httpd = None"Monitor server stopped.")
[docs] def shutdown(self): if self.httpd: self.httpd.shutdown()
[docs] def open_browser(self): assert self.bind == "" monitor_url = f"http://localhost:{self.port}/" # monitor_url = "{}/".format(self.port)"Opening web browser at {monitor_url}") webbrowser.open_new_tab(monitor_url)