"""Experiment helpers."""
import logging
import sys
import typing as t
from contextlib import contextmanager
from plumbum import TEE, local
from plumbum.commands import ProcessExecutionError
import attr
from benchbuild import settings, signals
CFG = settings.CFG
LOG = logging.getLogger(__name__)
@attr.s(cmp=False)
class RunInfo:
    """
    Execution context of wrapped binaries.

    Execution of tracked binaries is guarded with this context
    object. In here we store everything about a single binary
    execution for consumption of an experiment.

    Creating an instance immediately opens a run in the database
    (see ``__attrs_post_init__``); calling the instance executes the
    command and records the outcome. Nothing is committed until
    ``commit()`` is called.

    Attributes:
        cmd: The command that is executed when the instance is called.
        failed: Set to ``True`` once the run has been recorded as failed.
        project: The project this run belongs to; its ``run_uuid`` is
            used as the run group.
        experiment: The experiment this run belongs to; its ``name`` is
            stored with the run.
        retcode: The return code of the last execution.
        stdout: The captured standard output of the last execution.
        stderr: The captured standard error of the last execution.
        db_run: The database run object created for this execution.
        session: The database session ``db_run`` lives in.
        payload: Optional mapping of names to arbitrary payload objects,
            filled by ``add_payload``.
    """

    # NOTE: the declaration order of the attr.ib fields defines the order of
    # the generated __init__ arguments; do not reorder them.
    cmd = attr.ib(default=None, repr=False)
    failed = attr.ib(default=False)
    project = attr.ib(default=None, repr=False)
    experiment = attr.ib(default=None, repr=False)
    retcode = attr.ib(default=0)
    stdout = attr.ib(default=attr.Factory(list), repr=False)
    stderr = attr.ib(default=attr.Factory(list), repr=False)
    db_run = attr.ib(init=False, default=None)
    session = attr.ib(init=False, default=None, repr=False)
    payload = attr.ib(init=False, default=None, repr=False)

    def __begin(self, command, project, ename, group):
        """
        Begin a run in the database log.

        The created run and its session are stored in ``self.db_run`` and
        ``self.session``; nothing is returned and nothing is committed.

        Args:
            command: The command that will be executed.
            project: The project we belong to.
            ename: The experiment name we belong to.
            group: The run group we belong to.
        """
        from benchbuild.utils.db import create_run
        from benchbuild.utils import schema as s
        from datetime import datetime

        db_run, session = create_run(command, project, ename, group)
        db_run.begin = datetime.now()
        db_run.status = 'running'

        log = s.RunLog()
        log.run_id = db_run.id
        log.begin = datetime.now()
        log.config = repr(CFG)

        session.add(log)
        session.add(db_run)

        self.db_run = db_run
        self.session = session

    def __end(self, stdout, stderr):
        """
        End a run in the database log (Successfully).

        This updates the run and its log entry inside ``self.session``.
        The transaction itself is not committed here, see ``commit()``.

        Args:
            stdout: The stdout we captured of the run.
            stderr: The stderr we capture of the run.
        """
        from benchbuild.utils.schema import RunLog
        from datetime import datetime

        run_id = self.db_run.id

        log = self.session.query(RunLog).filter(RunLog.run_id == run_id).one()
        log.stderr = stderr
        log.stdout = stdout
        log.status = 0
        log.end = datetime.now()

        self.db_run.end = datetime.now()
        self.db_run.status = 'completed'

        self.session.add(log)
        self.session.add(self.db_run)

    def __fail(self, retcode, stdout, stderr):
        """
        End a run in the database log (Unsuccessfully).

        This updates the run and its log entry inside ``self.session`` and
        marks this object as failed. The transaction itself is not committed
        here, see ``commit()``.

        Args:
            retcode: The return code we captured of the run.
            stdout: The stdout we captured of the run.
            stderr: The stderr we capture of the run.
        """
        from benchbuild.utils.schema import RunLog
        from datetime import datetime

        run_id = self.db_run.id

        log = self.session.query(RunLog).filter(RunLog.run_id == run_id).one()
        log.stderr = stderr
        log.stdout = stdout
        log.status = retcode
        log.end = datetime.now()

        self.db_run.end = datetime.now()
        self.db_run.status = 'failed'
        self.failed = True

        self.session.add(log)
        self.session.add(self.db_run)

    def __attrs_post_init__(self):
        """Open the database run and register the failure signal handler."""
        self.__begin(self.cmd, self.project, self.experiment.name,
                     self.project.run_uuid)
        # The extra arguments are handed to __fail as
        # (retcode, stdout, stderr) when the handler is invoked.
        signals.handlers.register(self.__fail, 15, "SIGTERM", "SIGTERM")

        run_id = self.db_run.id
        settings.CFG["db"]["run_id"] = run_id

    def add_payload(self, name, payload):
        """
        Attach an arbitrary object to this run under the given name.

        Args:
            name: The key the payload is stored under.
            payload: The object to store. Attaching the run to itself is
                silently ignored.
        """
        if self == payload:
            return
        if not self.payload:
            self.payload = {name: payload}
        else:
            self.payload.update({name: payload})

    @property
    def has_failed(self):
        """Check, whether this run failed."""
        return self.failed

    def __call__(self, *args, expected_retcode=0, ri=None, **kwargs):
        """
        Execute the tracked command and record its outcome.

        The command runs with the current configuration exported to its
        environment. Its output is written to ``<argv[0]>.stdout`` and
        ``<argv[0]>.stderr`` and stored on this object.

        Args:
            *args: Unused.
            expected_retcode: The return code passed to ``TEE`` as the
                expected one.
            ri: Unused.
            **kwargs: Unused.

        Returns:
            This object, with ``retcode``, ``stdout`` and ``stderr`` updated.

        Raises:
            KeyboardInterrupt: Re-raised after the run has been marked as
                failed. A ``ProcessExecutionError`` is recorded and logged,
                but not propagated.
        """
        cmd_env = settings.CFG.to_env_dict()

        # Bind the locals up front: an interrupt that arrives while the
        # command is still running must not trip over unbound names in the
        # KeyboardInterrupt handler below.
        retcode, stdout, stderr = self.retcode, self.stdout, self.stderr

        with local.env(**cmd_env):
            try:
                bin_name = sys.argv[0]
                retcode, stdout, stderr = \
                    self.cmd & TEE(retcode=expected_retcode)
                f_stdout = bin_name + ".stdout"
                f_stderr = bin_name + ".stderr"
                with open(f_stdout, 'w') as fd_stdout:
                    fd_stdout.write(stdout)

                with open(f_stderr, 'w') as fd_stderr:
                    fd_stderr.write(stderr)

                self.retcode = retcode
                self.stdout = stdout
                self.stderr = stderr
                self.__end(str(stdout), str(stderr))
            except ProcessExecutionError as ex:
                # __fail expects (retcode, stdout, stderr) in that order.
                self.__fail(ex.retcode, ex.stdout, ex.stderr)
                self.retcode = ex.retcode
                self.stdout = ex.stdout
                self.stderr = ex.stderr

                LOG.debug("Tracked process failed")
                LOG.error(str(ex))
            except KeyboardInterrupt:
                self.retcode = retcode
                self.stdout = stdout
                self.stderr = stderr
                self.__fail(-1, "", "KeyboardInterrupt")
                LOG.warning("Interrupted by user input")
                raise
            finally:
                signals.handlers.deregister(self.__fail)

        return self

    def commit(self):
        """Commit the database session of this run."""
        self.session.commit()
def begin_run_group(project):
    """
    Open a new run_group for a project in the database.

    A run_group bundles the runs of one project, i.e. the series of runs
    that together make up a complete binary runtime test. The new group is
    stamped with the current time, marked as 'running' and committed.

    Args:
        project: The project the run_group is created for.

    Returns:
        ``(group, session)``: the freshly created group and the database
        session it lives in.
    """
    from datetime import datetime
    from benchbuild.utils.db import create_run_group

    created = create_run_group(project)
    group, session = created

    started_at = datetime.now()
    group.begin = started_at
    group.status = 'running'

    session.commit()
    return group, session
def end_run_group(group, session):
    """
    Mark the run_group as successfully finished.

    The group gets the current time as its end and the status 'completed';
    afterwards the session is committed.

    Args:
        group: The run_group to complete.
        session: The database session that is committed.
    """
    from datetime import datetime

    finished_at = datetime.now()
    group.end = finished_at
    group.status = 'completed'

    session.commit()
def fail_run_group(group, session):
    """
    Mark the run_group as unsuccessfully finished.

    The group gets the current time as its end and the status 'failed';
    afterwards the session is committed.

    Args:
        group: The run_group to close.
        session: The database session that is committed.
    """
    from datetime import datetime

    finished_at = datetime.now()
    group.end = finished_at
    group.status = 'failed'

    session.commit()
def exit_code_from_run_infos(run_infos: t.List["RunInfo"]) -> int:
    """Generate a single exit code from a list of RunInfo objects.

    Positive return codes take precedence: if any run reported a return
    code greater than 0, the largest one is returned. Otherwise the smallest
    (most negative) return code is returned, which is 0 if every run
    succeeded.

    Args:
        run_infos (t.List[RunInfo]): The runs to summarize. A single object
            that is not iterable is accepted as well; its ``retcode`` is
            returned unchanged.

    Returns:
        int: The combined exit code; 0 for an empty list.
    """
    assert run_infos is not None

    if not hasattr(run_infos, "__iter__"):
        return run_infos.retcode

    rcs = [ri.retcode for ri in run_infos]
    if not rcs:
        # No runs means no failures; max()/min() would raise on [].
        return 0

    max_rc = max(rcs)
    if max_rc > 0:
        return max_rc
    # No positive code present: report the code furthest below zero.
    return min(rcs)
@contextmanager
def track_execution(cmd, project, experiment, **kwargs):
    """Guard the execution of the given command.

    A RunInfo object is created for the command (`cmd`) and handed to the
    body of the ``with`` statement. When the body finishes without raising,
    the RunInfo is committed. If the body raises, the commit is skipped.

    Args:
        cmd: The command we guard.
        project: The project we track for.
        experiment: The experiment we track for.
        **kwargs: Additional keyword arguments forwarded to RunInfo.

    Yields:
        RunInfo: A context object that carries the necessary
            database transaction.
    """
    run_info = RunInfo(
        cmd=cmd, project=project, experiment=experiment, **kwargs)
    yield run_info
    run_info.commit()
def run(command, retcode=0):
    """Execute a plumbum command through the ``TEE`` modifier.

    Args:
        command: The plumbum command to execute.
        retcode: The return code handed to ``TEE`` as the expected one.

    Returns:
        Whatever ``command & TEE(retcode=retcode)`` evaluates to.
    """
    tee_modifier = TEE(retcode=retcode)
    return command & tee_modifier
def with_env_recursive(cmd, **envvars):
    """
    Recursively update the environment of cmd and all its subcommands.

    ``BoundCommand`` objects are only descended into; ``BoundEnvCommand``
    objects additionally get ``envvars`` merged into their own environment.
    Any other object is returned untouched. The update happens in place.

    Args:
        cmd - A plumbum command-like object
        **envvars - The environment variables to update

    Returns:
        The updated command (the same object that was passed in).
    """
    from plumbum.commands.base import BoundCommand, BoundEnvCommand

    if isinstance(cmd, BoundCommand):
        cmd.cmd = with_env_recursive(cmd.cmd, **envvars)
        return cmd

    if isinstance(cmd, BoundEnvCommand):
        cmd.envvars.update(envvars)
        cmd.cmd = with_env_recursive(cmd.cmd, **envvars)

    return cmd
def in_builddir(sub='.'):
    """
    Decorate a project phase with a local working directory change.

    The decorated method runs with the current working directory set to
    ``self.builddir / sub``. If that already is the working directory, the
    method is called directly.

    Args:
        sub: An optional subdirectory to change into.

    Returns:
        A decorator for methods of objects that provide ``builddir``.
    """
    from functools import wraps

    def wrap_in_builddir(func):
        """Wrap the function for the new build directory."""

        @wraps(func)
        def wrap_in_builddir_func(self, *args, **kwargs):
            """The actual function inside the wrapper for the new builddir."""
            p = local.path(self.builddir) / sub
            if not p.exists():
                # Only reported here; the directory change below is still
                # attempted.
                LOG.error("%s does not exist.", p)

            if p == local.cwd:
                LOG.debug("CWD already is %s", p)
                # Keyword arguments must be forwarded as keywords
                # (``**kwargs``), not unpacked positionally.
                return func(self, *args, **kwargs)
            with local.cwd(p):
                return func(self, *args, **kwargs)

        return wrap_in_builddir_func

    return wrap_in_builddir
def store_config(func):
    """Decorator that stores the configuration in the project's builddir.

    Before the decorated method runs, the configuration is written to
    ``.benchbuild.yml`` inside ``self.builddir``.
    """
    from functools import wraps

    @wraps(func)
    def wrap_store_config(self, *args, **kwargs):
        """Store the config, then delegate to the wrapped method."""
        config_file = local.path(self.builddir) / ".benchbuild.yml"
        CFG.store(config_file)
        return func(self, *args, **kwargs)

    return wrap_store_config