X-Git-Url: http://mmka.chem.univ.gda.pl/gitweb/?a=blobdiff_plain;f=qcg%2Fservice.py;h=9e99d43bee72267688c778dbd0f8dc84e1a8acfa;hb=1073f7f5b5a9e51fd79ab58f1f4fb3389d7e6331;hp=665863898a6d43f7a8ae0d5b1175d77bdaabb4da;hpb=eb0c9af5b8eb444d8db3e8759d3a83e8c6d41ee1;p=qcg-portal.git diff --git a/qcg/service.py b/qcg/service.py index 6658638..9e99d43 100644 --- a/qcg/service.py +++ b/qcg/service.py @@ -2,12 +2,14 @@ import logging import time from django.db import transaction +from django.utils.functional import SimpleLazyObject from django.utils.timezone import now from pyqcg import QCG -from pyqcg.description import JobDescription from pyqcg.service import Registry from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus +from qcg.models import User, Job, Task, Allocation, NodeInfo + logger = logging.getLogger(__name__) @@ -16,15 +18,14 @@ logger = logging.getLogger(__name__) def update_user_data(user, proxy): ts = time.time() QCG.start() - from qcg.models import User, Job, Task, Allocation, NodeInfo - credential = Credential(proxy) - registry = Registry(credential) + registry = Registry(Credential(proxy)) # put lock on user record (hopefully..?) user = User.objects.select_for_update().get(pk=user.pk) + last_update = user.last_update - changed_filter = {'changed': TimePeriod(after=user.last_update)} + changed_filter = {'changed': TimePeriod(after=last_update)} ################################### # Jobs @@ -50,10 +51,10 @@ def update_user_data(user, proxy): **changed_filter) tte = time.time() - jobs_cache = {j.job_id: j for j in Job.objects.filter(owner=user)} + jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)}) task_count = 0 for qcg_task in tasks: - params = Task.qcg_map(qcg_task, jobs_cache) + params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id]) task_id = params.pop('task_id') task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params) @@ -77,41 +78,100 @@ def update_user_data(user, proxy): elapsed_jobs = jte - jts elapsed_tasks = tte - tts elapsed_py = elapsed - elapsed_jobs - elapsed_tasks - logger.info('(%.3f) USER = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f', - elapsed, user, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py) + logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f', + elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py) + + +@transaction.atomic +def update_job(job, proxy): + ts = time.time() + QCG.start() + + credential = Credential(proxy) + qcg_job = job.qcg_job + qcg_job.credential = credential + + jts = time.time() + qcg_job.refresh_information() + elapsed_job = time.time() - jts + for name, val in job.qcg_map(qcg_job, job.owner).iteritems(): + if getattr(job, name) != val: + setattr(job, name, val) -def submit_job(params, proxy): - # print params + job.save() + + elapsed_tasks = 0 + for task in job.tasks.all(): + qcg_task = task.qcg_task + qcg_task.credential = credential + + tts = time.time() + qcg_task.refresh_information() + elapsed_tasks += time.time() - tts + + for name, val in task.qcg_map(qcg_task, task.job).iteritems(): + if getattr(task, name) != val: + setattr(task, name, val) + + task.save() + task.allocations.all().delete() + + for qcg_alloc in qcg_task.allocations: + alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc)) + + for qcg_node in qcg_alloc.nodes: + alloc.nodes.create(**NodeInfo.qcg_map(qcg_node)) + elapsed = time.time() - ts + elapsed_py = elapsed - elapsed_job - elapsed_tasks + logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f', + elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py) + + +def submit(obj, proxy): + ts = time.time() + QCG.start() + cred = Credential(proxy) + + jts = time.time() + result = obj.submit(cred) + elapsed_submit = time.time() - jts + + elapsed = time.time() - ts + elapsed_py = elapsed - elapsed_submit + logger.info('(%.3f) JOB = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, result.job_id, elapsed_submit, elapsed_py) + + return result + + +def cancel(obj, proxy): + ts = time.time() QCG.start() - desc = JobDescription(Credential(proxy)) - direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs', - 'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify', - 'preprocess', 'postprocess', 'persistent') + qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task + qcg_obj.credential = Credential(proxy) + + jts = time.time() + qcg_obj.cancel() + elapsed_cancel = time.time() - jts - for name in direct_map: - if params[name]: - setattr(desc, name, params[name]) + elapsed = time.time() - ts + elapsed_py = elapsed - elapsed_cancel + logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_cancel, elapsed_py) - if params['application']: - desc.set_application(*params['application']) - if params['nodes']: - desc.set_nodes(*params['nodes']) - if params['reservation']: - desc.set_reservation(params['reservation']) - if params['watch_output']: - desc.set_watch_output(params['watch_output'], params['watch_output_pattern']) - # TODO script - # TODO stage_out - # TODO monitoring - # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'): - # print prop, type(getattr(desc, prop)), repr(getattr(desc, prop)) +def clean(obj, proxy): + ts = time.time() + QCG.start() - # print desc.xml_description + qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task + qcg_obj.credential = Credential(proxy) - # job = desc.submit() + jts = time.time() + qcg_obj.clean() + elapsed_clean = time.time() - jts - # return job.job_id + elapsed = time.time() - ts + elapsed_py = elapsed - elapsed_clean + logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_clean, elapsed_py)