Merge branch 'devel'
[qcg-portal.git] / qcg / service.py
index c87f40a..99fdf36 100644 (file)
@@ -5,12 +5,12 @@ from django.db import transaction
 from django.utils.functional import SimpleLazyObject
 from django.utils.timezone import now
 from pyqcg import QCG
 from django.utils.functional import SimpleLazyObject
 from django.utils.timezone import now
 from pyqcg import QCG
-from pyqcg.description import JobDescription
-from pyqcg.service import Registry
+from pyqcg.service import Registry, JobFactory
 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
 
 from qcg.models import User, Job, Task, Allocation, NodeInfo
 
 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
 
 from qcg.models import User, Job, Task, Allocation, NodeInfo
 
+__all__ = ['update_user_data', 'update_job', 'submit', 'cancel', 'clean', 'fetch_resources']
 
 logger = logging.getLogger(__name__)
 
 
 logger = logging.getLogger(__name__)
 
@@ -85,9 +85,6 @@ def update_user_data(user, proxy):
 
 @transaction.atomic
 def update_job(job, proxy):
 
 @transaction.atomic
 def update_job(job, proxy):
-    if job.terminated:
-        return
-
     ts = time.time()
     QCG.start()
 
     ts = time.time()
     QCG.start()
 
@@ -133,37 +130,20 @@ def update_job(job, proxy):
                 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
 
 
                 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
 
 
-def submit_job(params, proxy):
+def submit(obj, proxy):
+    ts = time.time()
     QCG.start()
     QCG.start()
-    desc = JobDescription(Credential(proxy))
-
-    direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
-                  'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
-                  'preprocess', 'postprocess', 'persistent')
-
-    for name in direct_map:
-        if params[name]:
-            setattr(desc, name, params[name])
+    cred = Credential(proxy)
 
 
-    if params['application']:
-        desc.set_application(*params['application'])
-    if params['nodes']:
-        desc.set_nodes(*params['nodes'])
-    if params['reservation']:
-        desc.set_reservation(params['reservation'])
-    if params['watch_output']:
-        desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
-    # TODO script
-    # TODO monitoring
-
-    # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'):
-    #     print prop, type(getattr(desc, prop)), repr(getattr(desc, prop))
-
-    # print desc.xml_description
+    jts = time.time()
+    result = obj.submit(cred)
+    elapsed_submit = time.time() - jts
 
 
-    job = desc.submit()
+    elapsed = time.time() - ts
+    elapsed_py = elapsed - elapsed_submit
+    logger.info('(%.3f) JOB = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, result.job_id, elapsed_submit, elapsed_py)
 
 
-    return job.job_id
+    return result
 
 
 def cancel(obj, proxy):
 
 
 def cancel(obj, proxy):
@@ -179,4 +159,43 @@ def cancel(obj, proxy):
 
     elapsed = time.time() - ts
     elapsed_py = elapsed - elapsed_cancel
 
     elapsed = time.time() - ts
     elapsed_py = elapsed - elapsed_cancel
-    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
+    logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
+
+
+def clean(obj, proxy):
+    ts = time.time()
+    QCG.start()
+
+    qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
+    qcg_obj.credential = Credential(proxy)
+
+    jts = time.time()
+    qcg_obj.clean()
+    elapsed_clean = time.time() - jts
+
+    elapsed = time.time() - ts
+    elapsed_py = elapsed - elapsed_clean
+    logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_clean, elapsed_py)
+
+
+def fetch_resources(proxy):
+    ts = time.time()
+    QCG.start()
+    cred = Credential(proxy)
+
+    rts = time.time()
+    resources = list(JobFactory().resources(False, cred))
+    elapsed_query = time.time() - rts
+
+    pts = time.time()
+    hosts = [res.name for res in resources]
+    storage = [res.storage for res in resources]
+    applications = {m for res in resources for m in res.applications}
+    modules = {m for res in resources for m in res.modules if m.startswith('plgrid')}
+    elapsed_pp = time.time() - pts
+
+    elapsed = time.time() - ts
+    logger.info('(%.3f) HOSTS = %d, APPS = %d, MODULES = %d, QUERY = %.3f, PROC = %.3f',
+                elapsed, len(hosts), len(applications), len(modules), elapsed_query, elapsed_pp)
+
+    return hosts, storage, applications, modules