"""
..
Copyright (c) 2018 LG Electronics Inc.
SPDX-License-Identifier: GPL-3.0-or-later
aup.EE.Experiment module
========================
:mod:`aup.EE.Experiment` is called by `aup.__main__` to start an experiment.
See :doc:`algorithm` for configuration specification.
APIs
----
"""
import json
import logging
import os
import signal
import sys
import _thread
import threading
import time
from ..Proposer import get_proposer, SPECIAL_EXIT_PROPOSERS
from .Job import Job
from .Resource import get_resource_manager
from ..aup import BasicConfig
from ..utils import set_default_keyvalue, check_missing_key, get_default_connector, get_default_username
from ..compression.utils import *
from ..Proposer import ProposerStatus
logger = logging.getLogger(__name__)
def _verify_config(config):
    """
    Verify that the experiment configuration contains all required keys.

    Raises (via ``check_missing_key``) when a required key is absent.

    :param config: experiment configuration
    :return: config if verified
    """
    required_keys = (
        ("script", "Missing required value for 'script'."),
        ("resource", "Missing required value for 'resource'"),
        ("name", "Missing required value for 'name'"),
    )
    for key, message in required_keys:
        check_missing_key(config, key, message, log=logger)
    return config
class Experiment:
    """
    Experiment Class - create and run an experiment

    :param exp_config: configuration of the experiment
    :type exp_config: BasicConfig.BasicConfig
    :param username: username, default: None - will use login username
    :type username: str
    :param connector: connector to database
    :type connector: AbstractConnector
    :param auppath: Auptimizer env.ini file folder, default is either ``./.aup`` or ``~/.aup``
    :type auppath: str
    :param sleep_time: time to pause between jobs
    :type sleep_time: int
    :param eid: existing experiment id to reuse; when None a new experiment record is created
    :type eid: int
    :param start: when creating a new experiment, True marks it started immediately,
                  False only creates the record
    :type start: bool
    :param request_stop_time: seconds between database polls for external stop requests
    :type request_stop_time: int
    """
    def __init__(self,
                 exp_config,
                 username=None,
                 connector=None,
                 auppath=os.path.join(".aup"),
                 sleep_time=1,
                 eid=None,
                 start=True,
                 request_stop_time=5):
        self.sleep_time = sleep_time
        self.fail_safe = False
        self.job_retries = 0
        self.exp_config = _verify_config(exp_config)
        self.resource_args = None
        self.connector = connector if connector else get_default_connector(auppath=auppath, log=logger)
        self.username = get_default_username(username)
        self.is_compression_exp = False
        self.compression_params = []
        # Clean-up thread watching for external stop requests; created in start().
        self.request_stop_thr = None
        self.request_stop_time = request_stop_time
        # Becomes True once at least one job was successfully submitted.
        self.submitted = False
        if "job_failure" in self.exp_config:
            # Failure policy: whether to keep running after a job fails, and how many retries per job.
            set_default_keyvalue("ignore_fail", False, self.exp_config["job_failure"], log=logger)
            set_default_keyvalue("job_retries", 3, self.exp_config["job_failure"], log=logger)
            self.fail_safe = self.exp_config["job_failure"]["ignore_fail"]
            self.job_retries = self.exp_config["job_failure"]["job_retries"]
        if "compression" in self.exp_config:
            # Compression experiments get their config rewritten into a standard search space.
            self.is_compression_exp = True
            self.exp_config, self.compression_params = translate_compression_config(self.exp_config)
        set_default_keyvalue("cwd", os.getcwd(), self.exp_config, log=logger)
        set_default_keyvalue("workingdir", os.getcwd(), self.exp_config, log=logger)
        set_default_keyvalue("n_parallel", 1, self.exp_config, log=logger)
        check_missing_key(self.exp_config, "target", "Specify max/min for target", log=logger)
        check_missing_key(self.exp_config, "proposer", "Specify the optimization `proposer`", log=logger)
        self.proposer = get_proposer(self.exp_config['proposer'])(self.exp_config)
        if "resource_args" in self.exp_config:
            if "early_stop" in self.exp_config["resource_args"]:
                # Early stopping needs intermediate results to be tracked.
                self.exp_config["resource_args"]["track_intermediate_results"] = True
            self.resource_manager = get_resource_manager(self.exp_config["resource"], self.connector,
                                                         self.exp_config["n_parallel"], auppath=auppath,
                                                         maximize=(self.exp_config["target"] == "max"),
                                                         **self.exp_config["resource_args"],
                                                         workingdir=self.exp_config['workingdir'],
                                                         script=self.exp_config['script'],
                                                         runtime_args = exp_config.get('runtime_args', {}))
            self.resource_args = self.exp_config["resource_args"]
        else:
            self.resource_manager = get_resource_manager(self.exp_config["resource"], self.connector,
                                                         self.exp_config["n_parallel"], auppath=auppath,
                                                         maximize=(self.exp_config["target"] == "max"),
                                                         workingdir=self.exp_config['workingdir'],
                                                         script=self.exp_config['script'],
                                                         runtime_args = exp_config.get('runtime_args', {}))
        if eid is None:
            # New experiment: either register and start it immediately, or only create the record.
            if start is True:
                self.eid = self.resource_manager.connector.start_experiment(self.username, self.exp_config["name"],
                                                                            json.dumps(self.exp_config))
            else:
                self.eid = self.resource_manager.connector.create_experiment(self.username, self.exp_config["name"],
                                                                             json.dumps(self.exp_config))
        else:
            # Reuse an existing experiment record and mark it started.
            self.eid = eid
            self.resource_manager.connector.start_experiment_by_eid(self.eid)
        self.resource_manager.eid = self.eid
        # jid -> Job, jobs submitted but not yet finished.
        self.pending_jobs = {}
        if 'runtime_args' in exp_config:
            self.runtime_args = exp_config['runtime_args']
        else:
            self.runtime_args = {}
        logger.info("Experiment %d is created" % self.eid)
        logger.debug("Experiment config is %s" % json.dumps(self.exp_config))
[docs] def add_suspend_signal(self):
signal.signal(signal.SIGINT, lambda x, y: self._suspend(x, y))
[docs] def add_refresh_signal(self):
signal.signal(signal.SIGUSR1, lambda x, y: self._force_refresh(x, y))
    def finish(self):
        """
        Finish experiment if no job is running

        Blocks until the proposer leaves the RUNNING state and all pending jobs
        have completed, then shuts down the resource manager and the clean-up
        thread, and reports the best result.

        :return: job id, best score (or ``(None, -1)`` when there is no result)
        :rtype: (int, float)
        """
        # Wait until the proposer is done (finished, failed, or stopped).
        while self.proposer.get_status() == ProposerStatus.RUNNING:
            logger.debug("Waiting for proposer")
            time.sleep(self.sleep_time)
        while len(self.pending_jobs) != 0:
            # resource manager will prevent experiment shutdown with pending jobs.
            # but just in case
            logger.debug("Waiting for pending job")
            time.sleep(self.sleep_time)
        result = self.resource_manager.finish(status=self.proposer.get_status().name)
        # Closing the connector lets the clean-up thread observe is_closed() and exit,
        # so this must happen before the join() below to avoid blocking indefinitely.
        self.connector.close()
        if self.request_stop_thr is not None:
            self.request_stop_thr.join()
        if result is None or len(result) == 0:
            logger.warning("No result so far")
            return None, -1
        else:
            logger.info("Finished")
            logger.critical("Best job (%d) with score %f in experiment %d" % (result[0], result[1], self.eid))
            try:
                # Persist proposer state so the experiment can be resume()d later;
                # not every proposer implements save().
                self.proposer.save(os.path.join(".", "exp%d.pkl" % self.eid))
            except NotImplementedError:
                pass
            return result[:2]
[docs] def resume(self, filename):
"""
Restore previous experiment, previous job during suspension won't be run in this round
:param filename: filename (saved by pickle as exp%d.pkl)
:type filename: str
"""
self.proposer.reload(filename) # Note: previously failed jobs won't be execute again.
self.start()
[docs] def start(self):
"""
Start experiment
"""
remaining_jobs = self.proposer.get_remaining_jobs()
parallel_jobs = min(remaining_jobs, self.exp_config.n_parallel)
self.request_stop_thr = threading.Thread(target=self._check_status)
self.request_stop_thr.start()
for i in range(parallel_jobs - len(self.pending_jobs)):
rc = self.submit_job()
self.submitted = self.submitted or rc
if not self.submitted:
logger.fatal("No job is running; quit")
self.proposer.set_status(ProposerStatus.FAILED)
raise Exception("Cannot run experiment!")
elif not rc:
logger.warning("Job submission failed, keep running")
    def submit_job(self, job=None, rid_blacklist=None):
        """
        Submit a new job to run if there is resource available

        :param job: optional job parameter in case a job needs resubmitting
        :type job: aup.EE.Job.Job object
        :param rid_blacklist: resource ids to exclude when submitting job
        :type rid_blacklist: [int]
        :return: True if job submitted, else False
        """
        rid = self.resource_manager.get_available(self.username, self.exp_config["resource"], rid_blacklist=rid_blacklist)
        if rid is None:
            # No free resource: record the problem and let the caller retry later.
            self.resource_manager.log_error_message("Not enough resources!")
            logger.warning("Increase resource or reduce n_parallel, no enough resources")
            return False
        if job is None:
            # Fresh submission: ask the proposer for the next hyperparameter set.
            proposal = self.proposer.get()
            if proposal is not None and self.is_compression_exp:
                # Map flattened compression search-space values back into the nested config.
                proposal = deserialize_compression_proposal(self.exp_config, self.compression_params, proposal)
            self.proposer.increment_job_counter()
        if job is None and proposal is None:
            # Proposer has nothing to offer right now.
            if self.exp_config['proposer'] in SPECIAL_EXIT_PROPOSERS:
                # These proposers decide completion themselves; report success and wait.
                logger.info("%s is waiting to finish." % self.exp_config['proposer'])
                return True
            else:
                logger.warning("Waiting other jobs finished\n"
                               "Think about rebalance your task loads, if you see this message shows up too many")
                return False
        else:
            if job is None:
                # Brand-new job: persist it via the connector to obtain a job id.
                job_config = BasicConfig(**proposal)
                job = Job(self.exp_config["script"], job_config, self.exp_config["workingdir"], retries=self.job_retries)
                job.jid = self.resource_manager.connector.job_started(self.eid, rid, job_config)
            else:
                # Resubmission of a previously failed job (on a non-blacklisted resource).
                self.resource_manager.connector.job_retry(rid, job.jid)
            logger.info("Submitting job %d with resource %d in experiment %d" % (job.jid, rid, self.eid))
            job.was_executed = False
            self.pending_jobs[job.jid] = job
            # update the status, but after appending to pending_jobs
            # to avoid premature termination
            self.proposer.check_termination()
            self.resource_manager.run_job(job, rid, self.exp_config, self.update, **self.runtime_args)
            return True
    def update(self, score, jid):
        """
        Callback function passed to :mod:`aup.EE.Resource.AbstractResourceManager` to
        update the job history (also proposer and connector)

        :param score: score returned from job (using :func:`aup.utils.print_result`),
                      or one of the sentinel strings "ERROR" / "EARLY STOPPED"
        :type score: float
        :param jid: job id
        :type jid: int
        """
        if score == "ERROR":
            job = self.pending_jobs.pop(jid)
            if job.jid in self.resource_manager.jobs and \
                    job.curr_retries < job.retries:
                # Retry budget left: blacklist the failing resource and resubmit.
                rid = self.resource_manager.jobs[jid]
                job.rid_blacklist.add(rid)
                self.resource_manager.connector.job_failed(rid, jid)
                job.curr_retries += 1
                logger.info("Retrying job %d (%d/%d)" % (jid, job.curr_retries, job.retries))
                self.submit_job(job, rid_blacklist=job.rid_blacklist)
            elif not self.fail_safe:
                # Retries exhausted and failures are fatal: stop the experiment.
                self.resource_manager.finish_job(jid, None, "FAILED")
                self.proposer.set_status(ProposerStatus.FAILED)
                logger.fatal("Stop Experiment due to job failure (ignore_fail flag set to false)")
            else:
                # Retries exhausted but ignore_fail is set: record and keep going.
                self.resource_manager.finish_job(jid, None, "FAILED")
                try:
                    self.proposer.failed(job)
                except Exception as ex:
                    # Proposer cannot handle failed jobs: abort the experiment.
                    self.proposer.set_status(ProposerStatus.FAILED)
                    logger.fatal("Stop Experiment due to job failure (failed jobs unsupported by proposer)")
                logger.info("Job %d is finished (failed)" % (jid))
                if self.proposer.get_status() == ProposerStatus.RUNNING:
                    # Refill the freed parallel slot.
                    self.start()
        elif score == "EARLY STOPPED":
            self.pending_jobs.pop(jid)
            self.resource_manager.finish_job(jid, score, "EARLY_STOPPED")
            logger.info("Job %d was early stopped" % (jid))
            if self.proposer.get_status() == ProposerStatus.RUNNING:
                self.start()
        else:
            # Normal completion: feed the score back to the proposer before removing
            # the job from pending_jobs.
            self.proposer.update(score, self.pending_jobs[jid])
            self.pending_jobs.pop(jid)
            self.resource_manager.finish_job(jid, score, "FINISHED")
            logger.info("Job %d is finished with result %s" % (jid, score))
            if self.proposer.get_status() == ProposerStatus.RUNNING:
                self.start()
    def _suspend(self, sig, frame):
        """
        Stop experiment by enter "Ctrl-C"

        Signal handler: saves proposer state when supported, shuts down the
        resource manager, reports the best result so far, and exits the
        process with status 1.
        """
        logger.fatal("Experiment ended at user's request")
        for i in self.pending_jobs:
            logger.warning("Job with ID %d is cancelled" % i)  # Note: cancelled job won't be run again.
        try:
            # Persist proposer state so the experiment can be resume()d later.
            self.proposer.save(os.path.join(".", "exp%d.pkl" % self.eid))
        except NotImplementedError:
            pass
        self.resource_manager.suspend()
        result = self.resource_manager.finish(status="STOPPED")
        # Closing the connector lets the clean-up thread observe is_closed() and
        # exit, which makes the join() below return.
        self.connector.close()
        if result is None:
            logger.warning("No valid result so far")
        else:
            logger.critical("Best job (%d) with score %f in experiment %d" % (result[0], result[1], self.eid))
        if self.request_stop_thr is not None:
            self.request_stop_thr.join()
        sys.exit(1)
    def _check_status(self):
        """
        Checks the database status of the experiment for external stopping requests

        This method is run continuously in a separate "clean-up" thread in order to check for
        external modifications to the experiment status in the database, in case a user
        wants to stop an experiment remotely (e.g. from another process).
        """
        if self.connector is None or self.eid is None:
            # Nothing to poll without a database connection and an experiment id.
            logger.warning("Could not start thread for checking external experiment stopping requests.")
            return
        while True:
            try:
                if self.connector.is_closed():
                    # Main thread closed the connector (finish()/_suspend()): exit the loop.
                    logger.debug("Closing down clean-up thread.")
                    return
                status = self.connector.maybe_get_experiment_status(self.eid)
                if status == "REQUEST_STOP":
                    # Raise KeyboardInterrupt in the main thread so the SIGINT
                    # handler (_suspend) performs the actual shutdown.
                    return _thread.interrupt_main()
                time.sleep(self.request_stop_time)
            except Exception as ex:
                # Best-effort polling: transient DB errors must not kill this thread.
                logger.debug("Error in clean-up thread: {}".format(ex))
def _force_refresh(self, sig, frame):
# currently useful for async resource manager timers
self.resource_manager.refresh()