"""
..
Copyright (c) 2018 LG Electronics Inc.
SPDX-License-Identifier: GPL-3.0-or-later
aup.EE.Resource.AbstractResourceManager
=======================================
Abstract Interface of Resource Managers.
Use :func:`get_resource_manager` to create the resource manager object for the given resource type.
For different resource supports, see :doc:`environment`.
APIs
----
"""
import abc
import copy
import importlib
import logging
import random
import threading
import time
import numpy as np
import math
import warnings
from ...utils import DEFAULT_AUPTIMIZER_PATH
from .utils.curve_fitting import CurveModel
# Base class built directly from ABCMeta (works without the ``class X(metaclass=...)``
# syntax); ``__slots__ = ()`` keeps the base itself free of a per-instance ``__dict__``.
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})

logger = logging.getLogger(__name__)

# Seconds the early-stopping daemon sleeps between polls.
EARLY_STOPPING_SLEEP = 1

# The curve-fitting policy is skipped while the checked step is <= this value.
CURVE_FITTING_MIN_ITS = 4

# Resource type -> name of the manager class; the same name is used as the
# module name under ``aup.EE.Resource`` by :func:`get_resource_manager`.
_SupportResource = {
    "gpu": "GPUResourceManager",
    "cpu": "CPUResourceManager",
    "node": "SSHResourceManager",
    "aws": "AWSResourceManager",
    "passive": "PassiveResourceManager",
}
def get_resource_manager(resource, connector, n_parallel, auppath=DEFAULT_AUPTIMIZER_PATH, **kwargs):
    """
    Build the resource manager registered for a resource type.

    The resource type is looked up in ``_SupportResource``; the resulting name is
    used both as the module to import from ``aup.EE.Resource`` and as the class
    to instantiate from that module.

    :param resource: resource type, one of the keys of ``_SupportResource``
    :type resource: str
    :param connector: database connector
    :type connector: AbstractConnector
    :param n_parallel: how many parallel jobs to be run
    :type n_parallel: int
    :param auppath: aup environment folder
    :type auppath: str
    :param kwargs: forwarded unchanged to the manager constructor
    :return: resource manager
    :rtype: AbstractResourceManager
    :raises KeyError: if the resource type is not registered
    """
    try:
        manager_name = _SupportResource[resource]
    except KeyError:
        raise KeyError("%s not implemented" % resource)

    # Module and class share the same name, e.g. aup.EE.Resource.CPUResourceManager.
    module = importlib.import_module("." + manager_name, "aup.EE.Resource")
    manager_cls = vars(module)[manager_name]
    return manager_cls(connector, n_parallel, auppath=auppath, **kwargs)
[docs]class AbstractResourceManager(ABC):
"""
Create Resource to run jobs.
:param connector: Connector to database
:type connector: AbstractConnector
"""
def __init__(self, connector, n_parallel, *args, **kwargs):
self.connector = connector
self.jobs = dict()
warnings.filterwarnings("ignore")
self.curr_global_iteration = 0
self.maximize = kwargs.get("maximize", True)
self.stopped_jobs = None
self.stopped_jobs_lock = threading.Lock()
self.n_parallel = n_parallel
self.eid = kwargs.get("eid", None)
self.result_labels = kwargs.get('multi_res_labels', None)
self.track_intermediate_results = kwargs.get("track_intermediate_results", False)
self.interm_job_res = None
if self.track_intermediate_results:
self.interm_job_res = dict()
# variables for early stop impl
if "early_stop" in kwargs:
self.policy = kwargs["early_stop"]["aup_policy"]
self.policy_steps = kwargs["early_stop"]["aup_policy_steps"]
self.warmup = kwargs["early_stop"].get("warmup", 0)
self.bandit_factor = kwargs["early_stop"].get("bandit_factor", 0.5)
self.truncation_percentage = kwargs["early_stop"].get("truncation_percentage", 0.3)
self.curve_fitting_threshold = kwargs["early_stop"].get("curve_fitting_threshold", 0.95)
self.curve_fitting_max_iters = kwargs["early_stop"].get("curve_fitting_max_iters", None)
self.curve_fitting_timeout = kwargs["early_stop"].get("curve_fitting_timeout", 60)
self.job_checked = dict()
self.early_stop_daemon_finished = False
self.stopped_jobs = set()
if self.policy == "curve_fitting" and self.curve_fitting_max_iters is None:
raise ValueError("Curve fitting policy requires argument \"curve_fitting_max_iters\" representing " +
"the total number of intermediate results that the script will provide.")
self.early_stop_daemon = threading.Thread(target=AbstractResourceManager.early_stop_daemon_fun,
args=(self,), daemon=True)
self.early_stop_daemon.start()
else:
self.warmup = None
self.policy = None
self.policy_steps = 0
self.job_checked = dict()
self.early_stop_daemon_finished = True
self.early_stop_daemon = None
self.stopped_jobs = None
[docs] def finish(self, status="FINISHED"):
"""
Finish up the resource allocation.
:param status: status of the experiment
:type status: string
:return: Max/Min result in experiment (job id, score)
:rtype: None | [int, float]
"""
self.connector.end_experiment(self.eid, status)
if self.early_stop_daemon != None:
self.early_stop_daemon_finished = True
self.early_stop_daemon.join()
return self.connector.get_best_result(self.eid, maximize=self.maximize)
[docs] def finish_job(self, jid, score, status=None):
"""
Finish one job
:param jid: job ID
:type jid: int
:param score: job for the experiment
:type score: float | None
"""
if jid in self.jobs:
rid = self.jobs.pop(jid)
self.connector.job_finished(rid, jid, score, status)
else:
logger.warning("Job %d finished after job suspension, result may lose" % jid)
[docs] def get_available(self, username, rtype, rid_blacklist=None):
"""
method to get the available resource to run a job
:param username: username for job running
:type username: str
:param rtype: resource type
:type rtype: str
:param rid_blacklist: resource ids to ignore
:type rid_blacklist: [int]
:return: a random selection of all available resource IDs
:rtype: int
"""
rids = self.connector.get_available_resource(username, rtype, rid_blacklist)
logger.debug("Request resource (%s) for user %s and get %s" % (rtype, username, rids.__str__()))
return random.choice(rids) if rids else None
[docs] def run_job(self, job, rid, exp_config, call_back_func, **kwargs):
"""
Job running interface, this is called by :mod:`aup.EE.Experiment`.
It is a wrapper for :func:`run`.
:param job: Job to run
:type job: Job
:param rid: resource ID
:type rid: int
:param exp_config: experiment configuration
:type exp_config: BasicConfig
:param call_back_func: call back function to update result
:type call_back_func: function object
"""
self.connector.take_available_resource(rid)
self.jobs[job.jid] = rid
if self.interm_job_res != None:
self.interm_job_res[job.jid] = list()
self.job_checked[job.jid] = list()
try:
self.run(job, rid, exp_config, call_back_func, **kwargs)
except EnvironmentError as e:
self.connector.free_used_resource(rid)
logger.fatal("Experiment interrupted.")
raise(e)
[docs] def append_interm_res(self, jid, interm_res):
if self.interm_job_res == None:
return None
if jid in self.interm_job_res:
self.interm_job_res[jid].append(interm_res)
if self.connector:
return self.connector.save_intermediate_result(jid, interm_res)
else:
logger.warning("Could not save intermediate result: no connector attached to resource manager")
return None
else:
logger.fatal("Job {} should have already started!".format(jid))
return None
[docs] def append_multiple_results(self, jid, irid, eid, scores):
if self.result_labels is None or len(scores) == 0:
return
assert len(self.result_labels) == len(scores), \
"labels size mismatch with the provided scores"
if self.connector is not None:
self.connector.save_multiple_results(jid, irid, eid, self.result_labels, scores)
[docs] def set_last_multiple_results(self, eid, jid):
if self.result_labels is None:
return
if self.connector is not None:
self.connector.set_last_multiple_results(eid, jid, len(self.result_labels))
[docs] def stop_job(self, jid):
"""
Stop a job for early stopping strategies
:param jid: job ID
:type jid: int
"""
if jid not in self.jobs:
logger.debug("Tried to stop job {} not in currently running jobs.".format(jid))
with self.stopped_jobs_lock:
self.stopped_jobs.add(jid)
[docs] def is_job_stopped(self, jid):
"""
Returns whether or not a specific job stop is pending
:param jid: job ID
:type jid: int
:return: whether or not the given job ID is in the list of pending job stops
:rtype: bool
"""
with self.stopped_jobs_lock:
return self.stopped_jobs is not None and jid in self.stopped_jobs
[docs] @abc.abstractmethod
def run(self, job, rid, exp_config, call_back_func, **kwargs):
"""
Job running implemented for the specific resource manager.
It is called by :func:`run_job`.
:param job: a job object
:type job: Job
:param rid: resource id returned from :func:`get_available`.
:type rid: int
:param exp_config: experiment configuration
:type exp_config: BasicConfig
:param call_back_func: function to trigger after job finished
:type call_back_func: function object
"""
raise NotImplementedError
[docs] def suspend(self):
"""
Suspend job upon request
"""
for jid in list(self.jobs.keys()):
self.finish_job(jid, None)
logger.warning("Job %d is canceled" % jid)
[docs] def run_curve_fitting(self, interm_res, c_jid, step, comp_fn, curve_fitting_threshold, best_val):
curvemodel = CurveModel(self.curve_fitting_max_iters)
predict_y = curvemodel.predict(interm_res, timeout=self.curve_fitting_timeout)
if predict_y is None:
return
if not comp_fn(predict_y, curve_fitting_threshold * best_val):
if self.is_job_stopped(c_jid):
return
self.stop_job(c_jid)
logger.info("Stopping job {} early (step {}): predicted end value {:.4f} is lower than the best value so far {:.4f} within the given {:.2f}% threshold (={:.4f})".format(
c_jid, step, predict_y, best_val, 100. * curve_fitting_threshold, curve_fitting_threshold * best_val))
[docs] def early_stop_daemon_fun(self):
while not self.early_stop_daemon_finished:
# do not consider the early stopped jobs
with self.stopped_jobs_lock:
current_jobs = set(self.jobs) - self.stopped_jobs
finished_interm_job_res = self.connector.get_intermediate_results_experiment(self.eid, "FINISHED")
current_interm_job_res = self.connector.get_intermediate_results_jobs(list(current_jobs))
interm_job_res = {**current_interm_job_res, **finished_interm_job_res}
best_fn = np.max if self.maximize else np.min
comp_fn = (lambda x, target: x >= target) if self.maximize else \
(lambda x, target: x <= target)
curve_fitting_threads = []
for c_jid, c_interm_res in current_interm_job_res.items():
if len(c_interm_res) < self.warmup:
continue
if c_jid not in self.job_checked:
self.job_checked[c_jid] = []
k = len(c_interm_res) // self.policy_steps
if k < 1:
time.sleep(EARLY_STOPPING_SLEEP)
continue
step = k * self.policy_steps
if step in self.job_checked[c_jid]: # job already compared up until this step, waiting for next k multiple
time.sleep(EARLY_STOPPING_SLEEP)
continue
comp_interm_job_res = {jid: vals for jid, vals in interm_job_res.items() if len(vals) >= step and jid != c_jid}
if len(comp_interm_job_res) < 1: # too few jobs
time.sleep(EARLY_STOPPING_SLEEP)
continue
if self.policy == "median":
avgs = [np.average(vals[:step]) for vals in comp_interm_job_res.values()]
median = np.median(avgs)
best_val = np.average(c_interm_res[:step])
if not comp_fn(best_val, median):
self.stop_job(c_jid)
logger.info("Stopping job {} early (step {}): best value so far {:.4f} worse than median of averages {:.4f} for {} other jobs".format(
c_jid, step, best_val, median, len(comp_interm_job_res)))
elif self.policy == "bandit":
bandit_best_val = best_fn([best_fn(vals[:step]) for vals in comp_interm_job_res.values()])
best_val = best_fn(c_interm_res[:step])
bandit_factor = self.bandit_factor if ((self.maximize and np.sign(bandit_best_val) == 1) or (not self.maximize and np.sign(bandit_best_val) == -1)) else \
2 - self.bandit_factor
if not comp_fn(best_val, bandit_factor * bandit_best_val):
self.stop_job(c_jid)
logger.info("Stopping job {} early (step {}): best value so far {:.4f} worse than a factor {:.4f} of best overall value {:.4f} (={:.4f}) for {} other jobs".format(
c_jid, step, best_val, bandit_factor, bandit_best_val, bandit_factor * bandit_best_val, len(comp_interm_job_res)))
elif self.policy == "truncation":
best_vals = sorted([(jid, best_fn(vals[:step])) for jid, vals in (list(comp_interm_job_res.items()) + [(c_jid, c_interm_res)])],
key=lambda t: t[1], reverse=not self.maximize)
best_val_idx = next((idx for idx, (jid, val) in enumerate(best_vals) if jid == c_jid)) + 1
perc = float(best_val_idx) / len(best_vals)
if perc <= self.truncation_percentage:
self.stop_job(c_jid)
logger.info("Stopping job {} early (step {}): best value so far {:.4f} is in the bottom {:.2f}% of {} jobs, which is lower than the {:.2f}% cutoff".format(
c_jid, step, best_vals[best_val_idx-1][1], 100. * best_val_idx / len(best_vals), len(best_vals), 100. * self.truncation_percentage))
elif self.policy == "curve_fitting":
if len(finished_interm_job_res) < 1 or step <= CURVE_FITTING_MIN_ITS:
continue
interm_res = copy.deepcopy(c_interm_res)
best_val = best_fn([best_fn(vals) for vals in finished_interm_job_res.values()])
if not self.maximize:
interm_res *= -1
best_val *= -1
curve_fitting_threshold = self.curve_fitting_threshold if np.sign(best_val) == 1 else \
2 - self.curve_fitting_threshold
cf_thread = threading.Thread(target=self.run_curve_fitting,
args=(interm_res[:step], c_jid, step, (lambda x, target: x >= target), curve_fitting_threshold, best_val))
cf_thread.start()
curve_fitting_threads += [cf_thread]
self.job_checked[c_jid] += [step]
for thread in curve_fitting_threads:
thread.join()
time.sleep(EARLY_STOPPING_SLEEP)
[docs] def refresh(self):
'''
Method for refreshing timers/variables etc
'''
pass
[docs] def log_error_message(self, msg):
self.connector.log_error_message(self.eid, msg)