Source code for benchbuild.reports
"""
Register reports for an experiment
"""
import importlib
import logging
from benchbuild.settings import CFG
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
def discover():
    """
    Import every report module listed in ``CFG["plugins"]["reports"]``.

    Nothing is imported unless ``CFG["plugins"]["autoload"]`` holds a truthy
    value. A module that cannot be imported is reported through the module
    logger at error level and skipped, so the remaining entries are still
    attempted. Each successful import is logged at debug level.

    Tests:
        >>> from benchbuild.settings import CFG
        >>> from benchbuild.reports import discover
        >>> import logging as lg
        >>> import sys
        >>> l = lg.getLogger('benchbuild')
        >>> l.setLevel(lg.DEBUG)
        >>> l.handlers = [lg.StreamHandler(stream=sys.stdout)]
        >>> CFG["plugins"]["reports"] = ["benchbuild.non.existing", "benchbuild.reports.raw"]
        >>> discover()
        Could not find 'benchbuild.non.existing'
        Found report: benchbuild.reports.raw
    """
    # Guard clause: plugin autoloading is switched off, nothing to do.
    if not CFG["plugins"]["autoload"].value():
        return

    for module_name in CFG["plugins"]["reports"].value():
        try:
            importlib.import_module(module_name)
        except ImportError:
            LOG.error("Could not find '%s'", module_name)
        else:
            LOG.debug("Found report: %s", module_name)
class ReportRegistry(type):
    """
    Metaclass that indexes report classes by the experiments they support.

    Each class created through this metaclass is appended to
    ``ReportRegistry.reports`` once for every entry of its
    ``SUPPORTED_EXPERIMENTS`` attribute. A class whose
    ``SUPPORTED_EXPERIMENTS`` is ``None`` is not registered at all.
    """

    # Maps an experiment entry to the list of classes registered for it,
    # in class-creation order. Shared by all classes using this metaclass.
    reports = {}

    def __init__(cls, name, bases, dict):
        super().__init__(name, bases, dict)

        supported = cls.SUPPORTED_EXPERIMENTS
        if supported is None:
            return

        registry = ReportRegistry.reports
        for experiment in supported:
            # Create the bucket on first sight, then register the class.
            registry.setdefault(experiment, []).append(cls)
def load_experiment_ids_from_names(session, names):
    """
    Fetch the ``id`` column produced by the SQL function ``experiments``.

    The statement selects ``id`` from ``experiments(:names)`` and binds
    ``names`` to that parameter before executing it on ``session``.

    Args:
        session: Object providing ``execute(statement)``; presumably a
            SQLAlchemy session (TODO confirm against callers).
        names: Value bound to the ``names`` parameter of the statement.

    Returns:
        Whatever ``fetchall()`` yields for the executed statement; each row
        carries the selected ``id`` as its first element.
    """
    # Imported lazily, as in the original, so that importing this module
    # does not itself require SQLAlchemy.
    from sqlalchemy import func, column
    from sqlalchemy.sql import select, bindparam

    experiments_call = func.experiments(bindparam('names'))
    statement = select([column('id')]).select_from(experiments_call)
    bound_statement = statement.unique_params(names=names)

    result = session.execute(bound_statement)
    return result.fetchall()
class Report(metaclass=ReportRegistry):
    """
    Base class for reports; subclasses are registered by ``ReportRegistry``.

    Attributes set on every instance:
        experiments: The class's ``SUPPORTED_EXPERIMENTS`` (set in ``__new__``).
        out_path: The ``out_path`` argument, stored unchanged.
        session: A fresh ``benchbuild.utils.schema.Session()``.
        experiment_ids: Experiment ids this report works on (see ``__init__``).
    """

    # Experiments this report supports; ``None`` makes instantiation fail.
    SUPPORTED_EXPERIMENTS = []

    def __new__(cls, *args, **kwargs):
        """
        Create the instance and attach ``experiments`` to it.

        Raises:
            AttributeError: If ``cls.SUPPORTED_EXPERIMENTS`` is ``None``.
        """
        instance = super().__new__(cls)

        supported = cls.SUPPORTED_EXPERIMENTS
        if supported is None:
            raise AttributeError(
                "{0} @ {1} does not define a SUPPORTED_EXPERIMENTS attribute"
                .format(cls.__name__, cls.__module__))

        instance.experiments = supported
        return instance

    def __init__(self, exp_name, exp_ids, out_path):
        """
        Open a session and resolve the experiment ids for this report.

        Args:
            exp_name: Experiment name used for the lookup when ``exp_ids``
                is empty; only entries of ``SUPPORTED_EXPERIMENTS`` equal to
                it are passed on to the lookup.
            exp_ids: Iterable of values accepted by ``uuid.UUID``; when
                falsy, ids are loaded via ``load_experiment_ids_from_names``.
            out_path: Stored as ``self.out_path``.
        """
        import benchbuild.utils.schema as schema
        import uuid

        self.out_path = out_path
        self.session = schema.Session()

        if exp_ids:
            # Explicit ids were given: convert each one to a UUID object.
            resolved_ids = [uuid.UUID(raw_id) for raw_id in exp_ids]
        else:
            # No ids given: look them up by name, restricted to the
            # supported experiments matching ``exp_name``.
            matching_names = [
                name for name in self.SUPPORTED_EXPERIMENTS
                if name == exp_name
            ]
            rows = load_experiment_ids_from_names(self.session, matching_names)
            resolved_ids = [row[0] for row in rows]

        self.experiment_ids = resolved_ids