"""Experiment helpers."""
import enum
import os
import logging
import subprocess
import typing as t
import sys
from contextlib import contextmanager
from benchbuild.settings import CFG
import benchbuild.signals as signals
from benchbuild.utils.cmd import mkdir
from benchbuild.utils.path import list_to_path
from benchbuild import settings
from plumbum import local, TEE
from plumbum.commands import ProcessExecutionError
# Module-level logger used by all helpers in this file.
LOG = logging.getLogger(__name__)
def fetch_time_output(marker, format_s, ins):
    """
    Extract the timing lines written by /usr/bin/time.

    Args:
        marker: Substring that identifies a line as timing output.
        format_s: ``parse`` format string applied to every matching line.
        ins: The lines to search.

    Returns:
        A list with the ``parse`` result of every matching line that parsed
        successfully; lines that do not match ``format_s`` are dropped.
    """
    from parse import parse
    parsed = (parse(format_s, line) for line in ins if marker in line)
    return [result for result in parsed if result]
def begin_run_group(project):
    """
    Open a new run_group for *project* in the database.

    A run_group bundles the runs that together form one complete binary
    runtime test of a project. The new group gets the current time as
    ``begin`` and the status ``'running'``, and is committed right away.

    Args:
        project: The project we begin a new run_group for.

    Returns:
        ``(group, session)``: the created group and the database session it
        lives in.
    """
    from benchbuild.utils.db import create_run_group
    from datetime import datetime

    new_group, db_session = create_run_group(project)
    new_group.begin = datetime.now()
    new_group.status = 'running'
    db_session.commit()
    return new_group, db_session
def end_run_group(group, session):
    """
    Close *group* successfully.

    Stamps ``group.end`` with the current time, sets the status to
    ``'completed'`` and commits *session*.

    Args:
        group: The run_group we want to complete.
        session: The database session that gets committed.
    """
    from datetime import datetime as _datetime
    finished_at = _datetime.now()
    group.end, group.status = finished_at, 'completed'
    session.commit()
def fail_run_group(group, session):
    """
    Close *group* unsuccessfully.

    Stamps ``group.end`` with the current time, sets the status to
    ``'failed'`` and commits *session*.

    Args:
        group: The run_group we want to complete.
        session: The database session that gets committed.
    """
    from datetime import datetime as _datetime
    finished_at = _datetime.now()
    group.end, group.status = finished_at, 'failed'
    session.commit()
class RunInfo(object):
    """
    Track the execution of one command and record it in the database.

    Constructing a RunInfo creates the database ``run`` and its ``RunLog``
    entry, registers ``__fail`` with ``benchbuild.signals`` for SIGTERM and
    publishes the run id in ``settings.CFG["db"]["run_id"]``. Calling the
    instance executes ``self.cmd`` and records success or failure; ``commit``
    persists the session.
    """

    def __begin(self, command, project, ename, group):
        """
        Begin a run in the database log.

        Args:
            command: The command that will be executed.
            project: The project we belong to.
            ename: The experiment name we belong to.
            group: The run group we belong to.

        Side effects:
            Sets ``self.db_run`` and ``self.session``. The run (status
            ``'running'``) and its log entry are added to the session but
            not committed.
        """
        from benchbuild.utils.db import create_run
        from benchbuild.utils import schema as s
        from datetime import datetime

        db_run, session = create_run(command, project, ename, group)
        db_run.begin = datetime.now()
        db_run.status = 'running'
        log = s.RunLog()
        log.run_id = db_run.id
        log.begin = datetime.now()
        log.config = repr(CFG)
        session.add(log)
        session.add(db_run)
        self.db_run = db_run
        self.session = session

    def __end(self, stdout, stderr):
        """
        End a run in the database log (successfully).

        Updates the run's log entry and the run itself; the changes are added
        to ``self.session`` but committed only by ``commit``.

        Args:
            stdout: The stdout we captured of the run.
            stderr: The stderr we captured of the run.
        """
        from benchbuild.utils.schema import RunLog
        from datetime import datetime

        run_id = self.db_run.id
        log = self.session.query(RunLog).filter(RunLog.run_id == run_id).one()
        log.stderr = stderr
        log.stdout = stdout
        log.status = 0
        log.end = datetime.now()
        self.db_run.end = datetime.now()
        self.db_run.status = 'completed'
        self.session.add(log)
        self.session.add(self.db_run)

    def __fail(self, retcode, stdout, stderr):
        """
        End a run in the database log (unsuccessfully).

        Updates the run's log entry and the run itself and sets
        ``self.failed``; the changes are committed only by ``commit``.

        Args:
            retcode: The return code we captured of the run.
            stdout: The stdout we captured of the run.
            stderr: The stderr we captured of the run.
        """
        from benchbuild.utils.schema import RunLog
        from datetime import datetime

        run_id = self.db_run.id
        log = self.session.query(RunLog).filter(RunLog.run_id == run_id).one()
        log.stderr = stderr
        log.stdout = stdout
        log.status = retcode
        log.end = datetime.now()
        self.db_run.end = datetime.now()
        self.db_run.status = 'failed'
        self.failed = True
        self.session.add(log)
        self.session.add(self.db_run)

    def __init__(self, **kwargs):
        """
        Create the tracker and begin the database run.

        Args:
            **kwargs: Set as attributes; ``cmd``, ``project`` (needs
                ``run_uuid``) and ``experiment`` (needs ``name``) are
                required for the database run to be created.
        """
        self.cmd = None
        self.failed = False
        self.project = None
        self.experiment = None
        self.retcode = 0
        self.stdout = []
        self.stderr = []

        for key, value in kwargs.items():
            setattr(self, key, value)

        # This is not atomic, careful!
        self.__begin(self.cmd, self.project,
                     self.experiment.name, self.project.run_uuid)
        # __fail is registered with (15, "SIGTERM", "SIGTERM") as its
        # arguments; presumably benchbuild.signals invokes it on SIGTERM.
        signals.handlers.register(self.__fail, 15, "SIGTERM", "SIGTERM")

        run_id = self.db_run.id
        settings.CFG["db"]["run_id"] = run_id

    def __add__(self, rhs):
        """
        Combine two RunInfos into an aggregate result.

        The aggregate sums the return codes, concatenates stdout/stderr,
        keeps both database runs (as a list in ``db_run``), reuses this
        instance's session and is failed if either operand failed. It is
        built without ``__init__`` so no new database run is created and no
        signal handler is registered.

        Args:
            rhs: Another RunInfo, or None (then ``self`` is returned).
        """
        if rhs is None:
            return self

        combined = RunInfo.__new__(RunInfo)
        combined.cmd = None
        combined.project = None
        combined.experiment = None
        combined.failed = self.failed or rhs.failed
        combined.retcode = self.retcode + rhs.retcode
        combined.stdout = self.stdout + rhs.stdout
        combined.stderr = self.stderr + rhs.stderr
        combined.db_run = [self.db_run, rhs.db_run]
        combined.session = self.session
        return combined

    @property
    def has_failed(self):
        """Check, whether this run failed."""
        return self.failed

    def __call__(self, *args, expected_retcode=0, ri=None, **kwargs):
        """
        Execute ``self.cmd`` and record the outcome in the database.

        The command runs with the environment derived from ``settings.CFG``.
        On success its stdout/stderr are also written to
        ``<sys.argv[0]>.stdout`` and ``<sys.argv[0]>.stderr``.

        Args:
            expected_retcode: Return code(s) plumbum accepts as success.
            args, ri, kwargs: Accepted for interface compatibility; unused.

        Returns:
            self, with ``retcode``, ``stdout`` and ``stderr`` updated.

        Raises:
            KeyboardInterrupt: re-raised after the run is marked as failed.
        """
        cmd_env = settings.to_env_dict(settings.CFG)
        # Pre-bind so the KeyboardInterrupt handler never reads unbound
        # names when the interrupt arrives while the command is running.
        retcode, stdout, stderr = self.retcode, self.stdout, self.stderr
        with local.env(**cmd_env):
            try:
                bin_name = sys.argv[0]
                retcode, stdout, stderr = \
                    self.cmd & TEE(retcode=expected_retcode)
                f_stdout = bin_name + ".stdout"
                f_stderr = bin_name + ".stderr"
                with open(f_stdout, 'w') as fd_stdout:
                    fd_stdout.write(stdout)

                with open(f_stderr, 'w') as fd_stderr:
                    fd_stderr.write(stderr)

                self.retcode = retcode
                self.stdout = stdout
                self.stderr = stderr
                self.__end(str(stdout), str(stderr))
            except ProcessExecutionError as ex:
                # __fail expects (retcode, stdout, stderr) in that order.
                self.__fail(ex.retcode, ex.stdout, ex.stderr)
                self.retcode = ex.retcode
                self.stdout = ex.stdout
                self.stderr = ex.stderr

                LOG.debug("Tracked process failed")
                LOG.error(str(ex))
            except KeyboardInterrupt:
                self.retcode = retcode
                self.stdout = stdout
                self.stderr = stderr
                self.__fail(-1, "", "KeyboardInterrupt")
                LOG.warning("Interrupted by user input")
                raise
            finally:
                signals.handlers.deregister(self.__fail,
                                            15, "SIGTERM", "SIGTERM")
        return self

    def commit(self):
        """Commit the database session holding this run's records."""
        self.session.commit()

    def __str__(self):
        # Aggregates built by __add__ hold a list in db_run, which has no id.
        run_id = getattr(self.db_run, "id", None)
        return "<RunInfo (ec={}, run_id={}, stdout={}, stderr={})>".format(
            self.retcode, run_id, len(self.stdout), len(self.stderr))

    def __repr__(self):
        return str(self)
def exit_code_from_run_infos(
        run_infos: t.Union["RunInfo", t.Iterable["RunInfo"]]) -> int:
    """
    Condense one or more RunInfo objects into a single exit code.

    Args:
        run_infos: A single RunInfo or an iterable of RunInfos.

    Returns:
        For a single RunInfo its ``retcode``. For an iterable: if the highest
        return code is exactly 0, the lowest one (so a negative code wins over
        0); otherwise the highest one. An empty iterable yields 0.

    Raises:
        ValueError: if ``run_infos`` is None.
    """
    # Explicit check instead of ``assert``, which is stripped under -O.
    if run_infos is None:
        raise ValueError("run_infos must not be None")
    if not hasattr(run_infos, "__iter__"):
        return run_infos.retcode

    rcs = [ri.retcode for ri in run_infos]
    if not rcs:
        # No runs at all: nothing failed.
        return 0
    max_rc = max(rcs)
    return min(rcs) if max_rc == 0 else max_rc
@contextmanager
def track_execution(cmd, project, experiment, **kwargs):
    """
    Guard the execution of the given command.

    Creates a RunInfo for ``cmd`` (which begins the database run), yields it
    and commits the run's session when the block exits. The commit also
    happens when the block raises, so a failure recorded by the RunInfo
    (e.g. after a KeyboardInterrupt) is persisted.

    Args:
        cmd: the command we guard.
        project: the project this run belongs to.
        experiment: the experiment this run belongs to.
        **kwargs: further attributes forwarded to RunInfo.

    Yields:
        The RunInfo; call it to execute ``cmd``.
    """
    runner = RunInfo(cmd=cmd, project=project, experiment=experiment, **kwargs)
    try:
        yield runner
    finally:
        runner.commit()
def run(command, retcode=0):
    """
    Execute a plumbum command, teeing its output.

    Args:
        command: The plumbum command to execute.
        retcode: Expected return code(s), passed to ``TEE``.

    Returns:
        The ``(retcode, stdout, stderr)`` result of ``command & TEE``.
    """
    tee_modifier = TEE(retcode=retcode)
    return command & tee_modifier
class UchrootEC(enum.Enum):
    """
    Return codes of uchroot that this module gives names to.

    NOTE(review): the meanings below are inferred from the member names
    (a mount inside the container failed); confirm against uchroot.
    """
    MNT_FAILED = 255  # not in uretry's retryable set
    MNT_PROC_FAILED = 254  # retried by uretry
    MNT_DEV_FAILED = 253  # retried by uretry
    MNT_SYS_FAILED = 252  # retried by uretry
    MNT_PTS_FAILED = 251  # retried by uretry
def retry(pb_cmd, retries=0, max_retries=10, retcode=0, retry_retcodes=None):
    """
    Run *pb_cmd*, re-running it while it fails with a retryable return code.

    Args:
        pb_cmd: The plumbum command to execute (via ``run``).
        retries: Number of retries already performed; counts toward
            ``max_retries``.
        max_retries: Maximum number of retries before giving up.
        retcode: Expected return code(s), forwarded to ``run``.
        retry_retcodes: Return codes that trigger a retry. ``None`` (the
            default) or an empty collection means failures are not retried.

    Returns:
        The result of the first successful ``run``.

    Raises:
        ProcessExecutionError: if the command fails with a non-retryable
            code, or still fails after ``max_retries`` retries.
    """
    # ``x in None`` would raise TypeError; treat None as "retry nothing".
    retryable = retry_retcodes if retry_retcodes is not None else ()
    attempt = retries
    while True:
        try:
            return run(pb_cmd, retcode)
        except ProcessExecutionError as proc_ex:
            if proc_ex.retcode not in retryable:
                raise
            # ``>=`` so that at most max_retries retries are performed.
            if attempt >= max_retries:
                LOG.error("Retried %d times. No change. Abort", attempt)
                raise
            attempt += 1
def uretry(cmd, retcode=0):
    """
    Run a uchroot command via ``retry``, retrying on mount-failure codes.

    Retried codes: MNT_PROC_FAILED, MNT_DEV_FAILED, MNT_SYS_FAILED and
    MNT_PTS_FAILED; MNT_FAILED is not retried.

    Args:
        cmd: The uchroot command to execute.
        retcode: Expected return code(s).
    """
    retryable_codes = (
        UchrootEC.MNT_PROC_FAILED,
        UchrootEC.MNT_DEV_FAILED,
        UchrootEC.MNT_SYS_FAILED,
        UchrootEC.MNT_PTS_FAILED,
    )
    retry(cmd, retcode=retcode,
          retry_retcodes=[code.value for code in retryable_codes])
def uchroot_no_args():
    """
    Return the uchroot command without any arguments.

    The only customization is the environment: LD_LIBRARY_PATH and PATH are
    set to the lib/bin directories of the configured container prefixes via
    ``with_env_recursive``.
    """
    from benchbuild.utils.cmd import uchroot as uchroot_base

    prefix_bins, prefix_libs = uchroot_env(CFG["container"]["prefixes"].value())
    return with_env_recursive(
        uchroot_base,
        LD_LIBRARY_PATH=list_to_path(prefix_libs),
        PATH=list_to_path(prefix_bins))
def uchroot_no_llvm(*args, **kwargs):
    """
    Return a customizable uchroot command rooted at the current directory.

    The command gets ``-C -w / -r <cwd>``, runs as ``uid``/``gid`` (both
    default to 0) with ``-E -A``, and has *args* appended.

    Args:
        args: List of additional arguments for uchroot (typical: mounts)
        kwargs: ``uid`` and ``gid``; any other keys are ignored.

    Return:
        chroot_cmd
    """
    user_id = kwargs.pop('uid', 0)
    group_id = kwargs.pop('gid', 0)

    base = uchroot_no_args()
    root_opts = ("-C", "-w", "/", "-r", os.path.abspath("."))
    ident_opts = ("-u", str(user_id), "-g", str(group_id), "-E", "-A")
    return base[root_opts][ident_opts][args]
def uchroot_mounts(prefix, mounts):
    """
    Compute the numbered mountpoints for the plain (non-dict) mounts.

    Plain entries of *mounts* are assigned ``<prefix>/0``, ``<prefix>/1``, ...
    in order. Dict entries are skipped and do not consume an index.

    Args:
        prefix: Directory under which the numbered mountpoints live.
        mounts: The configured mounts (plain entries or src/tgt dicts).

    Return:
        mntpoints
    """
    plain_mounts = [m for m in mounts if not isinstance(m, dict)]
    return ["{0}/{1}".format(prefix, str(idx))
            for idx, _ in enumerate(plain_mounts)]
def _uchroot_mounts(prefix, mounts, uchroot):
    """
    Add one ``-M <src>:/<tgt>`` option to *uchroot* per mount.

    Dict mounts supply ``src`` and ``tgt`` explicitly. Plain mounts use the
    entry as source and get the numbered target ``<prefix>/<n>``. Every
    target directory is created with ``mkdir -p``.

    Returns:
        ``(command, targets)``: the extended command and the list of targets.
    """
    cmd = uchroot
    targets = []
    anon_index = 0
    for entry in mounts:
        if isinstance(entry, dict):
            source, target = entry["src"], entry["tgt"]
        else:
            source = entry
            target = "{0}/{1}".format(prefix, str(anon_index))
            anon_index += 1
        mkdir("-p", target)
        cmd = cmd["-M", "{0}:/{1}".format(source, target)]
        targets.append(target)
    return cmd, targets
def uchroot_env(mounts):
    """
    Compute PATH and LD_LIBRARY_PATH entries for the given mount roots.

    Each mount is normalized to ``/<mount without surrounding slashes>``.

    Args:
        mounts: The mountpoints of the current user.

    Return:
        paths: every root's ``bin``, then every ``sbin``, then the roots.
        ld_libs: every root's ``lib``, then every ``lib64``.
    """
    roots = ["/" + m.strip("/") for m in mounts]

    def under(sub):
        return [root + sub for root in roots]

    ld_libs = under("/lib") + under("/lib64")
    paths = under("/bin") + under("/sbin") + roots
    return paths, ld_libs
def with_env_recursive(cmd, **envvars):
    """
    Update the environment of every BoundEnvCommand in cmd's wrapper chain.

    Walks through nested BoundCommand/BoundEnvCommand wrappers via their
    ``.cmd`` attribute and merges *envvars* into each BoundEnvCommand's
    ``envvars``. Objects of any other type end the walk and are not
    modified.

    Args:
        cmd - A plumbum command-like object
        **envvars - The environment variables to update

    Returns:
        The (same, updated) command.
    """
    from plumbum.commands.base import BoundCommand, BoundEnvCommand

    node = cmd
    while isinstance(node, (BoundCommand, BoundEnvCommand)):
        if not isinstance(node, BoundCommand):
            node.envvars.update(envvars)
        node = node.cmd
    return cmd
def uchroot_with_mounts(*args, **kwargs):
    """
    Return a uchroot command with all configured mounts enabled.

    LD_LIBRARY_PATH and PATH cover both the mount targets and the configured
    container prefixes.

    NOTE(review): args/kwargs are forwarded to ``uchroot_no_args``, which
    takes no parameters, so passing any argument raises TypeError. Confirm
    the intended semantics.
    """
    base = uchroot_no_args(*args, **kwargs)
    mounted, mount_targets = _uchroot_mounts(
        "mnt", CFG["container"]["mounts"].value(), base)
    mnt_bins, mnt_libs = uchroot_env(mount_targets)
    prefix_bins, prefix_libs = uchroot_env(CFG["container"]["prefixes"].value())
    return with_env_recursive(
        mounted,
        LD_LIBRARY_PATH=list_to_path(mnt_libs + prefix_libs),
        PATH=list_to_path(mnt_bins + prefix_bins))
def uchroot(*args, **kwargs):
    """
    Return a customizable uchroot command with all configured mounts.

    Creates ``./llvm`` first. The command extends
    ``uchroot_no_llvm(*args, **kwargs)`` with the configured container
    mounts, sets LD_LIBRARY_PATH/PATH for both the mounts and the container
    prefixes, and ends with a ``--`` argument.

    Args:
        args: List of additional arguments for uchroot (typical: mounts)
        kwargs: Forwarded to ``uchroot_no_llvm`` (``uid``, ``gid``).

    Return:
        chroot_cmd
    """
    mkdir("-p", "llvm")
    base_cmd = uchroot_no_llvm(*args, **kwargs)
    mount_cfg = CFG["container"]["mounts"].value()
    mounted_cmd, mount_targets = _uchroot_mounts("mnt", mount_cfg, base_cmd)
    mnt_bins, mnt_libs = uchroot_env(mount_targets)
    prefix_bins, prefix_libs = uchroot_env(CFG["container"]["prefixes"].value())
    env_cmd = mounted_cmd.with_env(
        LD_LIBRARY_PATH=list_to_path(mnt_libs + prefix_libs),
        PATH=list_to_path(mnt_bins + prefix_bins))
    return env_cmd["--"]
def in_builddir(sub='.'):
    """
    Decorate a project phase so it runs inside the project's build directory.

    The wrapped method runs with the working directory set to
    ``<self.builddir>/<sub>``. If that directory cannot be entered, the
    phase is skipped: a debug message is logged and None is returned.
    Exceptions raised by the phase itself propagate unchanged.

    Args:
        sub: An optional subdirectory to change into.
    """
    from contextlib import ExitStack
    from functools import wraps
    from os import path

    def wrap_in_builddir(func):
        """Wrap the function for the new build directory."""

        @wraps(func)
        def wrap_in_builddir_func(self, *args, **kwargs):
            """The actual function inside the wrapper for the new builddir."""
            p = path.abspath(path.join(self.builddir, sub))
            with ExitStack() as stack:
                # Guard only the chdir: a FileNotFoundError raised by func
                # must not be swallowed and misreported as a failed chdir.
                try:
                    stack.enter_context(local.cwd(p))
                except FileNotFoundError:
                    LOG.debug("Chdir to %s failed. Directory does not exist.",
                              p)
                    return None
                return func(self, *args, **kwargs)

        return wrap_in_builddir_func

    return wrap_in_builddir
def unionfs_set_up(ro_base, rw_image, mountpoint):
    """
    Build the unionfs-fuse command that layers rw_image over ro_base.

    Args:
        ro_base: base_directory of the project (mounted read-only)
        rw_image: virtual image of actual file system (receives all writes)
        mountpoint: location where ro_base and rw_image merge; created if
            it does not exist yet.

    Returns:
        A plumbum command that runs unionfs in the foreground (``-f``) with
        ``auto_unmount,allow_other,cow``. The caller starts it.

    Raises:
        ValueError: if ro_base or rw_image does not exist.
    """
    if not os.path.exists(mountpoint):
        mkdir("-p", mountpoint)
    if not os.path.exists(ro_base):
        LOG.error("Base dir does not exist: '%s'", ro_base)
        raise ValueError("Base directory does not exist")
    if not os.path.exists(rw_image):
        LOG.error("Image dir does not exist: '%s'", rw_image)
        raise ValueError("Image directory does not exist")

    from benchbuild.utils.cmd import unionfs

    ro_base = os.path.abspath(ro_base)
    rw_image = os.path.abspath(rw_image)
    mountpoint = os.path.abspath(mountpoint)
    return unionfs["-f", "-o", "auto_unmount,allow_other,cow",
                   rw_image + "=RW:" + ro_base + "=RO", mountpoint]
def unionfs_is_active(root):
    """
    Check whether a unionfs-fuse file system is mounted at or below *root*.

    *root* is resolved with ``os.path.realpath`` and compared against all
    partitions reported by ``psutil.disk_partitions(all=True)``.
    """
    import psutil

    real_root = os.path.realpath(root)
    unionfs_types = ["fuse.unionfs", "fuse.unionfs-fuse"]
    return any(
        os.path.commonpath([part.mountpoint, real_root]) == real_root
        and part.fstype in unionfs_types
        for part in psutil.disk_partitions(all=True))
class UnmountError(BaseException):
    """
    Raised by the ``unionfs`` decorator when the unionfs mount is still
    active after the wrapped function ran and shutdown was attempted.

    NOTE(review): this derives from BaseException, so ``except Exception``
    handlers do not catch it. Confirm that is intended; the Python docs
    recommend deriving user-defined exceptions from Exception.
    """
    pass
def _stop_unionfs(proc, mount_dir):
    """
    Terminate the unionfs-fuse process *proc* that serves *mount_dir*.

    A running process gets SIGINT and 3 seconds to exit, and is killed
    otherwise. It is stopped even if the mount already vanished, so no
    unionfs child is left behind.

    Args:
        proc: The ``Popen`` object of the unionfs process.
        mount_dir: The absolute mountpoint served by *proc*.

    Returns:
        True if no unionfs mount remains at or below *mount_dir*, False
        otherwise (an error is logged in that case).
    """
    from signal import SIGINT

    if proc.poll() is None:
        proc.send_signal(SIGINT)
        try:
            proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            proc.kill()
            try:
                # Reap the killed process; bounded so a process stuck in the
                # kernel cannot hang the caller.
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                LOG.warning("Unionfs process %s did not exit after SIGKILL.",
                            proc.pid)
    LOG.debug("Unionfs shut down.")
    if unionfs_is_active(root=mount_dir):
        LOG.error("Unionfs is still mounted at '%s'.", mount_dir)
        return False
    return True


def unionfs(base_dir='./base',
            image_dir='./image',
            image_prefix=None,
            mountpoint='./union'):
    """
    Decorator for the UnionFS feature.

    This configures a unionfs for projects. The given base_dir and/or
    image_dir are layered as follows:
        image_dir=RW:base_dir=RO
    All writes go to the image_dir, while base_dir delivers the (read-only)
    versions of the rest of the filesystem.

    While the wrapped function runs, ``project.builddir`` points to the
    unified mount and the working directory is the mountpoint. Afterwards
    the builddir is restored and unionfs is stopped, even if the function
    raised. If unionfs exits before the mount appears, the function is not
    called and None is returned.

    Args:
        base_dir: The unpacked container of a project delivered by a method
            out of the container utils. (Currently unused: the base layer is
            taken from ``project.container.local``.)
        image_dir: Virtual image of the actual file system represented by
            the build_dir of a project.
        image_prefix: Useful prefix if the projects run on a cluster, to
            identify where the job came from and where it runs.
        mountpoint: Location where the filesystems merge, currently per
            default as './union'.

    Raises:
        UnmountError: if the unionfs mount is still active after the wrapped
            function returned and unionfs was stopped.
    """
    import time
    from functools import wraps

    def update_cleanup_paths(new_path):
        """Add *new_path* to ``CFG["cleanup_paths"]`` (deduplicated)."""
        cleanup_dirs = set(settings.CFG["cleanup_paths"].value())
        cleanup_dirs.add(new_path)
        settings.CFG["cleanup_paths"] = list(cleanup_dirs)

    def is_outside_of_builddir(project, path_to_check):
        """Check whether *path_to_check* lies outside the project builddir."""
        # commonpath compares whole components; commonprefix would treat
        # '/build/foobar' as lying inside '/build/foo'.
        bdir = os.path.abspath(project.builddir)
        checked = os.path.abspath(path_to_check)
        return os.path.commonpath([checked, bdir]) != bdir

    def wrap_in_union_fs(func):
        """
        Wrap *func* so that it runs inside the union file system.

        Args:
            func: The function that needs to be wrapped inside the unionfs.

        Return:
            The wrapped function.
        """

        @wraps(func)
        def wrap_in_union_fs_func(project, *args, **kwargs):
            """
            Wrap the func in the UnionFS mount stack.

            We make sure that the mount points all exist and stack up the
            directories for the unionfs. Image directories outside of the
            project's build directory are tracked for deletion.
            """
            container = project.container
            abs_base_dir = os.path.abspath(container.local)

            if image_prefix is not None:
                # Resolve into a local so the decorator's configuration is
                # not mutated between calls.
                abs_prefix = os.path.abspath(image_prefix)
                rel_prj_builddir = os.path.relpath(
                    project.builddir, settings.CFG["build_dir"].value())
                abs_image_dir = os.path.abspath(os.path.join(
                    abs_prefix, rel_prj_builddir, image_dir))

                if is_outside_of_builddir(project, abs_image_dir):
                    update_cleanup_paths(abs_image_dir)
            else:
                abs_image_dir = os.path.abspath(
                    os.path.join(project.builddir, image_dir))

            abs_mount_dir = os.path.abspath(
                os.path.join(project.builddir, mountpoint))

            for directory in (abs_base_dir, abs_image_dir, abs_mount_dir):
                if not os.path.exists(directory):
                    mkdir("-p", directory)

            unionfs_cmd = unionfs_set_up(
                abs_base_dir, abs_image_dir, abs_mount_dir)

            project_builddir_bak = project.builddir
            project.builddir = abs_mount_dir
            project.setup_derived_filenames()

            ret = None
            proc = None
            unmounted = True
            try:
                proc = unionfs_cmd.popen()
                # Wait until the mount shows up or the unionfs process dies.
                while (not unionfs_is_active(root=abs_mount_dir)) and \
                        (proc.poll() is None):
                    time.sleep(0.05)

                if proc.poll() is None:
                    with local.cwd(abs_mount_dir):
                        ret = func(project, *args, **kwargs)
                else:
                    LOG.error("unionfs exited with code %s before '%s' "
                              "could be used; skipping.",
                              proc.returncode, abs_mount_dir)
            finally:
                # Restore the project and stop unionfs even if func raised
                # or unionfs never came up.
                project.builddir = project_builddir_bak
                project.setup_derived_filenames()
                if proc is not None:
                    unmounted = _stop_unionfs(proc, abs_mount_dir)

            if not unmounted:
                raise UnmountError()
            return ret

        return wrap_in_union_fs_func

    return wrap_in_union_fs
def store_config(func):
    """
    Decorate a project method so the current configuration is saved first.

    Before the wrapped method runs, ``CFG`` is stored to
    ``<abspath(self.builddir)>/.benchbuild.yml``.
    """
    from functools import wraps

    @wraps(func)
    def wrap_store_config(self, *args, **kwargs):
        """Store CFG in the builddir, then call the wrapped method."""
        config_file = os.path.join(os.path.abspath(self.builddir),
                                   ".benchbuild.yml")
        CFG.store(config_file)
        return func(self, *args, **kwargs)

    return wrap_store_config