"""
..
Copyright (c) 2018 LG Electronics Inc.
SPDX-License-Identifier: GPL-3.0-or-later
aup.ET.Connector.SQLiteConnector
================================
If encounter "Failed to query SQLite after xx times" error,
increase DELAY_INTERVAL and REPEATED_TIME to prevent problem temporarily.
APIs
----
"""
import logging
import re
import sqlite3
import threading
from time import sleep
import json
from datetime import datetime
from .AbstractConnector import AbstractConnector
logger = logging.getLogger(__name__)
DELAY_INTERVAL = 0.1
REPEATED_TIME = 5
LOCK = threading.Lock()
def _delayed(func):
def wrapper(*args, **kwargs):
flag = 0
while flag < REPEATED_TIME:
try:
LOCK.acquire() # make sure not to call @_delayed functions recursively from one another
results = func(*args, **kwargs)
return results
except sqlite3.ProgrammingError as ex: # pragma: no cover
logger.critical("update is too frequent, delayed for 1 sec")
logger.debug("sqlite3 programming error: {}".format(ex))
sleep(DELAY_INTERVAL)
flag += 1
finally:
LOCK.release()
raise Exception("Failed to query SQLite after %d times" % flag) # pragma: no cover
return wrapper
[docs]class SQLiteConnector(AbstractConnector):
def __init__(self, filename):
super(SQLiteConnector, self).__init__()
self.connector = sqlite3.connect(filename, check_same_thread=False)
self.cursor = self.connector.cursor()
self.cursor.execute("PRAGMA FOREIGN_KEYS = ON;")
self.closed = False
def _fix_name(self, name):
self.cursor.execute("SELECT name FROM experiment WHERE name = ?", (name,))
names = [i[0] for i in self.cursor.fetchall()]
if not names:
return name
self.cursor.execute("SELECT name FROM experiment WHERE name LIKE ?", ("{} (%)".format(name),))
last_index = 0
names = [i[0] for i in self.cursor.fetchall()]
if names:
names = [name for name in names if re.findall(r"\(\d+\)$", name)]
if names:
indexes = [int(re.findall(r"\(\d+\)$", name)[-1][1:-1]) for name in names]
last_index = max(indexes)
return name + " ({})".format(last_index + 1)
def _mark_resource(self, rid, status):
if not isinstance(rid, int):
raise ValueError("Resource ID is not integer, %s"%type(rid))
self.cursor.execute("UPDATE resource SET status=? WHERE rid=?", (status, rid))
self.connector.commit()
# other utils
[docs] @_delayed
def free_all_resources(self):
self.cursor.execute("UPDATE resource SET status='free'")
self.connector.commit()
[docs] @_delayed
def close(self):
self.connector.commit()
self.connector.close()
self.closed = True
[docs] @_delayed
def is_closed(self):
return self.closed
[docs] @_delayed
def end_experiment(self, eid, status="FINISHED"):
self.cursor.execute("UPDATE experiment SET end_time=strftime('%s','now'), status=? WHERE eid=?", (status, eid))
self.connector.commit()
[docs] @_delayed
def change_experiment_status(self, eid, status="FINISHED"):
self.cursor.execute("UPDATE experiment SET status=? WHERE eid=?", (status, eid))
self.connector.commit()
[docs] @_delayed
def end_job(self, jid, score=None, status=None):
self.cursor.execute("UPDATE job SET end_time=strftime('%s','now'), status=?, score=? WHERE jid=?", (status, score, jid))
self.connector.commit()
[docs] @_delayed
def end_job_attempt(self, jid):
self.cursor.execute("""UPDATE job_attempt SET end_time=strftime('%s', 'now') WHERE jaid=(
SELECT jaid FROM job_attempt jt WHERE jt.jid=? ORDER BY num DESC LIMIT 1)""", (jid,))
self.connector.commit()
[docs] @_delayed
def free_used_resource(self, rid):
self._mark_resource(rid, 'free')
return True
[docs] @_delayed
def get_all_experiment(self, username=None):
if username:
self.cursor.execute("SELECT uid FROM user WHERE name = ?", (username,))
uid = self.cursor.fetchone()
if uid is None:
raise ValueError("User %s does not exist" % username)
self.cursor.execute("SELECT eid FROM experiment WHERE uid = ?", (uid[0],))
else:
self.cursor.execute("SELECT eid FROM experiment")
eids = self.cursor.fetchall()
return [e[0] for e in eids] # unzip tuple of one element
[docs] @_delayed
def get_available_resource(self, username, rtype, rid_blacklist=None):
rids = []
if rid_blacklist:
# Initial approach:
# self.cursor.execute("SELECT rid FROM resource WHERE status = 'free' AND type = ? AND rid NOT IN ?;", (rtype, rid_blacklist))
# However, this does not work with sqlite3, so the following approach was ultimately selected
self.cursor.execute("SELECT rid FROM resource WHERE status = 'free' AND type = ? AND rid NOT IN ({});".format(
",".join("?" * len(rid_blacklist))), (rtype,) + tuple(rid_blacklist))
rids = [i[0] for i in self.cursor.fetchall()]
if not rids:
# An attempt was made to filter out resources, but this was not possible (the only
# available resource are the ones blacklisted)
self.cursor.execute("SELECT rid FROM resource WHERE status = 'free' AND instr(?, type) != 0;", (rtype,))
rids = [i[0] for i in self.cursor.fetchall()]
return rids
[docs] @_delayed
def get_all_history(self, eid):
self.cursor.execute("SELECT * FROM job WHERE eid = ?", (eid,))
return self.cursor.fetchall()
[docs] @_delayed
def get_best_result(self, eid, maximize=True):
if maximize:
self.cursor.execute("""SELECT jid, score
FROM job WHERE eid = ? AND score=(select max(score) from job where eid=? AND typeof(score) = 'real')""", (eid, eid))
else:
self.cursor.execute("""SELECT jid, score
FROM job WHERE eid = ? AND score=(select min(score) from job where eid=? AND typeof(score) = 'real')""", (eid, eid))
t = self.cursor.fetchone()
if t is None:
self.cursor.execute("""select * from experiment where eid=?""", (eid,))
t = self.cursor.fetchone()
if t is None:
raise KeyError("Experiment ID %d not exist" % eid)
else:
return None
return list(t)
[docs] @_delayed
def get_best_result_config(self, eid, maximize=True):
if maximize:
self.cursor.execute("""SELECT job_config
FROM job WHERE eid = ? AND score=(select max(score) from job where eid=? AND typeof(score) = 'real')""", (eid, eid))
else:
self.cursor.execute("""SELECT job_config
FROM job WHERE eid = ? AND score=(select min(score) from job where eid=? AND typeof(score) = 'real')""", (eid, eid))
t = self.cursor.fetchone()
return t
[docs] @_delayed
def get_running_job(self, eid):
self.cursor.execute("SELECT jid FROM job WHERE eid = ?", (eid,))
jid = [i[0] for i in self.cursor.fetchall()]
logger.debug("%s" % jid)
return jid
[docs] @_delayed
def get_resource_type(self):
self.cursor.execute("SELECT DISTINCT type from resource;")
return [i[0] for i in self.cursor.fetchall()]
[docs] @_delayed
def start_experiment(self, username, name, exp_config_blob):
self.cursor.execute("SELECT uid FROM user WHERE name = ?", (username,))
uid = self.cursor.fetchone()
if uid is None:
raise ValueError("username %s is not existed" % username)
uid = uid[0]
name = self._fix_name(name)
self.cursor.execute("INSERT INTO experiment (uid, name, exp_config, start_time, error_msg, status) \
VALUES (?,?,?, strftime('%s','now'), NULL, 'RUNNING')",
(uid, name, exp_config_blob))
self.connector.commit()
return self.cursor.lastrowid
[docs] @_delayed
def start_experiment_by_eid(self, eid):
self.cursor.execute("DELETE FROM multiple_result WHERE eid={eid}".format(eid=eid))
self.connector.commit()
self.cursor.execute("DELETE FROM job_attempt WHERE jid in (SELECT jid FROM job WHERE eid={eid})".format(eid=eid))
self.connector.commit()
self.cursor.execute("DELETE FROM intermediate_result WHERE jid in (SELECT jid FROM job WHERE eid={eid})".format(eid=eid))
self.connector.commit()
self.cursor.execute("DELETE FROM job WHERE eid={eid}".format(eid=eid))
self.connector.commit()
self.cursor.execute("UPDATE experiment SET start_time = strftime('%s','now'), end_time = NULL, \
error_msg = NULL, status = 'RUNNING' WHERE eid={eid}".format(eid=eid))
self.connector.commit()
[docs] @_delayed
def start_job(self, eid, rid, job_config):
self.cursor.execute("INSERT INTO job (eid, start_time, job_config, status) VALUES (?, strftime('%s','now'), ?, 'RUNNING')",
(eid, json.dumps(job_config)))
self.connector.commit()
jid = self.cursor.lastrowid
self.cursor.execute("INSERT INTO job_attempt (jid, num, rid, start_time) VALUES (?, 0, ?, (SELECT start_time FROM job j WHERE j.jid=?))",
(jid, rid, jid))
self.connector.commit()
return jid
[docs] @_delayed
def start_job_attempt(self, rid, jid):
self.cursor.execute("INSERT INTO job_attempt (jid, num, rid, start_time) \
VALUES (?, (SELECT num FROM job_attempt jt WHERE jt.jid=? ORDER BY num DESC LIMIT 1)+1, ?, strftime('%s', 'now'))",
(jid, jid, rid))
self.connector.commit()
return self.cursor.lastrowid
[docs] @_delayed
def update_job_status(self, jid, status):
self.cursor.execute("UPDATE job SET status=? WHERE jid=?", (status, jid))
self.connector.commit()
[docs] @_delayed
def take_available_resource(self, rid):
self._mark_resource(rid, 'busy')
return True
[docs] @_delayed
def create_experiment(self, username, name, exp_config_blob):
self.cursor.execute("SELECT uid FROM user WHERE name = ?", (username,))
uid = self.cursor.fetchone()
if uid is None:
raise ValueError("username %s is not existed" % username)
uid = uid[0]
name = self._fix_name(name)
self.cursor.execute("INSERT INTO experiment (uid, name, exp_config, start_time, end_time, error_msg, status) \
VALUES (?,?,?, NULL, NULL, NULL, 'CREATED')",
(uid, name, exp_config_blob))
self.connector.commit()
return self.cursor.lastrowid
[docs] @_delayed
def delete_experiment(self, eid):
self.cursor.execute("SELECT eid FROM experiment WHERE eid = ?", (eid,))
check_eid = self.cursor.fetchone()
if check_eid is None:
return False
self.cursor.execute("DELETE FROM multiple_result WHERE eid={eid};".format(eid=eid))
self.cursor.execute("DELETE FROM job_attempt WHERE jid in (SELECT jid FROM job WHERE eid={eid})".format(eid=eid))
self.cursor.execute("DELETE FROM intermediate_result WHERE jid in (SELECT jid FROM job WHERE eid={eid})".format(eid=eid))
self.cursor.execute("DELETE FROM job WHERE eid={eid}".format(eid=eid))
self.cursor.execute("DELETE from experiment WHERE eid={eid}".format(eid=eid))
self.connector.commit()
return True
[docs] @_delayed
def get_experiment_status(self, eid):
self.cursor.execute("SELECT status FROM experiment WHERE eid = ? LIMIT 1", (eid,))
status = self.cursor.fetchone()
if status is None:
raise ValueError("Requested experiment for get_experiment_status not found in database!")
return status[0]
[docs] @_delayed
def maybe_get_experiment_status(self, eid):
# TODO: we need a way to recursively call these functions without locking issues (due to @_delayed)
if self.closed:
return None
self.cursor.execute("SELECT status FROM experiment WHERE eid = ? LIMIT 1", (eid,))
status = self.cursor.fetchone()
if status is None:
raise ValueError("Requested experiment for get_experiment_status not found in database!")
return status[0]
[docs] @_delayed
def log_error_message(self, eid, msg):
self.cursor.execute("UPDATE experiment SET error_msg=? WHERE eid=? and error_msg is NULL", (msg, eid))
self.connector.commit()
[docs] @_delayed
def save_multiple_results(self, jid, irid, eid, labels, scores):
receive_time = int(datetime.now().timestamp())
self.cursor.execute("""
SELECT count(*) FROM sqlite_master WHERE type='table' AND name='multiple_result'""")
res = self.cursor.fetchone()[0]
if res == 0:
logger.warning("multiple_result table not found, continuing without saving multiple results! \n \
Please consider updating Auptimizer to >=1.5")
return
for idx in range(len(scores)):
self.cursor.execute("""
INSERT INTO multiple_result (label_order, value, receive_time, jid, irid, eid, is_last_result)
VALUES (?, ?, ?, ?, ?, ?, 0)""",
(idx+1, scores[idx], receive_time, jid, irid, eid))
self.connector.commit()
[docs] @_delayed
def set_last_multiple_results(self, eid, jid, num_labels):
self.cursor.execute("""
SELECT mrid FROM multiple_result WHERE eid=? AND jid=?
ORDER BY mrid DESC LIMIT 0,?""", (eid, jid, num_labels))
res = self.cursor.fetchall()
for mrid in res:
self.cursor.execute("""
UPDATE multiple_result SET is_last_result=1 WHERE mrid=?""", (mrid[0],))
self.connector.commit()