# (c) 2020-2024 Martin Wendt and contributors; see https://github.com/mar10/stressor
# Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
"""
import re
import time
from copy import deepcopy
import requests
from snazzy import red, yellow
from stressor.config_manager import replace_var_macros
from stressor.context_stack import ContextStack
from stressor.plugins.base import ActivityAssertionError
from stressor.util import (
NO_DEFAULT,
StressorError,
check_arg,
get_dict_attr,
logger,
shorten_string,
)
[docs]
class StoppedError(StressorError):
"""Raised when an activity stops due because the `stop_request` is set."""
[docs]
class SkippedError(StressorError):
"""Raised when an activity is skipped due to `max_errors` or `max_time`."""
[docs]
class User:
def __init__(self, name, password, **kwargs):
self.name = name
self.password = password
for arg_name, arg_val in kwargs.items():
assert type(arg_name) in (int, float, str)
setattr(self, arg_name, arg_val)
return
def __str__(self):
return f"User<{self.name}>"
# Provide nicer display for pprint(), etc.
__repr__ = __str__
@property
def auth(self):
"""Return (name, password) tuple or None."""
if self.password is None:
return None
return (self.name, self.password)
[docs]
class SessionHelper:
"""Passed to script activities."""
def __init__(self, session):
self.__session = session
@property
def browser(self):
return self.__session.browser_session
def __repr__(self):
return f"SessionHelper<{self.__session.session_id}>"
[docs]
class SessionManager:
"""
Run a scenario in a single session.
"""
# #: (float)
# DEFAULT_REQUEST_TIMEOUT = 10.0
def __init__(self, run_manager, context, session_id, user):
# check_arg(run_manager, RunManager)
check_arg(context, dict)
check_arg(session_id, str)
check_arg(user, User, or_none=True)
#: The :class:`RunManager` object that holds global settings and definitions
self.run_manager = run_manager
config = run_manager.config_manager.config
# (dict) Global variables for this session. Initialized from the
# run configuration, but not shared between sessions.
# (`self.context` is accessible by the respective property below.)
context = context.copy()
#: (str) Unique ID string for this session
self.session_id = session_id
#: The :class:`User` object that is assigned to this session
self.user = user or User("anonymous", "")
#: (dict) Copy of `run_config.sessions` configuration
self.sessions = run_manager.config_manager.sessions.copy()
#: (dict) Activities can store per-session data here.
#: Note that the activity objects are instintiated only once and shared
#: by all sessions.
self.data = {}
#: (bool) True: only simulate activities
self.dry_run = bool(context.get("dry_run"))
#: (int) Verbosity 0..5
self.verbose = context.get("verbose", 3)
#: (:class:`threading.Event`)
self.stop_request = run_manager.stop_request
# Set some default entries in context dict
# context.setdefault("timeout", self.DEFAULT_REQUEST_TIMEOUT)
context.setdefault("session_id", self.session_id)
context.setdefault("user", self.user)
#: The :class:`~stressor.context_stack.ContextStack` object that reflects the current execution path
self.context_stack = ContextStack(run_manager.host_id, context)
self.context_stack.push(run_manager.process_id)
self.context_stack.push(session_id)
#: :class:`~stressor.statistic_manager.StatisticManager` object that containscurrent execution path
self.stats = run_manager.stats
# Lazy initialization using a property
self._browser_session = None
#: (int) Stop session if global error count > X
#: Passing `--max-errors` will override this.
self.max_errors = int(config.get("max_errors", 0))
#: (float) Stop session if total run time > X seconds.
#: Passing `--max-time` will override this.
self.max_time = float(config.get("max_time", 0.0))
self._cancelled_seq = None
# Used by StatisticsManager
self.pending_sequence = None
self.sequence_start = None
self.pending_activity = None
self.activity_start = None
self.stats.register_session(self)
def __str__(self):
return f"SessionManager<{self.session_id}>"
# Provide nicer display for pprint(), etc.
__repr__ = __str__
[docs]
def publish(self, channel, *args, **kwargs):
kwargs["session_id"] = self.session_id
return self.run_manager.publish(channel, *args, **kwargs)
[docs]
def _evaluate_macros(self, kwargs, context):
replace_var_macros(kwargs, context)
return kwargs
[docs]
def make_session_helper(self):
"""Return a :class:`SessionHelper` instance for this session."""
res = SessionHelper(self)
return res
@property
def browser_session(self):
"""Return a ``requests.Session`` instance for this session."""
if self._browser_session is None:
self._browser_session = requests.Session()
return self._browser_session
@property
def context(self):
return self.context_stack.context
@property
def sess_stats(self):
sess_stats = self.stats["sessions"][self.session_id]
return sess_stats
[docs]
def get_context(self, dotted_key=None, default=NO_DEFAULT):
res = self.context_stack.get_attr(dotted_key)
return res
[docs]
def get_config(self, dotted_key=None, default=NO_DEFAULT):
res = self.run_manager.config_manager.get(dotted_key, default)
return res
[docs]
def log_info(self, *args):
logger.info(self.session_id, *args)
# self.publish("log", self.session_id, *args, level="info")
[docs]
def has_errors(self, or_warnings=False):
return self.sess_stats["errors"] > 0
[docs]
def check_run_limits(self, seq_name):
"""Check if current time or number of errors exceeds the configured limits.
Args:
seq_name (str):
current sequence name. Used to determine if a stopping session
should still execute the 'end' sequence if the limit was reached
during a main sequence.
Returns:
(bool) false if the current operation should be skipped.
"""
cs = self._cancelled_seq
err_limit_reached = (
# Compare max_errors against global error count
self.max_errors
and self.stats.error_count() >= self.max_errors
)
run_time = self.run_manager.get_run_time()
time_limit_reached = self.max_time and run_time > self.max_time
if not cs and (err_limit_reached or time_limit_reached):
# We just hit a limit: issue a warning and take a note of the sequence
self._cancelled_seq = seq_name
# self.stats.stats["run_limit_reached"] = True
if err_limit_reached:
msg = f"Reached max. error limit of {self.max_errors}: stopping..."
elif time_limit_reached:
msg = f"Reached max. run time limit of {self.max_time}: stopping..."
self.stats.report_limit_violation(msg)
logger.warning(yellow(msg))
return False
elif cs in ("init", "end"):
# The limit was reached inside the init or end sequence: always skip
return False
elif cs:
# The limit was reached inside a main sequence: allow to run the end
# sequence but skip all others
return seq_name == "end"
return True
[docs]
def report_activity_start(self, sequence, activity):
"""Called by session runner before activities is executed."""
path = self.context_stack.path()
self.stats.report_start(self, sequence, activity, path=path)
logger.info("{} {}".format("DRY-RUN" if self.dry_run else "Execute", path))
[docs]
def report_activity_error(self, sequence, activity, activity_args, exc):
"""Called session runner when activity `execute()` or assertions raise an error."""
# self.stats.inc("errors")
if isinstance(exc, SkippedError):
logger.warning(yellow(f"Skipped {activity}"))
self.pending_activity = None
return
# Create a copy of the current context, so we can shorten values
context = self.context_stack.context.copy()
context["last_result"] = shorten_string(context.get("last_result"), 500, 100)
msg = []
# msg.append("{} {}: {!r}:".format(self.context_stack, activity, exc))
# msg.append("{!r}:".format(exc))
msg.append(f"{exc!r}:")
if isinstance(exc, ActivityAssertionError):
msg.append("Failed assertions:")
for err in exc.assertion_list:
msg.append(f" - {err}")
msg.append(f"Execution path: {self.context_stack}")
msg.append(f"Activity: {activity}")
msg.append(f"Activity args: {activity_args}")
msg.append(f"Context: {context}")
msg = "\n ".join(msg)
logger.error(red(msg))
self.stats.report_error(self, sequence, activity, error=msg)
return
[docs]
def report_activity_result(self, sequence, activity, activity_args, result, elap):
"""Called session runner when activity `execute()` completes."""
self.stats.report_end(self, sequence, activity)
[docs]
def _process_activity_result(self, activity, activity_args, result, elap):
"""Perform common checks.
Raises:
ActivityAssertionError
"""
context = self.context_stack.context
errors = []
# warnings = []
arg = float(activity_args.get("assert_max_time", 0))
if arg and elap > arg:
errors.append(
f"Execution time limit of {arg} seconds exceeded: {elap:.3} sec."
)
arg = activity_args.get("assert_match")
if arg:
text = str(result)
# Note: use re.search (not .match)!
if not re.search(arg, text, re.MULTILINE):
errors.append(
f"Result does not match `{arg}`: {shorten_string(text, 500, 100)!r}"
)
arg = activity_args.get("store_json")
if arg:
for var_name, key_path in arg.items():
try:
val = get_dict_attr(result, key_path)
context[var_name] = val
except Exception:
errors.append(
f"store_json could not find `{key_path}` in activity result {result!r}"
)
if errors:
raise ActivityAssertionError(errors)
return
[docs]
def run_sequence(self, seq_name, sequence):
stack = self.context_stack
context = stack.context
self.publish(
"start_sequence",
session=self,
sequence=sequence,
path=stack,
)
self.stats.report_start(self, seq_name, None)
start_sequence = time.monotonic()
for act_idx, activity_args in enumerate(sequence, 1):
# activity_args["activity"] is an instance of ActivityBase that
# we want to re-use it for every session.
# The rest of activity_args is copied, so session data is separated
activity = activity_args["activity"]
activity_args = deepcopy(activity_args)
activity_args.pop("activity")
# Add activity info to path
# Note: `get_info()` is not as detailed as it could, since we don't
# pass the expanded args here. We set it anyway, so we have a valid
# stack in case `_evaluate_macros()` blows.
with stack.enter(f"#{act_idx:02}-{activity.get_info(session=self)}"):
expanded_args = self._evaluate_macros(activity_args, context)
# Let activity do internal calculations, that might be used by
# the follwing call to `get_info()`
activity.prepare_execute(self, expanded_args)
# Enhance the path info with expanded args
stack.set_last_part(
activity.get_info(expanded_args=expanded_args, session=self)
)
error = None
result = None
self.publish(
"start_activity",
session=self,
sequence=sequence,
activity=activity,
expanded_args=expanded_args,
context=context,
path=stack,
)
start_activity = time.monotonic()
self.report_activity_start(seq_name, activity)
try:
if self.stop_request.is_set():
raise StoppedError
if not self.check_run_limits(seq_name):
raise SkippedError
result = activity.execute(self, **expanded_args)
context["last_result"] = result
# Evaluate standard `assert_...` and `store_...` clauses:
elap = time.monotonic() - start_activity
self._process_activity_result(
activity,
activity_args,
result,
elap,
)
self.report_activity_result(
seq_name,
activity,
activity_args,
result,
elap,
)
except (Exception, KeyboardInterrupt) as e:
if isinstance(e, KeyboardInterrupt):
self.stop_request.set()
error = e
self.report_activity_error(seq_name, activity, activity_args, e)
if not isinstance(e, (KeyboardInterrupt, StressorError)):
logger.exception("")
# return False
finally:
elap = time.monotonic() - start_activity
self.publish(
"end_activity",
session=self,
sequence=sequence,
path=stack,
activity=activity,
result=result,
error=error,
elap=elap,
context=context,
)
elap = time.monotonic() - start_sequence
self.stats.report_end(self, seq_name, None)
self.publish(
"end_sequence",
session=self,
sequence=sequence,
path=stack,
elap=elap,
)
context["last_result"] = None
return not self.has_errors()
[docs]
def run(self):
stack = self.context_stack
rm = self.run_manager
config_manager = rm.config_manager
config = config_manager.config
sequences = config_manager.sequences
scenario = config_manager.scenario
sessions = config_manager.sessions
session_duration = float(sessions.get("duration", 0.0))
self.publish("start_session", session=self)
self.stats.report_start(self, None, None)
start_session = time.monotonic()
skip_all = False
skip_all_but_end = False
for seq_idx, seq_def in enumerate(scenario, 1):
seq_name = seq_def["sequence"]
if skip_all or (skip_all_but_end and seq_name != "end"):
logger.warning(f"Skipping sequence '{seq_name}'.")
continue
sequence = sequences.get(seq_name)
loop_repeat = int(seq_def.get("repeat", 0))
loop_duration = float(seq_def.get("duration", 0.0))
start_seq_loop = time.monotonic()
loop_idx = 0
while True:
loop_idx += 1
if not self.check_run_limits(seq_name=seq_name):
skip_all_but_end = True
break
# One single pass by default
if not loop_repeat and not loop_duration and loop_idx > 1:
break
# `Sequence repeat: COUNT`:
if loop_repeat and loop_idx > loop_repeat:
break
# `--single`:
if loop_idx > 1 and config.get("force_single"):
logger.warning(
"force_single: sequence '{}' skipping remaining {} loops.".format(
seq_name, loop_repeat - 1 if loop_repeat else ""
)
)
break
now = time.monotonic()
# `Sequence duration: SECS`:
if loop_duration > 0 and now > (start_seq_loop + loop_duration):
logger.info(
f"Stopping sequence '{seq_name}' loop after {loop_duration} sec."
)
break
# `Session duration: SECS` (but run 'end' sequence):
elif (
seq_name != "end"
and session_duration > 0
and now > (start_session + session_duration)
):
logger.info(
f"Stopping scenario '{seq_name}' loop after {session_duration} sec."
)
skip_all_but_end = True
break
with stack.enter(f"#{seq_idx:02}-{seq_name}@{loop_idx}"):
is_ok = self.run_sequence(seq_name, sequence)
if seq_name == "init" and not is_ok:
logger.error(
"Stopping scenario due to an error in the 'init' sequence."
)
skip_all = True
break
elif self.stop_request.is_set():
logger.error("Stopping scenario due to a stop request.")
# TODO: a second 'ctrl-c' should not be so graceful
skip_all_but_end = True
break
# self.stats.report_end(self, seq_name, None)
elap = time.monotonic() - start_session
self.stats.report_end(self, None, None)
self.publish("end_session", session=self, elap=elap)
# if self._cancelled_seq:
# return False
return not self.has_errors()