# -*- coding: utf-8 -*-
# (c) 2020-2023 Martin Wendt and contributors; see https://github.com/mar10/stressor
# Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
"""
import itertools
import sys
import threading
import time
from collections import defaultdict
from datetime import datetime
from snazzy import emoji, green, red, yellow
from stressor import __version__
from stressor.config_manager import ConfigManager
from stressor.monitor.server import MonitorServer
from stressor.session_manager import SessionManager, User
from stressor.statistic_manager import StatisticManager
from stressor.util import (
check_arg,
format_elap,
format_num,
get_random_number,
logger,
set_console_ctrl_handler,
)
[docs]
class RunManager:
"""
Executes a run-configuration in parallel sessions.
"""
CTRL_HANDLER_SET = None
CURRENT_RUN_MANAGER = None
DEFAULT_OPTS = {
"monitor": False,
"log_summary": True,
# "dry_run": False,
}
STAGES = (
# "new",
"ready",
"running",
"done",
"waiting",
"stopping",
"stopped",
)
CHANNELS = (
"log",
"start_run",
"start_session",
"start_sequence",
"start_activity",
"end_activity",
"end_sequence",
"end_session",
"end_run",
)
activity_map = {}
def __init__(self):
self.lock = threading.RLock()
self.host_id = "h1"
self.process_id = "p1"
#: :class:`ConfigManager` used to load the configuration YAML
self.config_manager = None
self.has_hooks = False
self.has_catch_all_hooks = False
self._hooks = defaultdict(list)
#: Set this event to shut down the app
self.stop_request = threading.Event()
#: (bool): TODO: determines if a stop request is graceful or not
#: True: Finalize the current sequence, then do 'end' sequence before stopping
self.stop_request_graceful = None
#: (bool): TODO: determines if a stop request will keep the monitor running
#: True: Finalize the current sequence, then do 'end' sequence before stopping?
self.stop_request_monitor = None
self.session_list = []
#: :class:`~stressor.statistic_manager.StatisticManager` object that containscurrent execution path
self.stats = StatisticManager()
self.options = self.DEFAULT_OPTS.copy()
self.stage = "ready"
self.start_dt = None
self.start_stamp = None
self.end_dt = None
self.end_stamp = None
# register_plugins()
self.CURRENT_RUN_MANAGER = self
self.set_console_ctrl_handler()
def __str__(self):
# name = self.config_manager.path if self.config_manager else "?"
# name = self.run_config.get("name") if self.run_config else "?"
return "RunManager<{}>".format(self.stage.upper())
[docs]
@staticmethod
def set_console_ctrl_handler():
if RunManager.CTRL_HANDLER_SET is None:
RunManager.CTRL_HANDLER_SET = set_console_ctrl_handler(
RunManager._console_ctrl_handler
)
logger.info("set_console_ctrl_handler()")
[docs]
@staticmethod
def _console_ctrl_handler(ctrl):
# NOTE: seems that print/logger do not work here?
# print("_console_ctrl_handler()")
return RunManager.CURRENT_RUN_MANAGER.console_ctrl_handler(ctrl)
[docs]
def console_ctrl_handler(self, ctrl):
"""
Args:
ctrl (int): 0: CTRL_C_EVENT, 1: CTRL_BREAK_EVENT, 2: CTRL_CLOSE_EVENT
Returns:
True if handled
False if not handled, i.e. next registered handler will be called
"""
# if self.stop_request.is_set():
# print("Got Ctrl-C 2nd time: terminating...")
# logger.warning("Got Ctrl-C a 2nd time: terminating...")
# time.sleep(0.1)
# # sys.exit(2)
print("Got Ctrl-C (windows handler), terminating...", file=sys.stderr)
logger.warning("Got Ctrl-C (windows handler), terminating...")
# self.stop_request.set()
self.stop()
# return False
return True
[docs]
def set_stage(self, stage):
check_arg(stage, str, stage in self.STAGES)
logger.info("Enter stage '{}'".format(stage.upper()))
self.stage = stage
[docs]
def publish(self, channel, allow_cancel=False, *args, **kwargs):
"""Notify all subscribed handlers."""
assert channel in self.CHANNELS
result_list = []
if not self.has_hooks:
return result_list
channel_hooks = self._hooks.get(channel)
generic_hooks = self._hooks.get("*")
if channel_hooks:
if generic_hooks:
hooks = itertools.chain(channel_hooks, generic_hooks)
else:
hooks = channel_hooks
elif generic_hooks:
hooks = generic_hooks
for handler in hooks:
res = handler(channel, *args, **kwargs)
if allow_cancel and res is False:
return False
result_list.append(res)
return result_list
[docs]
def subscribe(self, channel, handler):
self.has_hooks = True
if channel == "*":
self.has_catch_all_hooks = True
else:
assert channel in (self.CHANNELS)
assert callable(handler)
self._hooks[channel].append(handler)
[docs]
def has_errors(self, or_warnings=False):
return self.stats.has_errors()
[docs]
def get_cli_summary(self):
cm = self.config_manager
lines = []
run_time = self.end_stamp - self.start_stamp
user_count = len(self.session_list)
has_errors = self.has_errors()
ap = lines.append
col = red if has_errors else green
horz_line = col("=-" * 38 + "=")
ap("Result Summary:")
ap(horz_line)
ap("Stressor scenario '{}' finished.".format(cm.name))
ap(" Tag: '{}'".format(cm.config.get("tag", "n.a.")))
ap(" Base URL: {}".format(cm.config.get("base_url", "")))
ap(" Start: {}".format(self.start_dt.strftime("%Y-%m-%d %H:%M:%S")))
ap(" End: {}".format(self.end_dt.strftime("%Y-%m-%d %H:%M:%S")))
ap(
"Run time {}, net: {}.".format(
format_elap(run_time, high_prec=True),
format_elap(self.stats["net_act_time"], high_prec=True),
)
)
ap(
"Executed {:,} activities in {:,} sequences, using {:,} parallel sessions.".format(
self.stats["act_count"],
self.stats["seq_count"],
user_count,
)
)
if run_time and self.stats["seq_count"]:
ap(
"Sequence duration: {} average.".format(
format_elap(run_time / self.stats["seq_count"], high_prec=True)
)
)
if run_time:
ap(
" rate: {} sequences per minute (per user: {}).".format(
format_num(60.0 * self.stats["seq_count"] / run_time),
format_num(
60.0 * self.stats["seq_count"] / (run_time * user_count)
),
)
)
ap(
"Activity rate: {} activities per second (per user: {}).".format(
format_num(self.stats["act_count"] / run_time),
format_num(self.stats["act_count"] / (run_time * user_count)),
)
)
# --- List of all activities that are marked `monitor: true`
if self.stats["monitored"]:
print(self.stats["monitored"])
ap("{} monitored activities:".format(len(self.stats["monitored"])))
for path, info in self.stats["monitored"].items():
errors = info.get("errors")
ap(" - {}".format(path))
if not info:
ap(" n: 0, min: n.a., avg: n.a., max: n.a.")
continue
ap(
" n: {:,}, min: {}, avg: {}, max: {}{}".format(
info["act_count"],
format_elap(info["act_time_min"], high_prec=True),
format_elap(info["act_time_avg"], high_prec=True),
format_elap(info["act_time_max"], high_prec=True),
red(", {} errors".format(errors)) if errors else "",
)
)
if has_errors:
pics = emoji(" 💥 💔 💥", "")
ap(
red(
"Result: ERROR, found {:,} errors and {:,} warnings.".format(
self.stats["errors"],
self.stats["warnings"],
)
+ pics
)
)
if self.stats.stats["run_limit_reached"]:
ap(
yellow(
"Some activities where skipped due to the `max-errors` "
" or `max-time` limit."
)
)
else:
pics = emoji(" ✨ 🍰 ✨", "")
ap(green("Result: Ok." + pics))
ap(horz_line)
return "\n".join(lines)
[docs]
def get_status_info(self):
cm = self.config_manager
stats_info = self.stats.get_monitor_info(cm.config_all)
res = {
"name": cm.name,
"scenarioDetails": cm.get("config.details", "n.a."),
"tag": cm.get("config.tag", "n.a."),
"stage": self.stage,
"stageDisplay": "done" if self.stage == "waiting" else self.stage,
"hasErrors": self.has_errors(),
"startTimeStr": "{}".format(self.start_dt.strftime("%Y-%m-%d %H:%M:%S")),
"baseUrl": cm.get("config.base_url"),
"sessionCount": self.stats["sess_count"],
"sessionsRunning": self.stats["sess_running"],
"stats": stats_info,
"version": __version__,
}
if self.end_dt:
elap = self.end_dt - self.start_dt
res["endTimeStr"] = "{} ({})".format(
self.end_dt.strftime("%Y-%m-%d %H:%M:%S"),
format_elap(elap.total_seconds()),
)
else:
elap = datetime.now() - self.start_dt
res["endTimeStr"] = "(running for {}...)".format(
format_elap(elap.total_seconds())
)
return res
[docs]
def log_info(self, *args, **kwargs):
self.publish("log", level="info", *args, **kwargs)
[docs]
def load_config(self, run_config_file):
"""Load configuration file and set shortcuts."""
cr = ConfigManager(self.stats)
cr.read(run_config_file, load_files=True)
self.config_manager = cr
# self.run_config = cr.run_config
logger.info("Successfully compiled configuration {}.".format(cr.path))
[docs]
def _run_one(self, session_manager):
"""Run inside a separate thread."""
try:
session_manager.run()
# We don't need to print results if only one session was run, since
# it is also part of the global stats:
# rc = self.run_config
# if not rc.get("force_single") and rc["sessions"]["count"] > 1:
# logger.info(
# "Results for {}:\n{}".format(
# session_manager, session_manager.stats.format_result()
# )
# )
except KeyboardInterrupt:
logger.exception("Session thread received Ctrl-C")
self.stats.report_error(None, None, None, "KeyboardInterrupt")
self.stop_request.set()
# self.stats.inc("errors")
except Exception as e:
logger.exception("Session thread raised exception")
self.stats.report_error(None, None, None, e)
# self.stats.inc("errors")
# raise
return
[docs]
def run_in_threads(self, user_list, context):
self.publish("start_run", run_manager=self)
self.stop_request.clear()
thread_list = []
self.session_list = []
for i, user in enumerate(user_list, 1):
name = "t{:02}".format(i)
sess = SessionManager(self, context, name, user)
self.session_list.append(sess)
t = threading.Thread(name=name, target=self._run_one, args=[sess])
t.setDaemon(True) # Required to make Ctrl-C work
thread_list.append(t)
logger.info("Starting {} session workers...".format(len(thread_list)))
self.set_stage("running")
self.stats.report_start(None, None, None)
ramp_up_delay = self.config_manager.sessions.get("ramp_up_delay")
start_run = time.monotonic()
for i, t in enumerate(thread_list):
if ramp_up_delay and i > 1:
delay = get_random_number(ramp_up_delay)
logger.info(
"Ramp-up delay for t{:02}: {:.2f} seconds...".format(i, delay)
)
time.sleep(delay)
t.start()
logger.important(
"All {} sessions running, waiting for them to terminate...".format(
len(thread_list)
)
)
for t in thread_list:
t.join()
self.set_stage("done")
elap = time.monotonic() - start_run
# self.stats.add_timing("run", elap)
self.stats.report_end(None, None, None)
self.publish("end_run", run_manager=self, elap=elap)
logger.debug("Results for {}:\n{}".format(self, self.stats.format_result()))
return not self.has_errors()
[docs]
def run(self, options, extra_context=None):
"""Run the current
Args:
options (dict): see RunManager.DEFAULT_OPTS
extra_context (dict, optional):
Returns:
(int) Exit code 0 if no errors occurred
"""
check_arg(options, dict)
check_arg(extra_context, dict, or_none=True)
self.options.update(options)
if extra_context:
self.config_manager.update_config(extra_context)
context = self.config_manager.context
sessions = self.config_manager.sessions
count = int(sessions.get("count", 1))
if count > 1 and self.config_manager.config.get("force_single"):
logger.info("force_single: restricting sessions count to one.")
count = 1
# Construct a `User` with at least 'name', 'password', and optional
# custom attributes
user_list = []
for user_dict in sessions["users"]:
user = User(**user_dict)
user_list.append(user)
# We have N users and want `count` sessions: re-use round-robin
user_list = itertools.islice(itertools.cycle(user_list), 0, count)
user_list = list(user_list)
monitor = None
if self.options.get("monitor"):
monitor = MonitorServer(self)
monitor.start()
time.sleep(0.5)
monitor.open_browser()
self.start_stamp = time.monotonic()
self.start_dt = datetime.now()
self.end_dt = None
self.end_stamp = None
try:
try:
res = False
res = self.run_in_threads(user_list, context)
except KeyboardInterrupt:
# if not self.stop_request.is_set():
logger.warning("Caught Ctrl-C: terminating...")
self.stop()
finally:
self.end_dt = datetime.now()
self.end_stamp = time.monotonic()
if self.options.get("log_summary", True):
logger.important(self.get_cli_summary())
if monitor:
self.set_stage("waiting")
logger.important("Waiting for monitor... Press Ctrl+C to quit.")
self.stop_request.wait()
finally:
if monitor:
monitor.shutdown()
# print("RES", res, self.has_errors(), self.stats.format_result())
self.set_stage("stopped")
return res
[docs]
def stop(self, graceful=2):
""""""
# logger.info("Stop request received")
# TODO: set errors += 1 if we interrupt a running stage
self.set_stage("stopping")
self.stop_request.set()
return True
[docs]
def get_run_time(self):
return time.monotonic() - self.start_stamp