Source code for stressor.monitor.server

# -*- coding: utf-8 -*-
# (c) 2020-2023 Martin Wendt and contributors; see https://github.com/mar10/stressor
# Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
"""
import io
import json
import logging
import os
import socketserver
import webbrowser
from http.server import HTTPStatus, SimpleHTTPRequestHandler
from threading import Thread
from urllib import parse

from stressor import __version__

# from stressor.util import logger


logger = logging.getLogger("stressor.monitor")


class Handler(SimpleHTTPRequestHandler):
    """HTTP request handler of the monitor web application.

    A ``GET`` request whose path names an ``on_<name>`` method of this class
    is dispatched to that method and answered with JSON. Every other request
    is passed on to `SimpleHTTPRequestHandler`, which is initialized with
    `DIRECTORY` as its ``directory`` argument.
    """

    server_version = (
        "stressor/" + __version__ + " " + SimpleHTTPRequestHandler.server_version
    )

    # Custom attributes, set by `MonitorServer`:
    DIRECTORY = None
    run_manager = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, directory=self.DIRECTORY, **kwargs)

    def log_request(self, code="-", size="-"):
        """Log a handled request to the module logger.

        Overrides the base implementation (which writes to stderr).
        Status codes outside of 200..399 are logged as warnings; everything
        else, including the ``"-"`` placeholder, is logged at debug level.
        """
        # `HTTPStatus` is an `IntEnum`, so testing for `int` covers enum
        # members as well as plain integer codes handed to `send_response()`.
        is_failure = isinstance(code, int) and not (200 <= code < 400)
        log = logger.warning if is_failure else logger.debug
        log('"%s" %s %s', self.requestline, str(code), str(size))

    def log_error(self, format, *args):
        """Log an error message to the module logger instead of stderr."""
        # Parameter name `format` is dictated by the base class signature.
        logger.error(format, *args)

    def log_message(self, format, *args):
        """Log an arbitrary message to the module logger instead of stderr."""
        logger.debug(format, *args)

    def _return_json(self, body, status=HTTPStatus.OK):
        """Send `body` as a JSON response.

        Args:
            body: Response payload. A `str` is sent as-is; any other value is
                serialized with `json.dumps()` first.
            status: HTTP status passed to `send_response()`.

        Returns:
            The (already closed) `io.BytesIO` buffer that held the payload.
        """
        if not isinstance(body, str):
            body = json.dumps(body)
        encoded = body.encode("utf8", "surrogateescape")

        self.send_response(status)
        self.send_header("Content-type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()

        # A buffer created from bytes starts at position 0, ready for reading.
        f = io.BytesIO(encoded)
        try:
            self.copyfile(f, self.wfile)
        finally:
            f.close()
        return f

    def on_stopManager(self, args):
        """Call `run_manager.stop()` and send its result as JSON."""
        res = self.run_manager.stop()
        return self._return_json(res)

    def on_getStats(self, args):
        """Send the result of `run_manager.get_status_info()` as JSON."""
        res = self.run_manager.get_status_info()
        return self._return_json(res)

    def on_getErrorInfo(self, args):
        """Send `run_manager.stats.get_error_info(args)` as JSON.

        Args:
            args: Dict of the request's query string parameters.
        """
        res = self.run_manager.stats.get_error_info(args)
        return self._return_json(res)

    def do_GET(self):
        """Dispatch to a matching ``on_<name>`` method or fall back to the base.

        The handler name is the request path without surrounding slashes and
        without the query string. The query parameters are passed to the
        handler as a dict. An exception raised by the handler is logged and
        reported to the client as a JSON ``{"fault": ...}`` body with status
        500.
        """
        handler_name = self.path.strip("/")
        handler_name, _sep, _args = handler_name.partition("?")
        args = dict(parse.parse_qsl(parse.urlsplit(self.path).query))

        handler = getattr(self, "on_" + handler_name, None)
        if not callable(handler):
            # Not an API call: let the base class handle the request.
            return super().do_GET()

        try:
            return handler(args)
        except Exception as e:
            logger.exception(handler_name)
            return self._return_json(
                {"fault": repr(e)}, HTTPStatus.INTERNAL_SERVER_ERROR
            )
class MonitorServer(Thread):
    """
    Run a web server in a separate thread, so it does not block.

    Creating an instance also configures the `Handler` class: its
    `DIRECTORY` is pointed at the ``htdocs`` folder next to this module and
    its `run_manager` is set to the one passed in.
    """

    def __init__(self, run_manager, bind="", port=8081):
        super().__init__(name="stressor.monitor", daemon=None)

        # The request handler reads its configuration from class attributes.
        module_dir = os.path.dirname(__file__)
        Handler.DIRECTORY = os.path.join(module_dir, "htdocs")
        Handler.run_manager = run_manager

        self.run_manager = run_manager
        self.bind = bind
        self.port = port
        # Set while `run()` is serving, otherwise `None`.
        self.httpd = None

    def run(self):
        """Thread body: serve requests until `shutdown()` is called."""
        address = (self.bind, self.port)
        with socketserver.TCPServer(address, Handler) as server:
            self.httpd = server
            host = self.bind or "localhost"
            logger.info(
                "Monitor serving at http://{}:{}...".format(host, self.port)
            )
            server.serve_forever()
        self.httpd = None
        logger.info("Monitor server stopped.")

    def shutdown(self):
        """Ask the running server (if any) to stop serving."""
        if not self.httpd:
            return
        self.httpd.shutdown()

    def open_browser(self):
        """Open the monitor page in a new tab of the web browser.

        Asserts that the server was created with an empty `bind` address.
        """
        assert self.bind == ""
        monitor_url = f"http://localhost:{self.port}/"
        logger.info(f"Opening web browser at {monitor_url}")
        webbrowser.open_new_tab(monitor_url)