"""
# Actions
Actions are enhanced callables that are used by `Experiments` to define
the order of operations a project is put through when the experiment
executes.
## Example
TODO
```python
```
"""
import abc
import enum
import functools as ft
import logging
import os
import sys
import textwrap
import traceback
from datetime import datetime
import attr
import sqlalchemy as sa
from plumbum import ProcessExecutionError
from benchbuild import signals
from benchbuild.settings import CFG
from benchbuild.utils import container, db
from benchbuild.utils.cmd import mkdir, rm, rmdir
LOG = logging.getLogger(__name__)
[docs]@enum.unique
class StepResult(enum.IntEnum):
"""Result type for action results."""
UNSET = 0
OK = 1
CAN_CONTINUE = 2
ERROR = 3
[docs]def step_has_failed(step_results, error_status=None):
if not error_status:
error_status = [StepResult.ERROR, StepResult.CAN_CONTINUE]
return len(list(filter(lambda res: res in error_status, step_results))) > 0
[docs]def num_steps(steps):
return sum([len(step) for step in steps])
[docs]def print_steps(steps):
print("Number of actions to execute: {}".format(num_steps(steps)))
print(*steps)
[docs]def to_step_result(func):
"""Convert a function return to a list of StepResults.
All Step subclasses automatically wrap the result of their
__call__ method's result with this wrapper.
If the result is not a list of StepResult values, one will
be generated.
result of `[StepResult.OK]`, or convert the given result into
a list.
Args:
func: The function to wrap.
"""
@ft.wraps(func)
def wrapper(*args, **kwargs):
"""Wrapper stub."""
res = func(*args, **kwargs)
if not res:
res = [StepResult.OK]
if not hasattr(res, "__iter__"):
res = [res]
return res
return wrapper
[docs]def prepend_status(func):
"""Prepends the output of `func` with the status."""
@ft.wraps(func)
def wrapper(self, *args, **kwargs):
"""Wrapper stub."""
res = func(self, *args, **kwargs)
if self.status is not StepResult.UNSET:
res = "[{status}]".format(status=self.status.name) + res
return res
return wrapper
[docs]def notify_step_begin_end(func):
"""Print the beginning and the end of a `func`."""
@ft.wraps(func)
def wrapper(self, *args, **kwargs):
"""Wrapper stub."""
cls = self.__class__
on_step_begin = cls.ON_STEP_BEGIN
on_step_end = cls.ON_STEP_END
for begin_listener in on_step_begin:
begin_listener(self)
res = func(self, *args, **kwargs)
for end_listener in on_step_end:
end_listener(self, func)
return res
return wrapper
[docs]def log_before_after(name: str, desc: str):
"""Log customized stirng before & after running func."""
def func_decorator(f):
"""Wrapper stub."""
@ft.wraps(f)
def wrapper(*args, **kwargs):
"""Wrapper stub."""
LOG.info("\n%s - %s", name, desc)
res = f(*args, **kwargs)
if StepResult.ERROR not in res:
LOG.info("%s - OK\n", name)
else:
LOG.error("%s - ERROR\n", name)
return res
return wrapper
return func_decorator
[docs]class StepClass(abc.ABCMeta):
"""Decorate `steps` with logging and result conversion."""
def __new__(mcs, name, bases, namespace, **_):
result = abc.ABCMeta.__new__(mcs, name, bases, dict(namespace))
NAME = result.NAME
DESCRIPTION = result.DESCRIPTION
if NAME and DESCRIPTION:
result.__call__ = log_before_after(NAME, DESCRIPTION)(
to_step_result(result.__call__))
else:
result.__call__ = to_step_result(result.__call__)
result.__str__ = prepend_status(result.__str__)
return result
[docs]@attr.s(cmp=False)
class Step(metaclass=StepClass):
"""Base class of a step.
This stores all common attributes for step classes.
metaclass ([type], optional): Defaults to StepClass. Takes
care of wrapping Steps correctly.
Raises:
StopIteration: If we do not encapsulate more substeps.
"""
NAME = None
DESCRIPTION = None
ON_STEP_BEGIN = []
ON_STEP_END = []
obj = attr.ib(default=None, repr=False)
action_fn = attr.ib(default=None, repr=False)
status = attr.ib(default=StepResult.UNSET)
def __len__(self):
return 1
def __iter__(self):
return self
def __next__(self):
raise StopIteration
@notify_step_begin_end
def __call__(self):
if not self.action_fn:
return StepResult.ERROR
self.action_fn()
self.status = StepResult.OK
return StepResult.OK
def __str__(self, indent=0):
return textwrap.indent(
"* {name}: Execute configured action.".format(name=self.obj.name),
indent * " ")
[docs] def onerror(self):
Clean(self.obj)()
[docs]@attr.s
class Clean(Step):
NAME = "CLEAN"
DESCRIPTION = "Cleans the build directory"
check_empty = attr.ib(default=False)
[docs] @staticmethod
def clean_mountpoints(root: str):
"""
Unmount any remaining mountpoints under :root.
Args:
root: All UnionFS-mountpoints under this directory will be
unmounted.
"""
import psutil
umount_paths = []
real_root = os.path.realpath(root)
for part in psutil.disk_partitions(all=True):
if os.path.commonpath([part.mountpoint, real_root]) == real_root:
if not part.fstype == "fuse.unionfs":
LOG.error("NON-UnionFS mountpoint found under %s", root)
else:
umount_paths.append(part.mountpoint)
@notify_step_begin_end
def __call__(self):
if not CFG['clean']:
LOG.warning("Clean disabled by config.")
return
if not self.obj:
LOG.warning("No object assigned to this action.")
return
obj_builddir = os.path.abspath(self.obj.builddir)
if os.path.exists(obj_builddir):
LOG.debug("Path %s exists", obj_builddir)
Clean.clean_mountpoints(obj_builddir)
if self.check_empty:
rmdir(obj_builddir, retcode=None)
else:
rm("-rf", obj_builddir)
else:
LOG.debug("Path %s did not exist anymore", obj_builddir)
self.status = StepResult.OK
def __str__(self, indent=0):
return textwrap.indent(
"* {0}: Clean the directory: {1}".format(
self.obj.name, self.obj.builddir), indent * " ")
[docs]class MakeBuildDir(Step):
NAME = "MKDIR"
DESCRIPTION = "Create the build directory"
@notify_step_begin_end
def __call__(self):
if not self.obj:
return
if not os.path.exists(self.obj.builddir):
mkdir("-p", self.obj.builddir)
self.status = StepResult.OK
def __str__(self, indent=0):
return textwrap.indent(
"* {0}: Create the build directory".format(self.obj.name),
indent * " ")
[docs]class Compile(Step):
NAME = "COMPILE"
DESCRIPTION = "Compile the project"
def __init__(self, project):
super(Compile, self).__init__(obj=project, action_fn=project.compile)
def __str__(self, indent=0):
return textwrap.indent("* {0}: Compile".format(self.obj.name),
indent * " ")
[docs]class Run(Step):
NAME = "RUN"
DESCRIPTION = "Execute the run action"
def __init__(self, project):
super(Run, self).__init__(obj=project, action_fn=project.run)
@notify_step_begin_end
def __call__(self):
if not self.obj:
return
if not self.action_fn:
return
self.action_fn()
self.status = StepResult.OK
def __str__(self, indent=0):
return textwrap.indent(
"* {0}: Execute run-time tests.".format(self.obj.name),
indent * " ")
[docs]@attr.s
class Echo(Step):
NAME = 'ECHO'
DESCRIPTION = 'Print a message.'
message = attr.ib(default="")
def __str__(self, indent=0):
return textwrap.indent("* echo: {0}".format(self.message),
indent * " ")
@notify_step_begin_end
def __call__(self):
LOG.info(self.message)
[docs]@attr.s(cmp=False)
class Any(Step):
NAME = "ANY"
DESCRIPTION = "Just run all actions, no questions asked."
actions = attr.ib(default=attr.Factory(list), repr=False, cmp=False)
def __len__(self):
return sum([len(x) for x in self.actions]) + 1
def __iter__(self):
return self.actions.__iter__()
@notify_step_begin_end
def __call__(self):
length = len(self.actions)
cnt = 0
results = [StepResult.OK]
for a in self.actions:
cnt = cnt + 1
result = a()
results.append(result)
if StepResult.ERROR in result:
LOG.warning("%d actions left in queue", length - cnt)
self.status = StepResult.OK
if StepResult.ERROR in results:
self.status = StepResult.CAN_CONTINUE
def __str__(self, indent=0):
sub_actns = [a.__str__(indent + 1) for a in self.actions]
sub_actns = "\n".join(sub_actns)
return textwrap.indent("* Execute all of:\n" + sub_actns, indent * " ")
[docs]@attr.s(cmp=False, hash=True)
class Experiment(Any):
NAME = "EXPERIMENT"
DESCRIPTION = "Run a experiment, wrapped in a db transaction"
def __attrs_post_init__(self):
self.actions = \
[Echo(message="Start experiment: {0}".format(self.obj.name))] + \
self.actions + \
[Echo(message="Completed experiment: {0}".format(self.obj.name))]
[docs] def begin_transaction(self):
experiment, session = db.persist_experiment(self.obj)
if experiment.begin is None:
experiment.begin = datetime.now()
else:
experiment.begin = min(experiment.begin, datetime.now())
session.add(experiment)
try:
session.commit()
except sa.orm.exc.StaleDataError:
LOG.error("Transaction isolation level caused a StaleDataError")
# React to external signals
signals.handlers.register(Experiment.end_transaction, experiment,
session)
return experiment, session
[docs] @staticmethod
def end_transaction(experiment, session):
try:
if experiment.end is None:
experiment.end = datetime.now()
else:
experiment.end = max(experiment.end, datetime.now())
session.add(experiment)
session.commit()
except sa.exc.InvalidRequestError as inv_req:
LOG.error(inv_req)
@notify_step_begin_end
def __call__(self):
results = []
session = None
experiment, session = self.begin_transaction()
try:
for a in self.actions:
try:
result = a()
results.extend(result)
except KeyboardInterrupt:
LOG.info("Experiment aborting by user request")
results.append(StepResult.ERROR)
break
except Exception:
LOG.error("Experiment terminates "
"because we got an exception:")
e_type, e_value, e_traceb = sys.exc_info()
lines = traceback.format_exception(e_type, e_value,
e_traceb)
results.append(StepResult.ERROR)
LOG.error("".join(lines))
break
finally:
self.end_transaction(experiment, session)
signals.handlers.deregister(self.end_transaction)
self.status = max(results)
return results
def __str__(self, indent=0):
sub_actns = [a.__str__(indent + 1) for a in self.actions]
sub_actns = "\n".join(sub_actns)
return textwrap.indent(
"\nExperiment: {0}\n".format(self.obj.name) + sub_actns,
indent * " ")
[docs]@attr.s
class RequireAll(Step):
actions = attr.ib(default=attr.Factory(list))
def __len__(self):
return sum([len(x) for x in self.actions]) + 1
def __iter__(self):
return self.actions.__iter__()
@notify_step_begin_end
def __call__(self):
results = []
for i, action in enumerate(self.actions):
try:
results.extend(action())
except ProcessExecutionError as proc_ex:
LOG.error("\n==== ERROR ====")
LOG.error("Execution of a binary failed in step: %s",
str(action))
LOG.error(str(proc_ex))
LOG.error("==== ERROR ====\n")
results.append(StepResult.ERROR)
except KeyboardInterrupt:
LOG.info("User requested termination.")
action.onerror()
results.append(StepResult.ERROR)
raise
except OSError:
LOG.error(
"Exception in step #%d: %s",
i,
str(action),
exc_info=sys.exc_info())
results.append(StepResult.ERROR)
if StepResult.ERROR in results:
LOG.error("Execution of #%d: '%s' failed.", i, str(action))
LOG.error("'%s' cannot continue.", str(self))
action.status = StepResult.ERROR
action.onerror()
self.status = StepResult.ERROR
return results
self.status = StepResult.OK
return results
def __str__(self, indent=0):
sub_actns = [a.__str__(indent + 1) for a in self.actions]
sub_actns = "\n".join(sub_actns)
return textwrap.indent("* All required:\n" + sub_actns, indent * " ")
[docs]@attr.s
class Containerize(RequireAll):
NAME = "CONTAINERIZE"
DESCRITPION = "Redirect into container"
[docs] def requires_redirect(self):
project = self.obj
return not container.in_container() and (project.container is not None)
@notify_step_begin_end
def __call__(self):
project = self.obj
if self.requires_redirect():
project.redirect()
self.status = StepResult.OK
else:
super(Containerize, self).__call__()
def __str__(self, indent=0):
sub_actns = [a.__str__(indent + 1) for a in self.actions]
sub_actns = "\n".join(sub_actns)
if container.in_container():
return textwrap.indent("* Running inside container:\n" + sub_actns,
indent * " ")
if self.requires_redirect():
return textwrap.indent(
"* Continue inside container:\n" + sub_actns, indent * " ")
return textwrap.indent("* Running without container:\n" + sub_actns,
indent * " ")