Source code for aup.EE.Job

"""
..
  Copyright (c) 2018 LG Electronics Inc.
  SPDX-License-Identifier: GPL-3.0-or-later

aup.EE.Job module
=================

Wrap Job information for :mod:`aup.EE.Resource` to execute.

APIs
----
"""
import logging
import os
import stat
from time import sleep

from ..utils import open_sftp_with_timeout

try:
    FileNotFoundError
except NameError:
    FileNotFoundError = IOError

logger = logging.getLogger(__name__)


[docs]class Job: local_warning = True def __init__(self, script, config, path=".", retries=3): """ Job wrapper give to resource manager to execute :param script: script file name :type script: str :param config: job configuration :type config: aup.BasicConfig :param path: working path :type path: str :param retries: max number of retries for one job in the case of job failure :type retries: int """ logger.debug("Input: %s,%s", script, path) self.config = config self.path = os.path.abspath(os.path.expanduser(path)) if script[0] not in ('.', '/'): script = os.path.join(".", script) if Job.local_warning: logger.warning("Using default local path for script as %s", script) Job.local_warning = False # only warning once self.script = script self.jid = 0 self.was_executed = False self.retries = retries self.curr_retries = 0 self.rid_blacklist = set() # if true, it means this is the best job self.save_model_flag = config.get('save_model', False)
[docs] def verify_local(self): """ Verify the job script is correctly set - for local machine running only :return: whether all paths are set correctly :rtype: bool """ job_path = os.path.join(self.path, "jobs") script = os.path.join(self.path, self.script.split()[0]) # avoid additional arguments in the script if not os.path.isdir(self.path): msg = "Working folder %s does not exist" % self.path logger.fatal(msg) raise EnvironmentError(msg) if not os.path.exists(job_path): # job config file dir, also used in ResourceManagers. logger.warning("Create missing directory %s", job_path) os.makedirs(job_path) logger.debug("Create Job %s", script) if not os.path.isfile(script): logger.fatal("Job script %s is not exist", self.script) raise EnvironmentError("Missing script") if not os.access(script, os.X_OK): logger.fatal("Job script is not executable, try `chmod u+x %s`"%self.script) raise EnvironmentError("Wrong permission for %s" % self.script) return True
[docs] def verify_remote(self, remote, overwrite=False): # pragma: no cover """ Verify the files and folder are correct on the remote machine :param remote: paramiko.SSHClient :param overwrite: whether to overwrite existing script file and folder :return: whether the files are set correctly :rtype: bool """ sftp = open_sftp_with_timeout(remote.client, 5) local_script = self.script.split()[0] local_job_path = os.path.join(os.path.dirname(local_script), 'jobs') if not os.path.isdir(local_job_path): logger.warning("Create missing local job config directory %s", local_job_path) os.makedirs(local_job_path) remote_script = os.path.join(self.path, os.path.basename(self.script)) try: sftp.stat(self.path) except FileNotFoundError: logger.warning("Create missing folder %s", self.path) sftp.mkdir(self.path) try: path = os.path.join(self.path, "jobs") stats = sftp.stat(path) if not (stats.st_mode & stat.S_IWRITE): logger.warning("%s/jobs is not writable, changed", path) sftp.chmod(path, stat.S_IWRITE) except FileNotFoundError: logger.warning("create jobs %s", os.path.join(self.path, "jobs")) sftp.mkdir(os.path.join(self.path, "jobs")) if overwrite: logger.info("Overwrite script %s", remote_script) sftp.put(local_script, remote_script) sftp.chmod(remote_script, stat.S_IRWXU) try: sftp.stat(remote_script) except FileNotFoundError: logger.warning("script not found, copy local file over") sftp.put(local_script, remote_script) sftp.chmod(remote_script, stat.S_IRWXU) stats = sftp.stat(remote_script) if not (stats.st_mode & stat.S_IEXEC): logger.warning("Assign correct permission") sftp.chmod(remote_script, stat.S_IRWXU) sftp.close() return True