# Source code for benchbuild.extensions.run

import logging
import os

import yaml
from plumbum import local

from benchbuild.extensions import base
from benchbuild.utils import db, run

LOG = logging.getLogger(__name__)


class RuntimeExtension(base.Extension):
    """
    Default extension that executes a wrapped binary under run tracking.

    Intended for runtime experiments: the binary is executed inside
    ``run.track_execution`` so the run is recorded, and the extension's
    configuration (if any) is attached to the run and persisted.
    """

    def __init__(self, project, experiment, *extensions, config=None):
        """
        Store the project/experiment pair and initialise the base extension.

        Args:
            project: Project the tracked binary belongs to.
            experiment: Experiment the run is recorded for.
            *extensions: Child extensions, forwarded to the base class.
            config: Optional configuration mapping, forwarded to the base
                class (keyword-only).
        """
        self.project = project
        self.experiment = experiment

        super().__init__(*extensions, config=config)

    def __call__(self, binary_command, *args, **kwargs):
        """
        Run ``binary_command`` with ``args`` under tracking, then chain on.

        Args:
            binary_command: Command object; it is indexed with ``args`` to
                bind the arguments before execution.
            *args: Arguments for the binary.
            **kwargs: Forwarded to ``run.track_execution`` and to the child
                extensions. An optional ``project_name`` entry overrides
                the name of the stored project.

        Returns:
            The result of ``call_next`` with this run's info appended.
        """
        project = self.project
        project.name = kwargs.get("project_name", project.name)

        tracked_cmd = binary_command[args]
        tracker = run.track_execution(tracked_cmd, project, self.experiment,
                                      **kwargs)
        with tracker as execute:
            run_info = execute()

            config = self.config
            if config:
                run_info.add_payload("config", config)

                rendered = yaml.dump(
                    config, width=40, indent=4, default_flow_style=False)
                LOG.info(rendered)

                # The baseline flag is added after logging, so it is stored
                # with the run but does not show up in the log output above.
                config['baseline'] = os.getenv("BB_IS_BASELINE", "False")
                db.persist_config(run_info.db_run, run_info.session, config)

        # Child extensions receive the original, unbound command.
        results = self.call_next(binary_command, *args, **kwargs)
        results.append(run_info)
        return results

    def __str__(self):
        return "Run wrapped binary"
class WithTimeout(base.Extension):
    """
    Guard a binary with a timeout.

    Wraps any binary in a call to `timeout`; the limit is fixed when the
    extension is constructed.
    """

    def __init__(self, *extensions, limit="10m", **kwargs):
        """
        Initialise the base extension and remember the time limit.

        Args:
            *extensions: Child extensions, forwarded to the base class.
            limit: Limit handed to `timeout` as its first argument
                (keyword-only, defaults to ``"10m"``).
            **kwargs: Forwarded to the base class.
        """
        super().__init__(*extensions, **kwargs)
        self.limit = limit

    def __call__(self, binary_command, *args, **kwargs):
        """
        Prefix ``binary_command`` with `timeout` and pass it down the chain.

        Returns:
            Whatever ``call_next`` returns for the guarded command.
        """
        # Imported at call time, as in the rest of this module's extensions.
        from benchbuild.utils.cmd import timeout

        guarded_command = timeout[self.limit, binary_command]
        return self.call_next(guarded_command, *args, **kwargs)
class SetThreadLimit(base.Extension):
    """Sets the OpenMP thread limit, based on your settings.

    The 'jobs' setting decides the value exported through the environment
    variable OMP_NUM_THREADS while the child extensions run.
    """

    def __call__(self, binary_command, *args, **kwargs):
        """
        Run the child extensions with OMP_NUM_THREADS set.

        The thread count is taken from the extension's own config when it
        has a 'jobs' entry; otherwise a warning is logged and the global
        ``CFG["jobs"]`` value (converted to int) is used.

        Returns:
            Whatever ``call_next`` returns.
        """
        from benchbuild.settings import CFG

        settings = self.config
        if settings is None or 'jobs' not in settings.keys():
            LOG.warning("Parameter 'config' was unusable, using defaults")
            num_threads = int(CFG["jobs"])
        else:
            num_threads = settings['jobs']

        # The variable is only set for the duration of the child calls.
        with local.env(OMP_NUM_THREADS=str(num_threads)):
            return self.call_next(binary_command, *args, **kwargs)

    def __str__(self):
        return "Limit number of OpenMP threads"
class Rerun(base.Extension):
    """Placeholder extension; adds nothing on top of ``base.Extension``."""