"""
Extension base-classes for compile-time and run-time experiments.
"""
from abc import ABCMeta
import logging
import os

try:
    # ``Iterable`` lives in ``collections.abc``; the alias that used to be
    # re-exported from ``collections`` was removed in Python 3.10.
    from collections.abc import Iterable
except ImportError:  # legacy interpreters without ``collections.abc``
    from collections import Iterable

import parse
import yaml
from plumbum import local

from benchbuild.utils.db import persist_config, persist_time
from benchbuild.utils.run import (track_execution, fetch_time_output)
# Module-level logger used by every extension class in this file.
LOG = logging.getLogger(__name__)
class Extension(metaclass=ABCMeta):
    """Base class for chainable extensions.

    An extension holds a sequence of child extensions. Calling an extension
    forwards the call to every child (see :meth:`call_next`) and returns the
    flattened list of whatever the children returned. Subclasses override
    ``__call__`` to do their own work before or after delegating.
    """

    def __init__(self, *extensions, config=None, **kwargs):
        """Initialize an extension with an arbitrary number of children.

        Args:
            *extensions: Child extensions (any callables) that
                :meth:`call_next` invokes in order.
            config: Optional configuration object kept on the instance.
            **kwargs: Accepted so subclasses can pass extra keywords through;
                not used by this base class.
        """
        self.next_extensions = extensions
        self.config = config

    def call_next(self, *args, **kwargs):
        """Call all child extensions with the same arguments.

        Args:
            *args: Positional arguments forwarded to every child.
            **kwargs: Keyword arguments forwarded to every child.

        Returns:
            A flat list with the results of all children, in call order.
            A child returning ``None`` contributes nothing (a warning is
            logged); a child returning an iterable contributes each element;
            any other value contributes itself.
        """
        all_results = []
        for ext in self.next_extensions:
            LOG.debug(" ++ - %s ", ext.__class__)
            results = ext(*args, **kwargs)
            LOG.debug(" -- - %s => %s", ext.__class__, results)
            if results is None:
                LOG.warning("No result from: %s", ext.__class__)
                continue

            # str/bytes are technically iterable, but a child handing one back
            # means "one result", not "one result per character".
            if (isinstance(results, Iterable)
                    and not isinstance(results, (str, bytes))):
                result_list = list(results)
            else:
                result_list = [results]

            # The status list only feeds the debug message below, so a result
            # without ``db_run``/``status`` must not abort the whole chain.
            status_list = [
                getattr(getattr(r, "db_run", None), "status", None)
                for r in result_list
            ]
            LOG.debug(" -- %s - %s => %s", str(status_list),
                      ext.__class__, results)
            all_results.extend(result_list)
        return all_results

    def __lshift__(self, rhs):
        """Make this extension the sole child of ``rhs`` and return ``rhs``.

        Any children previously registered on ``rhs`` are replaced.
        """
        rhs.next_extensions = [self]
        return rhs

    def print(self, indent=0):
        """Print a structural view of the registered extensions.

        The tree is written to the module logger at INFO level; every nesting
        level is indented by two more spaces.
        """
        LOG.info("%s:: %s", indent * " ", self.__class__)
        for ext in self.next_extensions:
            ext.print(indent=indent+2)

    def __call__(self, *args, **kwargs):
        """Forward the call to all children and return their results."""
        return self.call_next(*args, **kwargs)
class RuntimeExtension(Extension):
    """Run a binary command under ``track_execution`` and call the children.

    The result of the tracked run is appended to the results collected from
    the child extensions.
    """

    def __init__(self, project, experiment, *extensions, config=None):
        """Store project and experiment and register the child extensions."""
        super(RuntimeExtension, self).__init__(*extensions, config=config)
        self.project = project
        self.experiment = experiment

    def __call__(self, binary_command, *args, **kwargs):
        """Execute ``binary_command[args]`` and then the child extensions.

        A ``project_name`` keyword, if present, overrides the project's name.
        When a (truthy) config is set it is logged, tagged with the value of
        the ``BB_IS_BASELINE`` environment variable and handed to
        ``persist_config`` together with the run's database handles.

        Returns:
            The children's results followed by this run's ``run_info``.
        """
        self.project.name = kwargs.get("project_name", self.project.name)
        tracked_cmd = binary_command[args]

        with track_execution(tracked_cmd, self.project,
                             self.experiment, **kwargs) as run:
            run_info = run()
            if self.config:
                # Dump the config as it is *before* the baseline marker is
                # added to it.
                config_dump = yaml.dump(self.config,
                                        width=40,
                                        indent=4,
                                        default_flow_style=False)
                for message in ("", "==CONFIG==", config_dump,
                                "==CONFIG==", ""):
                    LOG.info(message)

                self.config['baseline'] = os.getenv("BB_IS_BASELINE", "False")
                persist_config(run_info.db_run, run_info.session, self.config)

        results = self.call_next(binary_command, *args, **kwargs)
        results.append(run_info)
        return results
class RunWithTimeout(Extension):
    """Prefix the command with ``timeout <limit>`` before calling children."""

    def __init__(self, *extensions, limit="10m", **kwargs):
        """Register the children and remember the timeout limit.

        Args:
            *extensions: Child extensions.
            limit: Duration passed as first argument to ``timeout``.
            **kwargs: Forwarded to ``Extension.__init__``.
        """
        super().__init__(*extensions, **kwargs)
        self.limit = limit

    def __call__(self, binary_command, *args, **kwargs):
        """Call the children with ``timeout[limit, binary_command]``."""
        from benchbuild.utils.cmd import timeout

        limited_command = timeout[self.limit, binary_command]
        return self.call_next(limited_command, *args, **kwargs)
class LogTrackingMixin(object):
    """Add log-registering capabilities to extensions.

    Every instance tracks its own list of log paths. The list is created
    lazily on first access, so classes using the mixin do not have to call
    any initializer.
    """

    def add_log(self, path):
        """Register ``path`` as an additional log of this instance."""
        self.logs.append(path)

    @property
    def logs(self):
        """Return the (mutable) list of logs registered on this instance."""
        # Stored in the instance dict rather than on the class: a class-level
        # list would be shared by every object that uses the mixin.
        return vars(self).setdefault("_logs", [])
class LogAdditionals(Extension):
    """Log any additional log files that were registered."""

    def __call__(self, *args, **kwargs):
        """Run the children, then ``cat`` every log they registered.

        Only children that are ``LogTrackingMixin`` instances are inspected.

        Returns:
            The children's results, or ``None`` when there are no children.
        """
        from benchbuild.utils.cmd import cat
        from plumbum import FG

        if not self.next_extensions:
            return None

        res = self.call_next(*args, **kwargs)

        for child in self.next_extensions:
            if not isinstance(child, LogTrackingMixin):
                continue
            for log_path in child.logs:
                LOG.debug("Dumping content of '%s'.", log_path)
                # Run ``cat`` in the foreground so the file content goes
                # straight to the terminal.
                cat[log_path] & FG
                LOG.debug("Dumping content of '%s' complete.", log_path)

        return res
class RunWithTime(Extension):
    """Wrap a command with time and store the timings in the database."""

    def __call__(self, binary_command, *args, may_wrap=True, **kwargs):
        """Run the children on the (optionally) ``time``-wrapped command.

        Args:
            binary_command: The command to execute.
            *args: Forwarded to the child extensions.
            may_wrap: When true, the command is wrapped as
                ``time -f "BENCHBUILD: %U-%S-%e" <command>`` and the timings
                found on each run's stderr are persisted. When false, the
                command is forwarded untouched and nothing is persisted.
            **kwargs: Forwarded to the child extensions.

        Returns:
            The run infos returned by the child extensions.
        """
        if not may_wrap:
            # Nothing was wrapped, so there is no timing output to collect.
            return self.call_next(binary_command, *args, **kwargs)

        from benchbuild.utils.cmd import time

        time_tag = "BENCHBUILD: "
        run_cmd = time["-f", time_tag + "%U-%S-%e", binary_command]
        run_infos = self.call_next(run_cmd, *args, **kwargs)

        from benchbuild.utils import schema as s

        session = s.Session()
        for run_info in run_infos:
            timings = fetch_time_output(
                time_tag,
                time_tag + "{:g}-{:g}-{:g}",
                run_info.stderr.split("\n"))
            if timings:
                persist_time(run_info.db_run, session, timings)
            else:
                LOG.warning("No timing information found.")
        # One commit for all timings persisted above.
        session.commit()
        return run_infos
class RunCompiler(Extension):
    """Run a compiler command with experiment flags under execution tracking.

    If the extended command fails, the original command (without the extra
    flags) can be run as a fallback.
    """

    def __init__(self, project, experiment, *extensions, config=None):
        """Store project and experiment and register the child extensions."""
        self.project = project
        self.experiment = experiment
        super(RunCompiler, self).__init__(*extensions, config=config)

    def __call__(self, command, *args,
                 experiment_cflags=None,
                 experiment_ldflags=None,
                 rerun_on_error=True,
                 **kwargs):
        """Run ``command`` with the experiment's flags, then the children.

        Args:
            command: The compiler command.
            *args: Arguments of the compiler invocation.
            experiment_cflags: Extra compiler flags (defaults to none).
            experiment_ldflags: Extra linker flags (defaults to none).
            rerun_on_error: When true (the default), a failed run is repeated
                with the original command, i.e. without
                ``-Qunused-arguments`` and the experiment flags.
            **kwargs: Forwarded to ``track_execution`` and the children.

        Returns:
            The children's results followed by the ``run_info`` of the last
            compiler run that was executed here.
        """
        cflags = [] if experiment_cflags is None else experiment_cflags
        ldflags = [] if experiment_ldflags is None else experiment_ldflags

        original_command = command[args]
        new_command = command["-Qunused-arguments"]
        new_command = new_command[args]
        new_command = new_command[cflags]
        new_command = new_command[ldflags]

        with track_execution(new_command, self.project,
                             self.experiment, **kwargs) as run:
            run_info = run()
            if self.config:
                LOG.info(yaml.dump(self.config,
                                   width=40,
                                   indent=4,
                                   default_flow_style=False))
                persist_config(run_info.db_run, run_info.session, self.config)

            if run_info.has_failed and rerun_on_error:
                with track_execution(original_command, self.project,
                                     self.experiment,
                                     **kwargs) as fallback_run:
                    LOG.warning("Fallback to: %s", str(original_command))
                    run_info = fallback_run()

        # NOTE(review): ``new_command`` already has ``args`` bound and they
        # are passed along again here - confirm that children expect this.
        res = self.call_next(new_command, *args, **kwargs)
        res.append(run_info)
        return res
class SetThreadLimit(Extension):
    """Call the children with ``OMP_NUM_THREADS`` set in the environment."""

    def __call__(self, binary_command, *args, **kwargs):
        """Export the job count as ``OMP_NUM_THREADS`` around the children.

        The count is ``config['jobs']`` when the extension's config provides
        it; otherwise a warning is logged and ``CFG["jobs"]`` is used.

        Returns:
            Whatever ``call_next`` returns for the given arguments.
        """
        from benchbuild.settings import CFG

        own_config = self.config
        if own_config is None or 'jobs' not in own_config.keys():
            LOG.warning("Parameter 'config' was unusable, using defaults")
            jobs = CFG["jobs"].value()
        else:
            jobs = own_config['jobs']

        with local.env(OMP_NUM_THREADS=str(jobs)):
            return self.call_next(binary_command, *args, **kwargs)