# (c) 2020-2024 Martin Wendt and contributors; see https://github.com/mar10/stressor
# Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
"""
import os
import re
import yaml
from stressor.plugin_manager import PluginManager
from stressor.plugins.base import ActivityBase, ActivityCompileError
from stressor.util import (
NO_DEFAULT,
PathStack,
StressorError,
assert_always,
check_arg,
get_dict_attr,
logger,
)
VAR_MACRO_REX = re.compile(r"\$\(\s*(\w[\w.:]*)\s*\)")
GENERIC_MACRO_REX = re.compile(r"\$\w+.*\(.*\).*")
[docs]
class ConfigurationError(StressorError):
""""""
[docs]
def replace_var_macros(value, context):
"""
Replace all macros of type `$(CONTEXT.VAR.NAME)`.
"""
stack = PathStack()
def repl(value, context, parent, parent_key):
with stack.enter(str(parent_key or "?")):
if isinstance(value, dict):
for key, sub_val in value.items():
repl(sub_val, context, value, key)
elif isinstance(value, (list, tuple)):
for idx, elem in enumerate(value):
repl(elem, context, value, idx)
elif isinstance(value, str):
org_value = value
if "$" in value and str(stack) == "/?/script":
# Don't replace macros inside RunActivity scripts
logger.debug("Not replacing macros inside `script`s.")
return value
while "$" in value:
found_one = False
temp_val = value
for match in VAR_MACRO_REX.finditer(temp_val):
found_one = True
# print(match, match.groups())
macro, var_name = match.group(), match.groups()[0]
# resolve dotted names:
# var_value = context[var_name]
try:
var_value = get_dict_attr(context, var_name)
if value.strip() == macro:
# Replace macro string with resolved int, float, or str
value = var_value
break
# value contains a macro but also prefix or suffix.
# Cast macro-result to string and check for more macros
value = value.replace(macro, str(var_value))
except (KeyError, TypeError):
raise RuntimeError(
f"Error evaluating {stack}: '{org_value}': '{var_name}' not found in context (or is None)."
)
if not found_one or not isinstance(value, str):
break
parent[parent_key] = value
return value
res = repl(value, context, None, None)
return res
# register_plugins()
[docs]
class ConfigManager:
"""
Define and validate a run-configuration.
Also reads and compiles YAML files.
"""
#: Currently supportted syntax version.
#: (Incremented when incompatible changes are introduced.)
FILE_VERSION = 0
def __init__(self, stats_manager):
#: (dict) The complete parsed YAML file as dict
self.stats_manager = stats_manager
#: (dict) The complete parsed YAML file as dict
self.config_all = None
#: (int) The file version, parsed from the `filer_version#VERSION` tag
self.file_version = None
#: (str) Absolute path of the YAML file
self.path = None
#: (str) Absolute root folder of the YAML file
self.root_folder = None
#: (str) Shortcut to self.run_config["name"] (defaults to filename without extension)
self.name = None
#: (:class:`stressor.util.PathStack`) Current compile location
self.stack = None
#: (dict) lists of compile errors and warnings
self.results = {
"error": [],
"warning": [],
}
# if path is not None:
# self.read(path)
return
[docs]
def get(self, key_path, default=NO_DEFAULT):
res = get_dict_attr(self.config_all, key_path, default)
return res
[docs]
def resolve_path(self, path, must_exist=True, check_root=True, default_ext=None):
"""Return an absolute path, assuming relative to the original config file."""
# TODO: check for invalid access (security risk!)
assert_always(self.path, "`read()` must be called before")
if not path.startswith("/"):
path = os.path.join(self.root_folder, path)
path = os.path.abspath(path)
if check_root and not path.startswith(self.root_folder):
raise ValueError(f"Path must be in or below {self.root_folder}: {path}")
if must_exist and not os.path.isfile(path):
raise ValueError(f"File not found: {path}")
return path
[docs]
def report_error(self, msg, level="error", exc=None, stack=None):
"""Called by activity and macro constructors to signal errors or warnings.
The compiler also calls this when a constructor raises an exception.
"""
check_arg(level, str, level in ("error", "warning"))
path = stack if stack else str(self.stack)
hint = f"{path}: {msg}"
if exc:
logger.exception(hint)
# No need to log, since self.results are also summarized later
# elif level == "warning":
# logger.warning(hint)
# else:
# logger.error(hint)
self.results[level].append({"msg": msg, "path": path})
@property
def config(self):
"""Shortcut to config_all["run_config"]."""
return self.config_all["config"]
@property
def context(self):
"""Shortcut to config_all["context"]."""
return self.config_all["context"]
@property
def scenario(self):
"""Shortcut to config_all["scenario"]."""
return self.config_all["scenario"]
@property
def sessions(self):
"""Shortcut to config_all["sessions"]."""
return self.config_all["sessions"]
@property
def sequences(self):
"""Shortcut to config_all["sequences"]."""
return self.config_all["sequences"]
[docs]
def has_errors(self, or_warnings=False):
return bool(self.results["error"] or (or_warnings and self.results["warning"]))
[docs]
def update_config(self, extra_config, context_only=False):
"""Override self.config (and self.context) with new items.
self.config was already copied to self.context, so normally we want to
update both.
Args:
extra_config (dict): new values
context_only (bool): pass true to only set the shadow-copy (i.e. context)
"""
check_arg(extra_config, dict, or_none=True)
if not extra_config:
return
config = self.config
context = self.context
for k, v in extra_config.items():
if not context_only:
logger.info(f"Set config.{k}: {config.get(k)!r} -> {v!r}")
config[k] = v
else:
logger.info(f"Set context.{k}: {context.get(k)!r} -> {v!r}")
context[k] = v
return
[docs]
def validate_config(self, cfg=None):
"""
Raises:
ConfigurationError
Returns:
(int) Current file format version as defined in `file_version: stressor#N`
"""
if cfg is None:
cfg = self.config_all
def _check_type(key, types):
try:
o = get_dict_attr(cfg, key)
except Exception:
self.report_error("Could not find expected entry", stack=key)
return False
if o is None and None in types:
return True
elif not isinstance(o, types):
self.report_error(
f"Expected type {types}, but found {type(o)!r}", stack=key
)
return False
return True
sections = set(cfg.keys())
known_sections = {
"file_version",
"config",
"context",
"sessions",
"scenario",
"sequences",
}
file_version = cfg.get("file_version", "")
if not file_version.startswith("stressor#"):
raise ConfigurationError(
"Not a `stressor` file (missing 'stressor#VERSION' tag)."
)
file_version = int(file_version.split("#", 1)[1])
if file_version != self.FILE_VERSION:
raise ConfigurationError(
f"File version mismatch: expected {self.FILE_VERSION}, but found {file_version}."
)
missing = known_sections.difference(sections)
extra = sections.difference(known_sections)
if extra or missing:
raise ConfigurationError(
"Configuration file check failed:\n missing sections: {}\n invalid sections: {}".format(
", ".join(missing or "-"), ", ".join(extra or "-")
)
)
if _check_type("config", dict):
base_url = get_dict_attr(cfg, "config.base_url", None)
if base_url and (not base_url.startswith("http") or "://" not in base_url):
self.report_error(
f"config.base_url must be an absolute URL ('http(s)://...'): {base_url!r}",
)
if _check_type("context", (dict, None)):
pass
if _check_type("sessions", dict):
pass
# - sequences must be a dict of dicts.
# All entries must contain 'activity'
# - activity.assert_json must be a dict
sequence_names = set()
if _check_type("sequences", dict):
for seq_name, act_list in cfg["sequences"].items():
sequence_names.add(seq_name)
stack = f"sequences/{seq_name}"
if act_list is None:
act_list = []
cfg["sequences"][seq_name] = act_list
self.report_error(
"Ignored undefined list of activities",
level="warning",
stack=stack,
)
if not isinstance(act_list, list):
self.report_error(
"Expected list of activities",
stack=stack,
)
else:
for idx, act_def in enumerate(act_list):
stack = f"sequences/{seq_name}#{idx:02}"
if not isinstance(act_def, dict):
self.report_error(
"Expected dict with `activity` key",
stack=stack,
)
else:
activity = act_def.get("activity")
if not isinstance(activity, ActivityBase):
self.report_error(
"`activity` must be an instance of ActivityBase "
f"instance (found {activity!r})",
stack=stack,
)
assert_match = act_def.get("assert_match")
if assert_match is not None:
try:
re.compile(assert_match)
except re.error as e:
self.report_error(
f"Invalid regular expression: {assert_match}: {e}",
stack=stack,
)
# Scenario list must contain 'sequence' keys and all sequences must exist
if _check_type("scenario", list):
for idx, seq_def in enumerate(cfg["scenario"]):
stack = f".scenario#{idx:02}"
if not isinstance(seq_def, dict) or "sequence" not in seq_def:
self.report_error(
"Expected dict with `sequence` key",
stack=stack,
)
elif seq_def["sequence"] not in sequence_names:
self.report_error(
"sequence name is not defined in `sequences`",
stack=stack,
)
# TODO:
# - if init is given, it must be first?
# - if end is given, it must be last?
# - assert_json, assert_match, ...
return file_version
[docs]
def _compile(self, value, parent=None, parent_key=None, stack=None):
"""Apply load-time conversions after a config file was read.
- Replace activity definitions with instances of :class:`ActvityBase`
- Resolve load-time macros (partly by replacing them with activites)
**Note:** Some makros, especially `$(CONTEXT.VAR)` are *not* resolved here,
because this needs to be done at run-time.
"""
pm = PluginManager
assert pm.activity_plugin_map
stats = self.stats_manager
if stack is None:
# Top-Level call; `value` is the YAML cnfig dict
self.stack = PathStack("config")
stack = self.stack
# Register sequence names
for seq_name in value.get("sequences", {}).keys():
# Create an initial statistics dict for sequence_stats[SEQ_NAME]:
stats.register_sequence(seq_name)
if parent_key == "activity":
path_info = parent.get(parent_key)
else:
path_info = parent_key
with stack.enter(path_info, ignore=parent is None):
# logger.debug("compile {}".format(stack))
# Reslove `$name()` macros, which may replace themselves, e.g.
# - "GetRequest" -> `GetRequestActivity()`
# - "$load()" -> list or dict that needs to be compiled as well
if isinstance(value, str) and "$" in value:
has_match = False
for macro_cls in pm.macro_plugin_map.values():
try:
macro = macro_cls()
handled, res = macro.match_apply(self, parent, parent_key)
if handled:
has_match = True
logger.debug(f"Eval {stack}: {value} => {res}")
# Re-init `value` in case the macro replaced it
value = parent[parent_key]
break
except Exception as e:
msg = f"Could not evaluate macro {value!r}"
self.report_error(msg, exc=e)
# raise ConfigurationError(
# "Could not evaluate {!r} at {}: {}".format(value, stack, e)
# ) from e
if not has_match and GENERIC_MACRO_REX.match(value):
msg = f"Entry looks like a macro, but has no handler: '{value}'"
self.report_error(msg, level="warning")
# Resolve lists and dicts recursively:
if isinstance(value, dict):
# Macros may change the dictionary size, so iterate over a copy
for key, sub_val in tuple(value.items()):
self._compile(sub_val, value, key, stack)
return
elif isinstance(value, (list, tuple)):
# Macros may change the list size, so iterate over a copy
for idx, elem in enumerate(tuple(value)):
self._compile(elem, value, idx, stack)
return
# Either 'activity' was already an activity name, or a preceeding macro
# set it:
if isinstance(value, str) and value in pm.activity_plugin_map:
# Replace the activity definition with an instance of the class.
# Allow activities to do compile-time checking ad processing
activity_cls = pm.activity_plugin_map[value]
try:
# print(parent)
activity_inst = activity_cls(self, **parent)
parent[parent_key] = activity_inst
if stats:
stats.register_activity(activity_inst)
except ActivityCompileError as e:
# Don't pass exc to supress stack trace
self.report_error(f"{e}")
except Exception as e:
msg = f"Could not evaluate activity {value!r}"
self.report_error(msg, exc=e)
# logger.error("{} {}: {}".format(stack, value, e))
return
[docs]
def read(self, path, load_files=True):
"""Read a YAML file into ``self.config_all``.
Raises:
ConfigurationError
"""
check_arg(load_files, bool)
self.config_all = None
if not path.lower().endswith(".yaml"):
path += ".yaml"
path = os.path.abspath(path)
if not os.path.isfile(path):
raise ConfigurationError(f"File not found: {path}")
self.path = path
self.root_folder = os.path.dirname(path)
self.name = os.path.splitext(os.path.basename(path))[0]
with open(path) as f:
try:
res = yaml.safe_load(f)
except yaml.parser.ParserError as e:
raise ConfigurationError(f"Could not parse YAML: {e}") from None
if not isinstance(res, dict) or not res.get("file_version", "").startswith(
"stressor#"
):
raise ConfigurationError(
"Not a `stressor` file (missing 'stressor#VERSION' tag)."
)
self._compile(res)
self.file_version = self.validate_config(res)
if self.results["warning"]:
logger.error("Compiler warnings:")
for m in self.results["warning"]:
logger.warning(" - {}: {}".format(m["path"], m["msg"]))
if self.results["error"]:
logger.error("Compiler errors:")
for m in self.results["error"]:
logger.error(" - {}: {}".format(m["path"], m["msg"]))
raise ConfigurationError("Config file had compile errors.")
self.config_all = res
# Copy values from `config.*` to `context.*`
if self.config_all.get("context") is None:
self.config_all["context"] = {}
for k, v in self.config.items():
self.context.setdefault(k, v)
return self.config_all