4 from django.db import transaction
5 from django.utils.functional import SimpleLazyObject
6 from django.utils.timezone import now
8 from pyqcg.description import JobDescription
9 from pyqcg.service import Registry
10 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
12 from qcg.models import User, Job, Task, Allocation, NodeInfo
15 logger = logging.getLogger(__name__)
19 def update_user_data(user, proxy):
23 registry = Registry(Credential(proxy))
25 # put lock on user record (hopefully..?)
26 user = User.objects.select_for_update().get(pk=user.pk)
27 last_update = user.last_update
29 changed_filter = {'changed': TimePeriod(after=last_update)}
31 ###################################
33 ###################################
35 jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
41 params = Job.qcg_map(qcg_job, user)
42 job_id = params.pop('job_id')
44 Job.objects.update_or_create(job_id=job_id, defaults=params)
47 ###################################
49 ###################################
51 tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
55 jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
57 for qcg_task in tasks:
58 params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
59 task_id = params.pop('task_id')
61 task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
64 task.allocations.all().delete()
66 for qcg_alloc in qcg_task.allocations:
67 alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
69 for qcg_node in qcg_alloc.nodes:
70 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
75 user.last_update = now()
78 elapsed = time.time() - ts
79 elapsed_jobs = jte - jts
80 elapsed_tasks = tte - tts
81 elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
82 logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
83 elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
87 def update_job(job, proxy):
88 if job.get_status_display() in [JobStatus.FINISHED, JobStatus.FAILED, JobStatus.CANCELED]:
94 credential = Credential(proxy)
96 qcg_job.credential = credential
99 qcg_job.refresh_information()
100 elapsed_job = time.time() - jts
102 for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
103 if getattr(job, name) != val:
104 setattr(job, name, val)
109 for task in job.tasks.all():
110 qcg_task = task.qcg_task
111 qcg_task.credential = credential
114 qcg_task.refresh_information()
115 elapsed_tasks += time.time() - tts
117 for name, val in task.qcg_map(qcg_task, task.job).iteritems():
118 if getattr(task, name) != val:
119 setattr(task, name, val)
122 task.allocations.all().delete()
124 for qcg_alloc in qcg_task.allocations:
125 alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
127 for qcg_node in qcg_alloc.nodes:
128 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
130 elapsed = time.time() - ts
131 elapsed_py = elapsed - elapsed_job - elapsed_tasks
132 logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
133 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
136 def submit_job(params, proxy):
138 desc = JobDescription(Credential(proxy))
140 direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
141 'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
142 'preprocess', 'postprocess', 'persistent')
144 for name in direct_map:
146 setattr(desc, name, params[name])
148 if params['application']:
149 desc.set_application(*params['application'])
151 desc.set_nodes(*params['nodes'])
152 if params['reservation']:
153 desc.set_reservation(params['reservation'])
154 if params['watch_output']:
155 desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
159 # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'):
160 # print prop, type(getattr(desc, prop)), repr(getattr(desc, prop))
162 # print desc.xml_description