"""benchbuild.signals: run registered cleanup callbacks when SIGTERM is received."""
import functools
import logging
import signal
import sys
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class CleanupOnSignal:
    """Registry of cleanup callbacks that can be run together on demand.

    Callbacks are stored together with the arguments they should be invoked
    with. Calling the instance runs every registered callback.

    The registry dict is a class attribute, so all instances of this class
    share the same set of registered callbacks.
    """

    # Maps the original callback to a functools.partial that binds the
    # arguments given at registration time.
    __stored_procedures = {}

    @property
    def stored_procedures(self):
        """Return the mapping of registered callbacks to their partials."""
        return self.__stored_procedures

    def register(self, callback, *args, **kwargs):
        """Register ``callback`` to be run with the given arguments.

        Registering the same callback again replaces the arguments stored
        for it.

        Args:
            callback: The callable to invoke during cleanup.
            *args: Positional arguments bound to the callback.
            **kwargs: Keyword arguments bound to the callback.
        """
        new_func = functools.partial(callback, *args, **kwargs)
        self.__stored_procedures[callback] = new_func

    def deregister(self, callback):
        """Remove ``callback`` from the registry.

        Args:
            callback: A callable previously passed to :meth:`register`.

        Raises:
            KeyError: If ``callback`` is not currently registered.
        """
        del self.__stored_procedures[callback]

    def __call__(self):
        """Run all registered cleanup procedures in insertion order.

        Callbacks may call :meth:`register` or :meth:`deregister` while the
        cleanup pass is running. A callback that gets deregistered before
        its turn is skipped; a callback registered during the pass is not
        run in that same pass.
        """
        # Iterate over a snapshot of the keys: a callback that (de)registers
        # itself or others would otherwise change the dict's size during
        # iteration and abort the remaining cleanups with a RuntimeError.
        for callback in list(self.stored_procedures):
            procedure = self.stored_procedures.get(callback)
            if procedure is None:
                # Deregistered by an earlier callback in this pass.
                continue
            LOG.debug("Running stored cleanup procedure: %r", callback)
            procedure()
# Module-level registry instance; the SIGTERM handler below calls it to run
# every registered cleanup procedure.
handlers = CleanupOnSignal()
def __handle_sigterm(signum, frame):
    """Run the registered cleanup handlers, then exit the process.

    Written with the ``(signum, frame)`` signature that ``signal.signal``
    expects of a handler.

    Args:
        signum: Number of the received signal; passed to ``sys.exit``.
        frame: Current stack frame at signal delivery; not used.
    """
    LOG.debug("Got SIGTERM, running cleanup handlers")
    # The frame is not needed; drop the local reference right away.
    del frame

    handlers()
    sys.exit(signum)
# Install the SIGTERM handler as a side effect of importing this module.
signal.signal(signal.SIGTERM, __handle_sigterm)