X-Git-Url: http://mmka.chem.univ.gda.pl/gitweb/?a=blobdiff_plain;f=qcg%2Fservice.py;h=9e99d43bee72267688c778dbd0f8dc84e1a8acfa;hb=1073f7f5b5a9e51fd79ab58f1f4fb3389d7e6331;hp=15d939c9a95906b15f155101e5f4909c75fbfdf6;hpb=a48622a1086a47a1abc26dcd3d1d05a384eb1e8f;p=qcg-portal.git diff --git a/qcg/service.py b/qcg/service.py index 15d939c..9e99d43 100644 --- a/qcg/service.py +++ b/qcg/service.py @@ -1,19 +1,14 @@ import logging -import os import time from django.db import transaction from django.utils.functional import SimpleLazyObject from django.utils.timezone import now from pyqcg import QCG -from pyqcg.description import JobDescription from pyqcg.service import Registry from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus -from filex.ftp import FTPOperation -from qcg.constants import QCG_DATA_URL from qcg.models import User, Job, Task, Allocation, NodeInfo -from qcg.utils import random_id, chunks logger = logging.getLogger(__name__) @@ -89,9 +84,6 @@ def update_user_data(user, proxy): @transaction.atomic def update_job(job, proxy): - if job.terminated and job.purged: - return - ts = time.time() QCG.start() @@ -137,44 +129,20 @@ def update_job(job, proxy): elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py) -def make_job_desc(params, proxy): +def submit(obj, proxy): + ts = time.time() QCG.start() - desc = JobDescription(Credential(proxy)) - - direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs', - 'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify', - 'preprocess', 'postprocess', 'persistent') - - for name in direct_map: - if params[name]: - setattr(desc, name, params[name]) + cred = Credential(proxy) - if params['application']: - desc.set_application(*params['application']) - desc.stage_in += [params['master_file']] - desc.arguments.insert(0, os.path.basename(params['master_file'])) - if params['script']: - ftp = FTPOperation(proxy) - - ftp.mkdir(QCG_DATA_URL, parents=True) - url = os.path.join(QCG_DATA_URL, 'script.{}.sh'.format(random_id())) - ftp.put(url) - - for chunk in chunks(params['script'], 4096): - ftp.stream.put(chunk) - ftp.stream.put(None) + jts = time.time() + result = obj.submit(cred) + elapsed_submit = time.time() - jts - ftp.wait() - desc.executable = url - if params['nodes']: - desc.set_nodes(*params['nodes']) - if params['reservation']: - desc.set_reservation(params['reservation']) - if params['watch_output']: - desc.set_watch_output(params['watch_output'], params['watch_output_pattern']) - # TODO monitoring + elapsed = time.time() - ts + elapsed_py = elapsed - elapsed_submit + logger.info('(%.3f) JOB = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, result.job_id, elapsed_submit, elapsed_py) - return desc + return result def cancel(obj, proxy): @@ -190,7 +158,7 @@ def cancel(obj, proxy): elapsed = time.time() - ts elapsed_py = elapsed - elapsed_cancel - logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_cancel, elapsed_py) + logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_cancel, elapsed_py) def clean(obj, proxy): @@ -206,4 +174,4 @@ def clean(obj, proxy): elapsed = time.time() - ts elapsed_py = elapsed - elapsed_clean - logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_clean, elapsed_py) + logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_clean, elapsed_py)