Source code for benchbuild.signals

import functools
import logging
import signal
import sys

LOG = logging.getLogger(__name__)


[docs]class CleanupOnSignal: __stored_procedures = {} @property def stored_procedures(self): return self.__stored_procedures
[docs] def register(self, callback, *args, **kwargs): new_func = functools.partial(callback, *args, **kwargs) self.__stored_procedures[callback] = new_func
[docs] def deregister(self, callback): del self.__stored_procedures[callback]
def __call__(self): for k in self.stored_procedures: LOG.debug("Running stored cleanup procedure: %r", k) self.stored_procedures[k]()
handlers = CleanupOnSignal() def __handle_sigterm(signum, frame): del frame LOG.debug("Got SIGTERM, running cleanup handlers") handlers() sys.exit(signum) signal.signal(signal.SIGTERM, __handle_sigterm)