Merge branch 'devel'
[qcg-portal.git] / qcg / service.py
index 44cb42e..99fdf36 100644 (file)
@@ -1,17 +1,16 @@
 import logging
 import logging
-import os
 import time
 
 from django.db import transaction
 from django.utils.functional import SimpleLazyObject
 from django.utils.timezone import now
 from pyqcg import QCG
 import time
 
 from django.db import transaction
 from django.utils.functional import SimpleLazyObject
 from django.utils.timezone import now
 from pyqcg import QCG
-from pyqcg.description import JobDescription
-from pyqcg.service import Registry
+from pyqcg.service import Registry, JobFactory
 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
 
 from qcg.models import User, Job, Task, Allocation, NodeInfo
 
 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
 
 from qcg.models import User, Job, Task, Allocation, NodeInfo
 
+__all__ = ['update_user_data', 'update_job', 'submit', 'cancel', 'clean', 'fetch_resources']
 
 logger = logging.getLogger(__name__)
 
 
 logger = logging.getLogger(__name__)
 
@@ -86,9 +85,6 @@ def update_user_data(user, proxy):
 
 @transaction.atomic
 def update_job(job, proxy):
 
 @transaction.atomic
 def update_job(job, proxy):
-    if job.terminated and job.purged:
-        return
-
     ts = time.time()
     QCG.start()
 
     ts = time.time()
     QCG.start()
 
@@ -134,39 +130,20 @@ def update_job(job, proxy):
                 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
 
 
                 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
 
 
-def submit_job(params, proxy):
+def submit(obj, proxy):
+    ts = time.time()
     QCG.start()
     QCG.start()
-    desc = JobDescription(Credential(proxy))
-
-    direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
-                  'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
-                  'preprocess', 'postprocess', 'persistent')
-
-    for name in direct_map:
-        if params[name]:
-            setattr(desc, name, params[name])
-
-    if params['application']:
-        desc.set_application(*params['application'])
-        desc.stage_in += [params['master_file']]
-        desc.arguments.insert(0, os.path.basename(params['master_file']))
-    if params['nodes']:
-        desc.set_nodes(*params['nodes'])
-    if params['reservation']:
-        desc.set_reservation(params['reservation'])
-    if params['watch_output']:
-        desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
-    # TODO script
-    # TODO monitoring
+    cred = Credential(proxy)
 
 
-    # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'):
-    #     print prop, type(getattr(desc, prop)), repr(getattr(desc, prop))
-
-    # print desc.xml_description
+    jts = time.time()
+    result = obj.submit(cred)
+    elapsed_submit = time.time() - jts
 
 
-    job = desc.submit()
+    elapsed = time.time() - ts
+    elapsed_py = elapsed - elapsed_submit
+    logger.info('(%.3f) JOB = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, result.job_id, elapsed_submit, elapsed_py)
 
 
-    return job.job_id
+    return result
 
 
 def cancel(obj, proxy):
 
 
 def cancel(obj, proxy):
@@ -182,7 +159,7 @@ def cancel(obj, proxy):
 
     elapsed = time.time() - ts
     elapsed_py = elapsed - elapsed_cancel
 
     elapsed = time.time() - ts
     elapsed_py = elapsed - elapsed_cancel
-    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
+    logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
 
 
 def clean(obj, proxy):
 
 
 def clean(obj, proxy):
@@ -198,4 +175,27 @@ def clean(obj, proxy):
 
     elapsed = time.time() - ts
     elapsed_py = elapsed - elapsed_clean
 
     elapsed = time.time() - ts
     elapsed_py = elapsed - elapsed_clean
-    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_clean, elapsed_py)
+    logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_clean, elapsed_py)
+
+
+def fetch_resources(proxy):
+    ts = time.time()
+    QCG.start()
+    cred = Credential(proxy)
+
+    rts = time.time()
+    resources = list(JobFactory().resources(False, cred))
+    elapsed_query = time.time() - rts
+
+    pts = time.time()
+    hosts = [res.name for res in resources]
+    storage = [res.storage for res in resources]
+    applications = {m for res in resources for m in res.applications}
+    modules = {m for res in resources for m in res.modules if m.startswith('plgrid')}
+    elapsed_pp = time.time() - pts
+
+    elapsed = time.time() - ts
+    logger.info('(%.3f) HOSTS = %d, APPS = %d, MODULES = %d, QUERY = %.3f, PROC = %.3f',
+                elapsed, len(hosts), len(applications), len(modules), elapsed_query, elapsed_pp)
+
+    return hosts, storage, applications, modules