4 from django.db import transaction
5 from django.utils.functional import SimpleLazyObject
6 from django.utils.timezone import now
8 from pyqcg.service import Registry, JobFactory
9 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
11 from qcg.models import User, Job, Task, Allocation, NodeInfo
# Module-level logger; every function below reports its timing breakdown through it.
14 logger = logging.getLogger(__name__)
8 def update_user_data(user, proxy):
# Synchronise the locally stored Job / Task / Allocation / NodeInfo rows of
# `user` with what the QCG registry reports.
#
# user  -- User model instance; it is re-fetched below with a row lock.
# proxy -- value handed to Credential() to authenticate against the registry.
#
# NOTE(review): the timers (ts, jts, jte, tts, tte), the counters (jobs_count,
# task_count) and the header of the job loop are assigned on lines that were
# not part of the reviewed text -- confirm against the full file.
22 registry = Registry(Credential(proxy))
24 # put lock on user record (hopefully..?)
# select_for_update() only holds the lock inside a database transaction --
# TODO confirm this function always runs inside one.
25 user = User.objects.select_for_update().get(pk=user.pk)
26 last_update = user.last_update
# Restrict to entries changed after the previous sync; presumably passed to
# both registry queries below -- TODO confirm.
28 changed_filter = {'changed': TimePeriod(after=last_update)}
30 ###################################
32 ###################################
# Ask for jobs in every status: all JobStatus attributes whose name does not
# start with '__'. NOTE(review): dict.iteritems() exists only on Python 2.
34 jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
# Map the remote job to model fields; job_id is the lookup key and the
# remaining fields are the values written by update_or_create().
40 params = Job.qcg_map(qcg_job, user)
41 job_id = params.pop('job_id')
43 Job.objects.update_or_create(job_id=job_id, defaults=params)
46 ###################################
48 ###################################
# Same "every status" query, this time for tasks.
50 tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
# job_id -> Job lookup for this user's jobs. SimpleLazyObject defers building
# the dict until the first lookup in the task loop below.
54 jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
56 for qcg_task in tasks:
# A task whose job_id is absent from jobs_cache raises KeyError here.
57 params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
58 task_id = params.pop('task_id')
# Tasks are identified by the pair (parent job's job_id, task_id).
60 task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
# Allocations are replaced wholesale rather than diffed: delete what is stored
# and recreate it from the remote task. NOTE(review): whether the delete is
# conditional on `created` is decided on lines not reviewed here.
63 task.allocations.all().delete()
65 for qcg_alloc in qcg_task.allocations:
66 alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
68 for qcg_node in qcg_alloc.nodes:
69 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
# Advance the sync watermark used for the next changed_filter.
# NOTE(review): the matching save() is not in the reviewed lines -- confirm.
74 user.last_update = now()
# Timing breakdown: total, job query, task query, and the remainder (total
# minus both queries). The log line reports the pre-sync last_update value.
77 elapsed = time.time() - ts
78 elapsed_jobs = jte - jts
79 elapsed_tasks = tte - tts
80 elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
81 logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
82 elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
86 def update_job(job, proxy):
# Refresh one Job and all of its Tasks from the remote service and copy the
# refreshed values onto the model instances.
#
# job   -- Job model instance; its `owner` and `tasks` relation are used.
# proxy -- value handed to Credential() for the remote calls.
#
# NOTE(review): `qcg_job` (presumably job.qcg_job, mirroring task.qcg_task
# below), the timers ts/jts/tts, the initial value of elapsed_tasks and any
# save() calls live on lines not part of the reviewed text -- confirm.
90 credential = Credential(proxy)
92 qcg_job.credential = credential
95 qcg_job.refresh_information()
96 elapsed_job = time.time() - jts
# Assign only the attributes whose mapped value differs from the stored one.
98 for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
99 if getattr(job, name) != val:
100 setattr(job, name, val)
# Each task is refreshed with its own remote call, sharing the one credential.
105 for task in job.tasks.all():
106 qcg_task = task.qcg_task
107 qcg_task.credential = credential
110 qcg_task.refresh_information()
# Accumulate the time spent in the per-task remote refreshes.
111 elapsed_tasks += time.time() - tts
113 for name, val in task.qcg_map(qcg_task, task.job).iteritems():
114 if getattr(task, name) != val:
115 setattr(task, name, val)
# Allocations are replaced wholesale: delete the stored ones, then recreate
# them (and their nodes) from the refreshed remote task.
118 task.allocations.all().delete()
120 for qcg_alloc in qcg_task.allocations:
121 alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
123 for qcg_node in qcg_alloc.nodes:
124 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
# Remainder = total minus the job refresh and the task refreshes.
126 elapsed = time.time() - ts
127 elapsed_py = elapsed - elapsed_job - elapsed_tasks
# job.tasks.count() issues an additional COUNT query just for this log line.
128 logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
129 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
132 def submit(obj, proxy):
# Submit `obj` with a credential built from `proxy` and log how the elapsed
# time splits between the submit call and everything else.
#
# obj   -- any object exposing submit(credential); the result must expose
#          `job_id`, which is used in the log line.
# proxy -- value handed to Credential().
#
# NOTE(review): the timers ts/jts and whatever is returned are on lines not
# part of the reviewed text -- confirm.
135 cred = Credential(proxy)
138 result = obj.submit(cred)
139 elapsed_submit = time.time() - jts
141 elapsed = time.time() - ts
# LOCAL in the log line = total elapsed minus the time spent in obj.submit().
142 elapsed_py = elapsed - elapsed_submit
143 logger.info('(%.3f) JOB = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, result.job_id, elapsed_submit, elapsed_py)
148 def cancel(obj, proxy):
# Cancel the remote counterpart of a Job or Task and log the timing split.
#
# obj   -- a Job (its `qcg_job` is used) or any other object exposing
#          `qcg_task`.
# proxy -- value handed to Credential().
#
# NOTE(review): the remote cancel call itself and the timers ts/jts sit on
# lines not part of the reviewed text -- confirm.
# Anything that is not a Job is treated as a Task.
152 qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
153 qcg_obj.credential = Credential(proxy)
157 elapsed_cancel = time.time() - jts
159 elapsed = time.time() - ts
# LOCAL in the log line = total elapsed minus the time measured since jts.
160 elapsed_py = elapsed - elapsed_cancel
161 logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
164 def clean(obj, proxy):
# Clean the remote counterpart of a Job or Task and log the timing split.
#
# obj   -- a Job (its `qcg_job` is used) or any other object exposing
#          `qcg_task`.
# proxy -- value handed to Credential().
#
# NOTE(review): the remote clean call itself and the timers ts/jts sit on
# lines not part of the reviewed text -- confirm.
# Anything that is not a Job is treated as a Task.
168 qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
169 qcg_obj.credential = Credential(proxy)
173 elapsed_clean = time.time() - jts
175 elapsed = time.time() - ts
# LOCAL in the log line = total elapsed minus the time measured since jts.
176 elapsed_py = elapsed - elapsed_clean
177 logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_clean, elapsed_py)
180 def fetch_resources(proxy):
# Query the available resources and summarise them.
#
# proxy -- value handed to Credential().
#
# Returns a 4-tuple (hosts, storage, applications, modules):
#   hosts        -- list of each resource's `name`
#   storage      -- list of each resource's `storage`, in the same order
#   applications -- set of every application across all resources
#   modules      -- set of the modules whose name starts with 'plgrid'
#
# NOTE(review): the timers ts/rts/pts are assigned on lines not part of the
# reviewed text, and the meaning of the positional `False` passed to
# resources() is not visible here -- confirm against pyqcg.
183 cred = Credential(proxy)
# Materialise the result because it is iterated four times below.
186 resources = list(JobFactory().resources(False, cred))
187 elapsed_query = time.time() - rts
190 hosts = [res.name for res in resources]
191 storage = [res.storage for res in resources]
# Sets, so an entry offered by several resources appears once.
192 applications = {m for res in resources for m in res.applications}
193 modules = {m for res in resources for m in res.modules if m.startswith('plgrid')}
194 elapsed_pp = time.time() - pts
196 elapsed = time.time() - ts
197 logger.info('(%.3f) HOSTS = %d, APPS = %d, MODULES = %d, QUERY = %.3f, PROC = %.3f',
198 elapsed, len(hosts), len(applications), len(modules), elapsed_query, elapsed_pp)
200 return hosts, storage, applications, modules