job templates: populating submit form with template's attributes
[qcg-portal.git] / qcg / service.py
index 6f44f49..c69eee0 100644 (file)
@@ -2,11 +2,13 @@ import logging
 import time
 
 from django.db import transaction
+from django.utils.functional import SimpleLazyObject
 from django.utils.timezone import now
 from pyqcg import QCG
-from pyqcg.description import JobDescription
 from pyqcg.service import Registry
-from pyqcg.utils import Credential, TimePeriod
+from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
+
+from qcg.models import User, Job, Task, Allocation, NodeInfo
 
 
 logger = logging.getLogger(__name__)
@@ -16,21 +18,21 @@ logger = logging.getLogger(__name__)
 def update_user_data(user, proxy):
     ts = time.time()
     QCG.start()
-    from qcg.models import User, Job, Task, Allocation, NodeInfo
 
-    credential = Credential(proxy)
-    registry = Registry(credential)
+    registry = Registry(Credential(proxy))
 
     # put lock on user record (hopefully..?)
     user = User.objects.select_for_update().get(pk=user.pk)
+    last_update = user.last_update
 
-    changed_filter = {'changed': TimePeriod(after=user.last_update)}
+    changed_filter = {'changed': TimePeriod(after=last_update)}
 
     ###################################
     # Jobs
     ###################################
     jts = time.time()
-    jobs = registry.jobs(**changed_filter)
+    jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
+                         **changed_filter)
     jte = time.time()
 
     jobs_count = 0
@@ -45,13 +47,14 @@ def update_user_data(user, proxy):
     # Tasks
     ###################################
     tts = time.time()
-    tasks = registry.tasks(**changed_filter)
+    tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
+                           **changed_filter)
     tte = time.time()
 
-    jobs_cache = {j.job_id: j for j in Job.objects.filter(owner=user)}
+    jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
     task_count = 0
     for qcg_task in tasks:
-        params = Task.qcg_map(qcg_task, jobs_cache)
+        params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
         task_id = params.pop('task_id')
 
         task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
@@ -75,44 +78,87 @@ def update_user_data(user, proxy):
     elapsed_jobs = jte - jts
     elapsed_tasks = tte - tts
     elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
-    logger.info('(%.3f) USER = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
-                elapsed, user, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
+    logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
+                elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
+
+
+@transaction.atomic
+def update_job(job, proxy):
+    if job.terminated and job.purged:
+        return
+
+    ts = time.time()
+    QCG.start()
+
+    credential = Credential(proxy)
+    qcg_job = job.qcg_job
+    qcg_job.credential = credential
+
+    jts = time.time()
+    qcg_job.refresh_information()
+    elapsed_job = time.time() - jts
+
+    for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
+        if getattr(job, name) != val:
+            setattr(job, name, val)
+
+    job.save()
+
+    elapsed_tasks = 0
+    for task in job.tasks.all():
+        qcg_task = task.qcg_task
+        qcg_task.credential = credential
+
+        tts = time.time()
+        qcg_task.refresh_information()
+        elapsed_tasks += time.time() - tts
+
+        for name, val in task.qcg_map(qcg_task, task.job).iteritems():
+            if getattr(task, name) != val:
+                setattr(task, name, val)
+
+        task.save()
+        task.allocations.all().delete()
+
+        for qcg_alloc in qcg_task.allocations:
+            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
+
+            for qcg_node in qcg_alloc.nodes:
+                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
+
+    elapsed = time.time() - ts
+    elapsed_py = elapsed - elapsed_job - elapsed_tasks
+    logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
+                elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
+
+
+def cancel(obj, proxy):
+    ts = time.time()
+    QCG.start()
 
+    qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
+    qcg_obj.credential = Credential(proxy)
 
-def submit_job(params, proxy):
-    # print params
+    jts = time.time()
+    qcg_obj.cancel()
+    elapsed_cancel = time.time() - jts
+
+    elapsed = time.time() - ts
+    elapsed_py = elapsed - elapsed_cancel
+    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
 
+
+def clean(obj, proxy):
+    ts = time.time()
     QCG.start()
-    desc = JobDescription(Credential(proxy))
-
-    direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
-                  'wall_time', 'memory', 'memory_per_slot', 'modules', 'native', 'notify', 'preprocess', 'postprocess',
-                  'persistent')
-
-    for name in direct_map:
-        if params[name]:
-            setattr(desc, name, params[name])
-
-    if params['application']:
-        desc.set_application(*params['application'])
-    if params['nodes']:
-        desc.set_nodes(*params['nodes'])
-    if params['reservation']:
-        desc.set_reservation(params['reservation'])
-    if params['watch_output']:
-        desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
-    # TODO script
-    # TODO executable
-    # TODO input
-    # TODO stage_in
-    # TODO stage_out
-    # TODO monitoring
-
-    # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'):
-    #     print prop, type(getattr(desc, prop)), repr(getattr(desc, prop))
-
-    # print desc.xml_description
-
-    # job = desc.submit()
-
-    # return job.job_id
+
+    qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
+    qcg_obj.credential = Credential(proxy)
+
+    jts = time.time()
+    qcg_obj.clean()
+    elapsed_clean = time.time() - jts
+
+    elapsed = time.time() - ts
+    elapsed_py = elapsed - elapsed_clean
+    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_clean, elapsed_py)