# (c) 2020-2024 Martin Wendt and contributors; see https://github.com/mar10/stressor
# Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
"""
import os
import random
from textwrap import dedent
import yaml
from stressor.plugins.base import ActivityBase, MacroBase
from stressor.util import check_arg, format_elap
[docs]
class LoadMacro(MacroBase):
    """Implement `$load(path)` macro.

    The referenced file is read and its content is stored in the parent
    configuration node in place of the macro:

    - ``*.py``: the file's text is stored unmodified (only allowed for the
      ``script`` key).
    - ``*.yaml`` / ``*.yml``: the file is parsed with ``yaml.safe_load`` and
      must contain a list.
    """

    def apply(self, config_manager, parent, parent_key, path):
        """Load `path` and store the result in ``parent[parent_key]``.

        Args:
            config_manager: used to resolve `path` via
                ``resolve_path(path, must_exist=True)``.
            parent (dict): configuration node that receives the content.
            parent_key (str): key of `parent` that is overwritten.
            path (str): location of the ``.py``, ``.yaml``, or ``.yml`` file.
        Returns:
            str | list: the text of the Python file, or the parsed YAML list.
        Raises:
            NotImplementedError: if the file extension is not supported.
        """
        path = config_manager.resolve_path(path, must_exist=True)
        # Compare extensions case-insensitively; compute the folded name once.
        lower_path = path.lower()

        if lower_path.endswith(".py"):
            assert (
                parent_key == "script"
            ), "Python files can only be loaded into a 'script' key: {!r}".format(
                parent_key
            )
            # Read with an explicit encoding, so the result does not depend on
            # the platform's locale (UTF-8 is the Python source default).
            with open(path, encoding="utf-8") as f:
                res = f.read()
            parent[parent_key] = res
            return res

        if not lower_path.endswith((".yaml", ".yml")):
            raise NotImplementedError(
                "$load() only supports .py, .yaml, and .yml files: {!r}".format(path)
            )

        # Load and parse the YAML file (safe_load never constructs arbitrary
        # Python objects).
        with open(path, encoding="utf-8") as f:
            res = yaml.safe_load(f)

        assert isinstance(parent, dict), "Expected dict parent: {!r}".format(
            type(parent)
        )
        assert isinstance(res, list), "Expected a YAML list in {!r}".format(path)
        parent[parent_key] = res
        # print(yaml.dump(parent, default_flow_style=False))
        return res
[docs]
class EnvMacro(MacroBase):
    """Implement `$env(var_name)` macro: insert the value of an environment variable.

    The variable is looked up in ``os.environ`` when the macro is applied.
    """

    def apply(self, config_manager, parent, parent_key, var_name):
        """Store the value of environment variable `var_name` in ``parent[parent_key]``.

        The ``KeyError`` raised by the ``os.environ`` lookup propagates if the
        variable is not defined.
        """
        # TODO:
        #   Allow optional 'default' args: `$env(HOME, Foo)`
        #   Maybe introspection can automate this?
        # var_name, default = parse_arglist(var_name)
        parent[parent_key] = os.environ[var_name]
[docs]
class DebugMacro(MacroBase):
    """Implement `$debug()` macro: turn the node into a script that dumps its locals."""

    def apply(self, config_manager, parent, parent_key, var_name):
        """Rewrite `parent` into a ``RunScript`` entry that pretty-prints ``locals()``.

        `config_manager` and `var_name` are not used.
        """
        # The script text is only stored here, not executed.
        dump_script = dedent(
            r"""
            from pprint import pprint
            pprint(locals())
            """
        )
        parent[parent_key] = "RunScript"
        parent["name"] = "$debug"
        parent["export"] = []
        parent["script"] = dump_script
# class CrontabMacro(MacroBase):
# """Implement `$cron()` macro, which inserts the current UTC time stamp.
# https://github.com/josiahcarlson/parse-crontab
# """
# def apply(self, config_manager, parent, parent_key, var_name):
# raise NotImplementedError
# class StampMacro(MacroBase):
# """Implement `$stamp()` macro, which inserts the current UTC time stamp."""
# run_time_eval = True
# def apply(self, config_manager, parent, parent_key, var_name):
# raise NotImplementedError
# class StopMacro(MacroBase):
# """Implement `$stop()` macro, which dumps information at run-time and ends execution."""
# def apply(self, config_manager, parent, parent_key, var_name):
# raise NotImplementedError
# class BreakMacro(MacroBase):
# """Implement `$break()` macro, which halts and enters single-step mode."""
# def apply(self, config_manager, parent, parent_key, var_name):
# # dump info
# # enter single-step mode
# # allow to stop the run
# raise NotImplementedError
[docs]
class SleepMacro(MacroBase):
    """Implement `$sleep(min[, max])` macro, a shortcut to :class:`SleepActivity`."""

    _args_def = (
        ("min", float),  # mandatory
        ("max", float, None),  # optional
    )

    def apply(self, config_manager, parent, parent_key, min, max):
        """Rewrite `parent` into a ``Sleep`` activity definition.

        `min` is stored as ``duration``; `max` is stored as ``duration_2``
        unless it is None.

        Returns:
            bool: always True.
        """
        # The macro is only valid as value of an `activity` key.
        assert parent_key == "activity"

        parent[parent_key] = "Sleep"
        parent["duration"] = min
        if max is None:
            return True

        # A second value was passed: store it as upper bound.
        parent["duration_2"] = max
        return True
[docs]
class SleepActivity(ActivityBase):
    """Pause the session for a fixed or a randomized amount of time.

    Args:
        duration (float): sleep duration in seconds
        duration_2 (float, optional): if given, the effective sleep duration
            is picked randomly from the range `duration` .. `duration_2`

    Examples::

        sequences:
          main:
            - activity: Sleep
              duration: 0.5
            - activity: Sleep
              duration: 0.5
              duration_2: 1.5
    """

    _mandatory_args = {"duration"}
    _known_args = {"duration", "duration_2"}
    _info_args = ("name", "duration", "duration_2")
    #: Sleep activities are ignorable by default
    _default_ignore_timing = True

    def __init__(self, config_manager, **activity_args):
        """Validate the argument types and initialize the base activity."""
        check_arg(activity_args.get("duration"), (str, int, float))
        check_arg(activity_args.get("duration_2"), (str, int, float), or_none=True)
        super().__init__(config_manager, **activity_args)

    # TODO: Support CronTab syntax
    # https://github.com/taichino/croniter
    # https://github.com/josiahcarlson/parse-crontab

    # def __str__(self):
    #     return "Sleep({:.3} sec.)".format(self.duration)

    def get_info(self, info_args=True, expanded_args=None, session=None):
        """Return a display string, including the effective duration of a range.

        The range format is only used if `duration_2` is set and
        `prepare_execute()` has stored a pending duration in the session;
        otherwise the base class implementation is used.
        """
        # We want to display the actual sleep duration even if it was calculated
        # from a range. So we store it in `prepare_execute` and use it here
        # if available.
        if expanded_args and session:
            # Mirror prepare_execute(): a `duration_2` of None means 'no range'.
            duration_2 = expanded_args.get("duration_2")
            # Set by prepare_execute() and reset to None by execute().
            cur_duration = (
                session.data["_cur_duration"]
                if "_cur_duration" in session.data
                else None
            )
            if duration_2 is not None and cur_duration is not None:
                return "{}(duration[{}..{}] => {})".format(
                    self.get_script_name(),
                    float(expanded_args["duration"]),
                    float(duration_2),
                    format_elap(cur_duration),
                )
        return super().get_info(info_args, expanded_args, session)

    def prepare_execute(self, session, expanded_args):
        """Calculate the effective duration and store it in `session.data`.

        If `duration_2` is given, a random value between `duration` and
        `duration_2` is chosen.
        """
        duration = float(expanded_args["duration"])
        duration_2 = expanded_args.get("duration_2")
        if duration_2 is not None:
            duration = random.uniform(duration, float(duration_2))
        session.data["_cur_duration"] = duration

    def execute(self, session, **expanded_args):
        """Wait for the duration that was stored by `prepare_execute()`.

        The stored value is reset to None. Nothing is awaited if
        `session.dry_run` is set.
        """
        assert "_cur_duration" in session.data
        duration = session.data["_cur_duration"]
        session.data["_cur_duration"] = None
        if not session.dry_run:
            # Wait on the stop request instead of a plain sleep, passing the
            # duration as timeout.
            session.stop_request.wait(timeout=duration)
        return
# class MemoryActivity(ActivityBase):
# """
# Args:
# operation (str): 'alloc' | 'free'
# size (int):
# """
# def __init__(self, config_manager, operation, **activity_args):
# super().__init__(config_manager, **activity_args)
# def execute(self, session):
# raise NotImplementedError
# class CpuActivity(ActivityBase):
# """
# Args:
# operation (str): 'load' | 'stop'
# cores (int): Default: 1
# async (bool): Default: True
# duration (float):
# """
# def __init__(self, config_manager, operation, **activity_args):
# super().__init__(config_manager, **activity_args)
# def execute(self, session):
# raise NotImplementedError
# class FileActivity(ActivityBase):
# """
# Args:
# operation (str): 'read' | 'write'
# size (int):
# path (str):
# """
# def __init__(self, config_manager, operation, **activity_args):
# super().__init__(config_manager, **activity_args)
# def execute(self, session):
# raise NotImplementedError