"""
..
Copyright (c) 2018 LG Electronics Inc.
SPDX-License-Identifier: GPL-3.0-or-later
aup.ET.Connector.AbstractConnector
==================================
Define the basic interface between experiment tracking and executor engine.
Currently, SQLite is the only one implemented.
APIs
----
"""
import abc
import logging
# Build an abstract base class named 'ABC' by calling the ABCMeta metaclass directly
# (base: object, empty __slots__), instead of using metaclass syntax.
# NOTE(review): presumably done so the module also works on interpreters that
# lack abc.ABC -- confirm before replacing with abc.ABC.
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class AbstractConnector(ABC):
    """
    Abstract interface between experiment tracking and the executor engine.

    Methods decorated with :func:`abc.abstractmethod` must be overridden before a
    subclass can be instantiated.  The remaining ``raise NotImplementedError``
    methods are optional hooks: a subclass may leave them out, in which case
    calling them raises :class:`NotImplementedError`.

    The ``job_*`` methods are concrete helpers that combine the abstract
    resource and job primitives.
    """

    # ################ General Part #################
    @abc.abstractmethod
    def close(self):
        """
        Mark the connector as closed. A closed connector should not be used anymore.
        """
        pass

    @abc.abstractmethod
    def is_closed(self):
        """
        Return whether or not the connector is closed. A closed connector should not be used anymore.

        The base implementation returns ``False``.

        :return: True if the connector is closed
        :rtype: bool
        """
        return False

    # ################ Resource Related Part #################
    @abc.abstractmethod
    def free_used_resource(self, rid):
        """
        Mark resource as free (opposite to :func:`take_available_resource`)

        :param rid: Resource ID(s)
        :type rid: int
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_resource_type(self):
        """
        Get the resource types.

        Note: the signature takes no user argument.

        :return: list of resource types
        :rtype: list
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_available_resource(self, username, rtype):
        """
        Get available resource for a user and resource type.
        Currently there is no limitation/requirement for user

        :param username: username
        :type username: str
        :param rtype: Resource type
        :type rtype: str
        :return: Resource Id
        :rtype: list(int)
        """
        raise NotImplementedError

    @abc.abstractmethod
    def take_available_resource(self, rid):
        """
        Mark resource as used

        TODO - the original note about prevention of async updates is ambiguous;
        verify in the concrete connector whether concurrent updates are guarded.

        :param rid: Resource ID(s)
        :type rid: int
        :return: True/False
        """
        raise NotImplementedError

    # ################ Experiment Related Part ################
    @abc.abstractmethod
    def end_experiment(self, eid):
        """
        Mark experiment as finished

        :param eid: Experiment ID
        :type eid: int
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_best_result(self, eid, maximize=True):
        """
        Retrieve the best job id and score from the database for experiment eid

        :param eid: Experiment ID
        :type eid: int
        :param maximize: whether to choose max or min
        :type maximize: bool
        :return: Job ID, score
        :rtype: list(int, float)
        """
        raise NotImplementedError

    @abc.abstractmethod
    def start_experiment(self, username, name, exp_config_blob):
        """
        Create an Experiment and track it

        :param username: username
        :type username: str
        :param name: experiment name
        :type name: str
        :param exp_config_blob: configuration of experiment
        :type exp_config_blob: str
        :return: experiment ID
        :rtype: int
        """
        raise NotImplementedError

    # ################ Job Related Part #######################
    @abc.abstractmethod
    def end_job(self, jid, score, status):
        """
        Mark a job ended

        :param jid: Job ID
        :type jid: int
        :param score: score of the Job (:func:`job_finished` forwards its own
            ``score`` argument here unchanged)
        :type score: float / str
        :param status: status of the Job
        :type status: enumeration
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_all_experiment(self, username=None):
        """
        Get all Experiment IDs

        :param username: to get experiments for a specific user
        :type username: str
        :return: Experiment IDs
        :rtype: list(int)
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_all_history(self, eid):
        """
        Get full history of an Experiment

        :param eid: Experiment ID
        :type eid: int
        :return: history of the experiment (format is defined by the implementation)
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_running_job(self, eid):
        """
        Get running Job IDs

        :param eid: Experiment ID
        :type eid: int
        :return: list of Job IDs
        :rtype: list(int)
        """
        raise NotImplementedError

    @abc.abstractmethod
    def start_job(self, eid, rid, job_config):
        """
        Start a job with job configuration and track it

        :param eid: Experiment ID
        :type eid: int
        :param rid: Resource ID
        :type rid: int
        :param job_config: Configuration for :class:`aup.EE.Job.Job`
        :type job_config: BasicConfig
        :return: Job ID (jid)
        :rtype: int
        """
        raise NotImplementedError

    @abc.abstractmethod
    def start_job_attempt(self, rid, jid):
        """
        Starts a job attempt for a given job, using the given resource

        :param rid: Resource ID
        :type rid: int
        :param jid: Job ID
        :type jid: int
        :return: Job Attempt ID (jaid)
        :rtype: int
        """
        raise NotImplementedError

    @abc.abstractmethod
    def end_job_attempt(self, jid):
        """
        Ends a job attempt, but not a job (leaving room for retries)

        :param jid: Job ID
        :type jid: int
        """
        raise NotImplementedError

    def update_job_status(self, jid, status):
        """
        Change a job's status

        Not abstract: subclasses may omit it, in which case calling it raises
        :class:`NotImplementedError`.

        :param jid: Job ID
        :type jid: int
        :param status: new status
        :type status: enumeration
        """
        raise NotImplementedError

    # ################ Job interface for Experiment ##############
    def job_finished(self, rid, jid, score=None, status=None):
        """
        Clean up Job when it is finished

        Frees the resource, ends the current job attempt, then ends the job,
        in that order.

        :param rid: Resource ID to be free
        :type rid: int
        :param jid: Job ID
        :type jid: int
        :param score: score returned by Job (error case will be ERROR)
        :type score: float / str
        :param status: job status
        :type status: enumeration
        """
        # Pass the values as lazy logging arguments instead of pre-formatting with
        # '%': a non-integer id must not raise here and abort the clean-up below.
        logger.debug("Job %d is finished on %d, score is %s", jid, rid, score)
        self.free_used_resource(rid)
        self.end_job_attempt(jid)
        self.end_job(jid, score, status)

    def job_started(self, eid, rid, job_config):
        """
        Interface to automatically take resource and run job.

        :param eid: Experiment ID
        :type eid: int
        :param rid: Resource ID
        :type rid: int
        :param job_config: Configuration for Job
        :type job_config: BasicConfig
        :return: Job ID
        :rtype: int
        """
        # The return value of take_available_resource is not checked here.
        self.take_available_resource(rid)
        return self.start_job(eid, rid, job_config)

    def job_failed(self, rid, jid):
        """
        Interface to take care of job failure in case of possible retries

        Frees the resource and ends the job attempt; the job itself is not ended.

        :param rid: Resource ID
        :type rid: int
        :param jid: Job ID
        :type jid: int
        """
        self.free_used_resource(rid)
        self.end_job_attempt(jid)

    def job_retry(self, rid, jid):
        """
        Interface to mark the beginning of a job retry

        :param rid: Resource ID
        :type rid: int
        :param jid: Job ID
        :type jid: int
        :return: whatever :func:`start_job_attempt` returns (documented there as
            the Job Attempt ID)
        """
        # The return value of take_available_resource is not checked here.
        self.take_available_resource(rid)
        return self.start_job_attempt(rid, jid)

    def create_experiment(self, username, name, exp_config_blob):
        """
        Interface for creating an experiment without starting it
        """
        raise NotImplementedError

    def delete_experiment(self, eid):
        """
        Interface for deleting an experiment by eid
        """
        raise NotImplementedError

    def get_experiment_status(self, eid):
        """
        Interface for getting the database status of an experiment
        """
        raise NotImplementedError

    def start_experiment_by_eid(self, eid):
        """
        Interface for (re)starting an experiment by eid
        """
        raise NotImplementedError

    def log_error_message(self, eid, msg):
        """
        Log in database the error message
        """
        raise NotImplementedError

    def save_multiple_results(self, jid, irid, eid, labels, scores):
        """
        Save in database other results desired by user
        """
        raise NotImplementedError

    def set_last_multiple_results(self, eid, jid, num_labels):
        """
        Set the 'is_last_result' flag to true for this jid
        """
        raise NotImplementedError