ftp exists and mkdir with parents methods
[qcg-portal.git] / qcg / service.py
index 6f44f49..44cb42e 100644 (file)
@@ -1,12 +1,16 @@
 import logging
 import logging
+import os
 import time
 
 from django.db import transaction
 import time
 
 from django.db import transaction
+from django.utils.functional import SimpleLazyObject
 from django.utils.timezone import now
 from pyqcg import QCG
 from pyqcg.description import JobDescription
 from pyqcg.service import Registry
 from django.utils.timezone import now
 from pyqcg import QCG
 from pyqcg.description import JobDescription
 from pyqcg.service import Registry
-from pyqcg.utils import Credential, TimePeriod
+from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
+
+from qcg.models import User, Job, Task, Allocation, NodeInfo
 
 
 logger = logging.getLogger(__name__)
 
 
 logger = logging.getLogger(__name__)
@@ -16,21 +20,21 @@ logger = logging.getLogger(__name__)
 def update_user_data(user, proxy):
     ts = time.time()
     QCG.start()
 def update_user_data(user, proxy):
     ts = time.time()
     QCG.start()
-    from qcg.models import User, Job, Task, Allocation, NodeInfo
 
 
-    credential = Credential(proxy)
-    registry = Registry(credential)
+    registry = Registry(Credential(proxy))
 
     # put lock on user record (hopefully..?)
     user = User.objects.select_for_update().get(pk=user.pk)
 
     # put lock on user record (hopefully..?)
     user = User.objects.select_for_update().get(pk=user.pk)
+    last_update = user.last_update
 
 
-    changed_filter = {'changed': TimePeriod(after=user.last_update)}
+    changed_filter = {'changed': TimePeriod(after=last_update)}
 
     ###################################
     # Jobs
     ###################################
     jts = time.time()
 
     ###################################
     # Jobs
     ###################################
     jts = time.time()
-    jobs = registry.jobs(**changed_filter)
+    jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
+                         **changed_filter)
     jte = time.time()
 
     jobs_count = 0
     jte = time.time()
 
     jobs_count = 0
@@ -45,13 +49,14 @@ def update_user_data(user, proxy):
     # Tasks
     ###################################
     tts = time.time()
     # Tasks
     ###################################
     tts = time.time()
-    tasks = registry.tasks(**changed_filter)
+    tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
+                           **changed_filter)
     tte = time.time()
 
     tte = time.time()
 
-    jobs_cache = {j.job_id: j for j in Job.objects.filter(owner=user)}
+    jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
     task_count = 0
     for qcg_task in tasks:
     task_count = 0
     for qcg_task in tasks:
-        params = Task.qcg_map(qcg_task, jobs_cache)
+        params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
         task_id = params.pop('task_id')
 
         task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
         task_id = params.pop('task_id')
 
         task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
@@ -75,19 +80,67 @@ def update_user_data(user, proxy):
     elapsed_jobs = jte - jts
     elapsed_tasks = tte - tts
     elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
     elapsed_jobs = jte - jts
     elapsed_tasks = tte - tts
     elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
-    logger.info('(%.3f) USER = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
-                elapsed, user, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
+    logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
+                elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
 
 
 
 
-def submit_job(params, proxy):
-    # print params
+@transaction.atomic
+def update_job(job, proxy):
+    if job.terminated and job.purged:
+        return
 
 
+    ts = time.time()
+    QCG.start()
+
+    credential = Credential(proxy)
+    qcg_job = job.qcg_job
+    qcg_job.credential = credential
+
+    jts = time.time()
+    qcg_job.refresh_information()
+    elapsed_job = time.time() - jts
+
+    for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
+        if getattr(job, name) != val:
+            setattr(job, name, val)
+
+    job.save()
+
+    elapsed_tasks = 0
+    for task in job.tasks.all():
+        qcg_task = task.qcg_task
+        qcg_task.credential = credential
+
+        tts = time.time()
+        qcg_task.refresh_information()
+        elapsed_tasks += time.time() - tts
+
+        for name, val in task.qcg_map(qcg_task, task.job).iteritems():
+            if getattr(task, name) != val:
+                setattr(task, name, val)
+
+        task.save()
+        task.allocations.all().delete()
+
+        for qcg_alloc in qcg_task.allocations:
+            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
+
+            for qcg_node in qcg_alloc.nodes:
+                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
+
+    elapsed = time.time() - ts
+    elapsed_py = elapsed - elapsed_job - elapsed_tasks
+    logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
+                elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
+
+
+def submit_job(params, proxy):
     QCG.start()
     desc = JobDescription(Credential(proxy))
 
     direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
     QCG.start()
     desc = JobDescription(Credential(proxy))
 
     direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
-                  'wall_time', 'memory', 'memory_per_slot', 'modules', 'native', 'notify', 'preprocess', 'postprocess',
-                  'persistent')
+                  'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
+                  'preprocess', 'postprocess', 'persistent')
 
     for name in direct_map:
         if params[name]:
 
     for name in direct_map:
         if params[name]:
@@ -95,6 +148,8 @@ def submit_job(params, proxy):
 
     if params['application']:
         desc.set_application(*params['application'])
 
     if params['application']:
         desc.set_application(*params['application'])
+        desc.stage_in += [params['master_file']]
+        desc.arguments.insert(0, os.path.basename(params['master_file']))
     if params['nodes']:
         desc.set_nodes(*params['nodes'])
     if params['reservation']:
     if params['nodes']:
         desc.set_nodes(*params['nodes'])
     if params['reservation']:
@@ -102,10 +157,6 @@ def submit_job(params, proxy):
     if params['watch_output']:
         desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
     # TODO script
     if params['watch_output']:
         desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
     # TODO script
-    # TODO executable
-    # TODO input
-    # TODO stage_in
-    # TODO stage_out
     # TODO monitoring
 
     # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'):
     # TODO monitoring
 
     # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'):
@@ -113,6 +164,38 @@ def submit_job(params, proxy):
 
     # print desc.xml_description
 
 
     # print desc.xml_description
 
-    # job = desc.submit()
+    job = desc.submit()
+
+    return job.job_id
+
+
+def cancel(obj, proxy):
+    ts = time.time()
+    QCG.start()
+
+    qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
+    qcg_obj.credential = Credential(proxy)
+
+    jts = time.time()
+    qcg_obj.cancel()
+    elapsed_cancel = time.time() - jts
+
+    elapsed = time.time() - ts
+    elapsed_py = elapsed - elapsed_cancel
+    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
 
 
-    # return job.job_id
+
+def clean(obj, proxy):
+    ts = time.time()
+    QCG.start()
+
+    qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
+    qcg_obj.credential = Credential(proxy)
+
+    jts = time.time()
+    qcg_obj.clean()
+    elapsed_clean = time.time() - jts
+
+    elapsed = time.time() - ts
+    elapsed_py = elapsed - elapsed_clean
+    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_clean, elapsed_py)