from django.utils.functional import SimpleLazyObject
from django.utils.timezone import now
from pyqcg import QCG
-from pyqcg.description import JobDescription
from pyqcg.service import Registry
from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
+from qcg.models import User, Job, Task, Allocation, NodeInfo
+
logger = logging.getLogger(__name__)
def update_user_data(user, proxy):
ts = time.time()
QCG.start()
- from qcg.models import User, Job, Task, Allocation, NodeInfo
- credential = Credential(proxy)
- registry = Registry(credential)
+ registry = Registry(Credential(proxy))
# put lock on user record (hopefully..?)
user = User.objects.select_for_update().get(pk=user.pk)
+ last_update = user.last_update
- changed_filter = {'changed': TimePeriod(after=user.last_update)}
+ changed_filter = {'changed': TimePeriod(after=last_update)}
###################################
# Jobs
jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
task_count = 0
for qcg_task in tasks:
- params = Task.qcg_map(qcg_task, jobs_cache)
+ params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
task_id = params.pop('task_id')
task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
elapsed_jobs = jte - jts
elapsed_tasks = tte - tts
elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
- logger.info('(%.3f) USER = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
- elapsed, user, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
+ logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
+ elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
-def submit_job(params, proxy):
+@transaction.atomic
+def update_job(job, proxy):
+ ts = time.time()
QCG.start()
- desc = JobDescription(Credential(proxy))
- direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
- 'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
- 'preprocess', 'postprocess', 'persistent')
+ credential = Credential(proxy)
+ qcg_job = job.qcg_job
+ qcg_job.credential = credential
+
+ jts = time.time()
+ qcg_job.refresh_information()
+ elapsed_job = time.time() - jts
+
+ for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
+ if getattr(job, name) != val:
+ setattr(job, name, val)
+
+ job.save()
- for name in direct_map:
- if params[name]:
- setattr(desc, name, params[name])
+ elapsed_tasks = 0
+ for task in job.tasks.all():
+ qcg_task = task.qcg_task
+ qcg_task.credential = credential
- if params['application']:
- desc.set_application(*params['application'])
- if params['nodes']:
- desc.set_nodes(*params['nodes'])
- if params['reservation']:
- desc.set_reservation(params['reservation'])
- if params['watch_output']:
- desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
- # TODO script
- # TODO monitoring
+ tts = time.time()
+ qcg_task.refresh_information()
+ elapsed_tasks += time.time() - tts
- # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'):
- # print prop, type(getattr(desc, prop)), repr(getattr(desc, prop))
+ for name, val in task.qcg_map(qcg_task, task.job).iteritems():
+ if getattr(task, name) != val:
+ setattr(task, name, val)
- # print desc.xml_description
+ task.save()
+ task.allocations.all().delete()
+
+ for qcg_alloc in qcg_task.allocations:
+ alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
+
+ for qcg_node in qcg_alloc.nodes:
+ alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
+
+ elapsed = time.time() - ts
+ elapsed_py = elapsed - elapsed_job - elapsed_tasks
+ logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
+ elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
- job = desc.submit()
- return job.job_id
+def cancel(obj, proxy):
+ ts = time.time()
+ QCG.start()
+
+ qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
+ qcg_obj.credential = Credential(proxy)
+
+ jts = time.time()
+ qcg_obj.cancel()
+ elapsed_cancel = time.time() - jts
+
+ elapsed = time.time() - ts
+ elapsed_py = elapsed - elapsed_cancel
+ logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
+
+
+def clean(obj, proxy):
+ ts = time.time()
+ QCG.start()
+
+ qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
+ qcg_obj.credential = Credential(proxy)
+
+ jts = time.time()
+ qcg_obj.clean()
+ elapsed_clean = time.time() - jts
+
+ elapsed = time.time() - ts
+ elapsed_py = elapsed - elapsed_clean
+ logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_clean, elapsed_py)