# -*- coding: utf-8 -*-
# (c) 2020-2023 Martin Wendt and contributors; see https://github.com/mar10/stressor
# Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
"""
import logging
import threading
import time
from collections import OrderedDict
from pprint import pformat
from stressor.util import format_elap, format_rate, get_dict_attr, shorten_string
logger = logging.getLogger("stressor")
[docs]
class StatisticManager:
    """Collect and aggregate timing and error statistics of a run.

    All numbers are stored in the nested dict ``self.stats``. Activity and
    sequence timings are accumulated globally, per sequence
    (``stats["sequence_stats"]``), per session (``stats["sessions"]``), and
    per monitored activity (``stats["monitored"]``).

    Updates made by :meth:`_report` and :meth:`report_limit_violation` are
    serialized by a re-entrant lock.

    Example::

        {'act_count': 4485,
         'act_time': 404.5604224205017,
         'act_time_avg': 0.09020299273589781,
         'act_time_max': 0.3416469097137451,
         'act_time_min': 0.00455021858215332,
         'errors': 0,
         'monitored': {
             '/config/sequences/main/2/activity': {'act_count': 889,
                                                   'act_time': 37.441248655319214,
                                                   'act_time_avg': 0.04211614021970665,
                                                   'act_time_max': 0.1545419692993164,
                                                   'act_time_min': 0.006118059158325195}},
         'net_act_count': 3586,
         'net_act_time': 132.06326842308044,
         'net_act_time_avg': 0.03682745912523158,
         'net_act_time_max': 0.1629331111907959,
         'net_act_time_min': 0.00455021858215332,
         'seq_count': 909,
         'seq_time': 405.6781575679779,
         'seq_time_avg': 0.4462906023850142,
         'seq_time_max': 0.6640150547027588,
         'seq_time_min': 0.04558515548706055,
         'sequence_stats': {'end': {'act_count': 20,
                                    'act_time': 1.2047512531280518,
                                    'act_time_avg': 0.060237562656402587,
                                    'act_time_max': 0.10723018646240234,
                                    'act_time_min': 0.0059051513671875,
                                    'errors': 0,
                                    'net_act_count': 10,
                                    'net_act_time': 0.17910146713256836,
                                    'net_act_time_avg': 0.017910146713256837,
                                    'net_act_time_max': 0.042352914810180664,
                                    'net_act_time_min': 0.0059051513671875,
                                    'seq_count': 10,
                                    'seq_time': 1.2114121913909912,
                                    'seq_time_avg': 0.12114121913909912,
                                    'seq_time_max': 0.14590692520141602,
                                    'seq_time_min': 0.1082160472869873,
                                    'warnings': 0},
                            'init': ...
         'sessions': {'t1': {'act_count': 454,
                             'act_time': 40.14628767967224,
                             'act_time_avg': 0.08842794643099612,
                             'act_time_max': 0.3342282772064209,
                             'act_time_min': 0.00455021858215332,
                             'errors': 0,
                             'net_act_count': 363,
                             'net_act_time': 12.54139757156372,
                             'net_act_time_avg': 0.034549304604858735,
                             'net_act_time_max': 0.11727690696716309,
                             'net_act_time_min': 0.00455021858215332,
                             'path': '/h1/p1/t1',
                             'seq_count': 92,
                             'seq_time': 40.26065945625305,
                             'seq_time_avg': 0.4376158636549245,
                             'seq_time_max': 0.6443750858306885,
                             'seq_time_min': 0.05200076103210449,
                             'user': 'User_1',
                             'warnings': 0},
                      't2': {'act_count': 449,
                      ...
         'stage': None,
         'warnings': 0}
    """

    def __init__(self):
        # Re-entrant lock that serializes statistic updates
        self._lock = threading.RLock()
        self.stats = {
            "act_count": 0,
            "act_time": 0.0,
            "net_act_count": 0,
            "net_act_time": 0.0,
            "seq_count": 0,
            "sess_count": 0,
            "sess_running": 0,
            "errors": 0,
            "warnings": 0,
            "run_limit_reached": False,
            "stage": None,
            "sequence_stats": {},
            "sessions": {},
            "monitored": {},
        }
        # Registration order is kept, so reports list entries in that order
        self.sequence_names = OrderedDict()
        self.monitored_activities = OrderedDict()

    def __getitem__(self, key):
        """Return the entry of ``self.stats`` that `get_dict_attr()` resolves for `key`."""
        return get_dict_attr(self.stats, key)

    def register_sequence(self, name):
        """Called by compiler.

        Create the statistics entry for sequence `name` and return it.
        A sequence name must not be registered twice.
        """
        assert name not in self.sequence_names
        self.sequence_names[name] = True
        res = {
            "name": name,
            "errors": 0,
            "warnings": 0,
        }
        self.stats["sequence_stats"][name] = res
        return res

    def register_activity(self, activity):
        """Called by compiler.

        Create an (initially empty) ``stats["monitored"]`` entry if the
        activity's raw arguments contain a truthy ``monitor`` value.
        Non-monitored activities are not tracked individually.
        """
        name = activity.compile_path
        assert name not in self.monitored_activities
        if activity.raw_args.get("monitor"):
            self.monitored_activities[name] = True
            self.stats["monitored"][name] = {}

    def register_session(self, session):
        """Called by run_manager.

        Create the ``stats["sessions"]`` entry for `session` (keyed by its
        ``session_id``) and increment the global session counter.
        """
        d = {
            "errors": 0,
            "warnings": 0,
            "user": session.user.name,
            "path": str(session.context_stack),
            "active": False,
        }
        self.stats["sessions"][session.session_id] = d
        self.stats["sess_count"] += 1

    def _report(self, mode, session, sequence, activity, path=None, error=None):
        """Record a start / end / error event.

        The most specific non-empty argument decides the level of the event:
        `activity` (requires `session` and `sequence`), else `sequence`
        (requires `session`), else `session`, else a global event.

        Args:
            mode: One of ``"start"``, ``"end"``, or ``"error"``.
            session: Session object or None.
            sequence: Sequence name or None.
            activity: Activity object or None.
            path: Optional display path that is stored as the session's
                current ``path`` when an activity starts (defaults to the
                activity's ``compile_path``).
            error: The error to record; only allowed if `mode` is ``"error"``.
        """
        assert mode in ("start", "end", "error")
        assert mode == "error" or error is None

        global_stats = self.stats
        sess_stats = global_stats["sessions"][session.session_id] if session else None
        seq_stats = global_stats["sequence_stats"][sequence] if sequence else None

        with self._lock:
            now = time.time()

            if activity:
                assert session and sequence
                key = activity.compile_path

                if mode == "start":
                    assert session.pending_activity is None
                    session.pending_activity = activity
                    session.activity_start = now
                    sess_stats["path"] = path or activity.compile_path
                else:
                    # 'end' or 'error'
                    assert session.pending_activity is activity
                    elap = now - session.activity_start
                    session.pending_activity = None
                    session.activity_start = 0

                    # We add timings even if activity errored
                    is_net = not activity.ignore_timing
                    self._add_timing(global_stats, "act_", elap, is_net=is_net)
                    self._add_timing(sess_stats, "act_", elap, is_net=is_net)
                    self._add_timing(seq_stats, "act_", elap, is_net=is_net)
                    if activity.monitor:
                        d = global_stats["monitored"][key]
                        self._add_timing(d, "act_", elap, is_net=False)

                    if mode == "error":
                        self._add_error(global_stats, error)
                        self._add_error(sess_stats, error)
                        self._add_error(seq_stats, error)
                        if activity.monitor:
                            d = global_stats["monitored"][key]
                            self._add_error(d, error)

            elif sequence:
                assert session

                if mode == "start":
                    assert session.pending_sequence is None
                    session.pending_sequence = sequence
                    session.sequence_start = now
                    sess_stats["path"] = sequence
                else:
                    # 'end' or 'error'
                    assert session.pending_sequence == sequence
                    elap = now - session.sequence_start
                    session.pending_sequence = None
                    session.sequence_start = 0

                    self._add_timing(global_stats, "seq_", elap, is_net=False)
                    self._add_timing(seq_stats, "seq_", elap, is_net=False)
                    self._add_timing(sess_stats, "seq_", elap, is_net=False)

                    if mode == "end":
                        sess_stats["path"] = None
                    else:  # 'error'
                        self._add_error(seq_stats, error)

            elif session:
                # Session-level events only maintain the running counter
                if mode == "start":
                    global_stats["sess_running"] += 1
                else:
                    global_stats["sess_running"] -= 1

            else:
                # Global event: only errors are counted
                if mode == "error":
                    global_stats["errors"] += 1

    def report_start(self, session, sequence, activity, path=None):
        """Record the start of a session, sequence, or activity (see `_report`)."""
        self._report("start", session, sequence, activity, path=path)

    def report_end(self, session, sequence, activity):
        """Record the regular end of a session, sequence, or activity (see `_report`)."""
        self._report("end", session, sequence, activity)

    def report_error(self, session, sequence, activity, error):
        """Record the erroneous end of a session, sequence, or activity (see `_report`)."""
        self._report("error", session, sequence, activity, error=error)

    def report_limit_violation(self, msg):
        """Register 'limit reached' error (not more than once)."""
        # Check-and-set must be atomic, otherwise two callers could both
        # count the violation.
        with self._lock:
            if not self.stats["run_limit_reached"]:
                self.stats["run_limit_reached"] = True
                self.stats["errors"] += 1
                self.stats["last_error"] = msg

    def _add_timing(self, d, key_prefix, elap, is_net=None):
        """Add one sample of `elap` seconds to the timing entries of dict `d`.

        Updates ``<prefix>count``, ``<prefix>time``, ``<prefix>time_min``,
        ``<prefix>time_max``, and ``<prefix>time_avg``. If `is_net` is true,
        the same sample is additionally added using the prefix
        ``"net_" + key_prefix``.
        """
        p = key_prefix
        count = d.setdefault(p + "count", 0) + 1
        time_tot = d.setdefault(p + "time", 0.0) + elap
        time_max = d.setdefault(p + "time_max", 0.0)
        time_min = d.setdefault(p + "time_min", 0.0)

        d[p + "count"] = count
        d[p + "time"] = time_tot
        if elap > time_max:
            d[p + "time_max"] = elap
        # The first sample always initializes the minimum. Testing the count
        # (instead of `time_min == 0.0`) keeps a genuine 0.0 sec. sample from
        # being overwritten by a later, larger one.
        if count == 1 or elap < time_min:
            d[p + "time_min"] = elap
        d[p + "time_avg"] = time_tot / count

        if is_net:
            self._add_timing(d, "net_" + key_prefix, elap)

    def _add_error(self, d, error):
        """Increment ``d["errors"]`` and store a shortened text of `error` as ``d["last_error"]``."""
        d.setdefault("errors", 0)
        d["errors"] += 1
        d["last_error"] = shorten_string("{}".format(error), 500, 100)

    def error_count(self, or_warnings=False):
        """Return the global number of errors.

        Args:
            or_warnings: If true, the global warning count is added.
        """
        error_count = self.stats["errors"]
        if or_warnings:
            error_count += self.stats["warnings"]
        return error_count

    def has_errors(self, or_warnings=False):
        """Return true if at least one error (optionally: or warning) was recorded."""
        return self.error_count(or_warnings) > 0

    def get_monitor_info(self, config_all):
        """Return the current statistics as table rows.

        Args:
            config_all: Configuration dict; ``config_all["scenario"]`` must be
                an iterable of dicts that have a ``"sequence"`` entry and
                optionally ``"repeat"`` and ``"duration"``.

        Returns:
            A dict with the keys ``"hasErrors"``, ``"seq_stats"`` (one row per
            registered sequence plus a 'Summary' row), ``"act_stats"`` (one
            row per monitored activity), ``"sess_stats"`` (one row per
            session), and ``"raw"`` (the ``self.stats`` dict itself, no copy).
            Every row is a dict with the keys ``"cols"``, ``"type"``, and
            ``"key"``.
        """
        stats = self.stats

        def f(d, k, secs=False):
            # Cell value for key `k` (default 0), optionally formatted as duration
            v = d.get(k, 0)
            if secs:
                v = format_elap(v)
            return v

        # --- Add rows for every sequence name:

        # Cache config_all.scenario.<sequence> entries as a dict:
        scenario_map = {}
        for scenario_seq_def in config_all["scenario"]:
            scenario_map[scenario_seq_def["sequence"]] = scenario_seq_def

        seq_stats = []

        def _add_seq(name, info):
            title = name
            # Add duration/repeat info to the display title
            seq_def = scenario_map.get(name, {})
            extra = []
            if "repeat" in seq_def:
                extra.append("n: {:,}".format(seq_def["repeat"]))
            if "duration" in seq_def:
                extra.append("Δt: {}".format(format_elap(seq_def["duration"])))
            if extra:
                # Use the `name` argument here, not a loop variable of the
                # enclosing scope
                title = "{} ({})".format(name, ", ".join(extra))

            seq_stats.append(
                {
                    "cols": [
                        title,
                        f(info, "seq_count"),
                        f(info, "seq_time", True),
                        f(info, "seq_time_avg", True),
                        f(info, "seq_time_max", True),
                        f(info, "act_count"),
                        f(info, "errors"),
                        f(info, "net_act_count"),
                        f(info, "net_act_time", True),
                        f(info, "net_act_time_avg", True),
                        f(info, "net_act_time_max", True),
                        format_rate(
                            info.get("net_act_count"), info.get("net_act_time")
                        ),
                    ],
                    "type": "sequence",
                    # The summary row has no sequence key
                    "key": name if name != "Summary" else None,
                }
            )

        for seq_name in self.sequence_names:
            _add_seq(seq_name, stats["sequence_stats"][seq_name])

        _add_seq("Summary", stats)

        # --- List of all activities that are marked `monitor: true`
        activity_stats = []
        for act_compile_path in self.monitored_activities:
            info = stats["monitored"][act_compile_path]
            activity_stats.append(
                {
                    "cols": [
                        act_compile_path,
                        f(info, "act_count"),
                        f(info, "errors"),
                        f(info, "act_time", True),
                        f(info, "act_time_min", True),
                        f(info, "act_time_avg", True),
                        f(info, "act_time_max", True),
                        f(info, "last_error") or "n.a.",
                    ],
                    "type": "monitored",
                    "key": act_compile_path,
                }
            )

        # --- List all sessions
        sessions = []
        for idx, (session_id, info) in enumerate(stats["sessions"].items(), 1):
            sessions.append(
                {
                    "cols": [
                        idx,
                        session_id,
                        info["user"],
                        f(info, "seq_count"),
                        f(info, "act_count"),
                        f(info, "errors"),
                        info["path"],
                    ],
                    "type": "session",
                    "key": session_id,
                }
            )

        res = {
            "hasErrors": self.has_errors(),
            "seq_stats": seq_stats,
            "act_stats": activity_stats,
            "sess_stats": sessions,
            "raw": self.stats,
        }
        return res

    def get_error_info(self, args):
        """Return a text describing the last error of one table row.

        Args:
            args: Dict with the entries ``"type"`` (``"sequence"``,
                ``"session"``, or ``"monitored"``) and ``"key"`` as produced
                by :meth:`get_monitor_info`. A sequence key of None addresses
                the 'Summary' row, i.e. the global statistics.

        Returns:
            A text containing `args` and the recorded ``last_error``, or
            "n.a." if no error was recorded for that entry.

        Raises:
            ValueError: If ``args["type"]`` is not a known row type.
            KeyError: If ``args["key"]`` does not address a known entry.
        """
        type_ = args["type"]
        key = args["key"]
        stats = self.stats

        if type_ == "sequence":
            # The 'Summary' row is reported with key=None
            info = stats if key is None else stats["sequence_stats"][key]
        elif type_ == "session":
            info = stats["sessions"][key]
        elif type_ == "monitored":
            info = stats["monitored"][key]
        else:
            raise ValueError("Unknown error info type: {!r}".format(type_))

        # Entries only get a `last_error` after the first error was reported
        errors = info.get("last_error", "n.a.")
        return "Last Error Info ({}):\n\n{}".format(args, errors)