4 from django.db import transaction
5 from django.utils.functional import SimpleLazyObject
6 from django.utils.timezone import now
8 from pyqcg.service import Registry, JobFactory
9 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
11 from qcg.models import User, Job, Task, Allocation, NodeInfo
# Public names of this module: the six service functions defined below.
13 __all__ = ['update_user_data', 'update_job', 'submit', 'cancel', 'clean', 'fetch_resources']
# Module-level logger named after this module; each function below emits one
# timing summary through it at INFO level.
15 logger = logging.getLogger(__name__)
# Pull the jobs and tasks that the QCG registry reports for `user` and mirror
# them into the local Job / Task / Allocation / NodeInfo tables, then log a
# timing summary.
#
# user  -- User model instance; it is re-read below with select_for_update().
# proxy -- value wrapped in a pyqcg Credential for the registry calls.
#
# NOTE(review): several names used here (`ts`, `jts`, `jte`, `tts`, `tte`,
# `jobs_count`, `task_count`, `qcg_job`) are not assigned on any line visible
# in this listing, and both registry calls end on lines that are not shown.
# The comments below describe only the visible statements.
19 def update_user_data(user, proxy):
# Registry handle constructed from a Credential that wraps `proxy`.
23 registry = Registry(Credential(proxy))
25 # put lock on user record (hopefully..?)
# select_for_update() issues SELECT ... FOR UPDATE; the row lock only holds
# inside a database transaction.
# NOTE(review): no transaction.atomic() is visible around this call - confirm
# the caller (or a line not shown) provides one.
26 user = User.objects.select_for_update().get(pk=user.pk)
# Timestamp of the previous synchronisation; used for the filter and the log.
27 last_update = user.last_update
# Presumably limits the registry queries to entries changed after the previous
# update; where the filter is passed is on lines not shown - TODO confirm.
29 changed_filter = {'changed': TimePeriod(after=last_update)}
31 ###################################
33 ###################################
# `stats` is every attribute value of JobStatus whose name does not start with
# '__', i.e. all status constants declared on that class.
# NOTE(review): dict.iteritems() exists only on Python 2.
35 jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
# Map the remote job to model field values; `job_id` is popped so it can serve
# as the lookup key while the remaining values become the defaults.
# (`qcg_job` comes from a loop header that is not visible here.)
41 params = Job.qcg_map(qcg_job, user)
42 job_id = params.pop('job_id')
# Upsert: update the Job with this job_id, or create it from `params`.
44 Job.objects.update_or_create(job_id=job_id, defaults=params)
47 ###################################
49 ###################################
# Same status enumeration as for jobs, this time over TaskStatus.
51 tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
# Lazily built {job_id: Job} map of this user's jobs: the query only runs on
# first access, so nothing is fetched when `tasks` yields no items.
55 jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
57 for qcg_task in tasks:
# The dict lookup raises KeyError if the task's job_id is not among the user's
# locally stored jobs.
58 params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
59 task_id = params.pop('task_id')
# Upsert keyed on (parent job's job_id, task_id).
61 task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
# Replace the task's allocations wholesale: drop the stored ones, then recreate
# each allocation and its nodes from the registry data.
# NOTE(review): whether this is conditional (e.g. on `created`) depends on
# lines not shown.
64 task.allocations.all().delete()
66 for qcg_alloc in qcg_task.allocations:
67 alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
69 for qcg_node in qcg_alloc.nodes:
70 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
# Record the time of this synchronisation on the locked user row.
# NOTE(review): no user.save() is visible in this listing - confirm it exists.
75 user.last_update = now()
# Timing breakdown: total wall time, the two intervals (jts..jte and tts..tte)
# and the remainder after subtracting both from the total.
78 elapsed = time.time() - ts
79 elapsed_jobs = jte - jts
80 elapsed_tasks = tte - tts
81 elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
# The logged LAST_UPDATE is the value read before this run, not the new one.
82 logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
83 elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
# Refresh a single job and all of its tasks from QCG, copy changed field values
# onto the local model instances, rebuild each task's allocations and log a
# timing summary.
#
# job   -- local Job instance; its `owner` and `tasks` relation are read here.
# proxy -- value wrapped in a pyqcg Credential shared by the job and its tasks.
#
# NOTE(review): `qcg_job`, `ts`, `jts`, `tts` and the initial value of
# `elapsed_tasks` are set on lines not visible in this listing, and no save()
# call is visible either. The comments cover only the visible statements.
87 def update_job(job, proxy):
91 credential = Credential(proxy)
# Attach the credential to the remote job object before refreshing it.
93 qcg_job.credential = credential
# Refresh the remote object's information; the time since `jts` is recorded as
# the job's share of the total.
96 qcg_job.refresh_information()
97 elapsed_job = time.time() - jts
# Copy each mapped value onto the model only where it differs from the current
# attribute value.
# NOTE(review): dict.iteritems() exists only on Python 2.
99 for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
100 if getattr(job, name) != val:
101 setattr(job, name, val)
# Repeat the refresh-and-copy procedure for every task of the job.
106 for task in job.tasks.all():
107 qcg_task = task.qcg_task
108 qcg_task.credential = credential
111 qcg_task.refresh_information()
# Accumulates the per-task refresh time across iterations.
112 elapsed_tasks += time.time() - tts
114 for name, val in task.qcg_map(qcg_task, task.job).iteritems():
115 if getattr(task, name) != val:
116 setattr(task, name, val)
# Replace the task's allocations wholesale: delete the stored ones, then
# recreate each allocation and its nodes from the refreshed remote task.
119 task.allocations.all().delete()
121 for qcg_alloc in qcg_task.allocations:
122 alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
124 for qcg_node in qcg_alloc.nodes:
125 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
# Total wall time and the remainder after subtracting the job and task refresh
# times.
127 elapsed = time.time() - ts
128 elapsed_py = elapsed - elapsed_job - elapsed_tasks
# job.tasks.count() runs one more COUNT query just for the log line.
129 logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
130 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
# Submit `obj` by calling obj.submit() with a Credential built from `proxy`,
# then log how long the call and the surrounding code took.
#
# obj   -- any object exposing submit(credential); the result must carry a
#          `job_id` attribute, which is logged.
# proxy -- value wrapped in a pyqcg Credential.
#
# NOTE(review): the timer starts (`ts`, `jts`) and any return statement are on
# lines not visible in this listing.
133 def submit(obj, proxy):
136 cred = Credential(proxy)
# The submission itself; the time since `jts` is logged as REMOTE.
139 result = obj.submit(cred)
140 elapsed_submit = time.time() - jts
# LOCAL is the total elapsed time minus the submit interval.
142 elapsed = time.time() - ts
143 elapsed_py = elapsed - elapsed_submit
144 logger.info('(%.3f) JOB = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, result.job_id, elapsed_submit, elapsed_py)
# Cancel a job or a task and log the timing.
#
# obj   -- a Job instance, or otherwise an object exposing `qcg_task`.
# proxy -- value wrapped in a pyqcg Credential and attached to the remote object.
#
# NOTE(review): the call that performs the cancellation and the timer starts
# (`ts`, `jts`) are on lines not visible in this listing.
149 def cancel(obj, proxy):
# Job instances use their `qcg_job`; anything else is treated as a task and
# must provide `qcg_task`.
153 qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
154 qcg_obj.credential = Credential(proxy)
# Time since `jts`, logged as REMOTE.
158 elapsed_cancel = time.time() - jts
# LOCAL is the total elapsed time minus the REMOTE interval.
160 elapsed = time.time() - ts
161 elapsed_py = elapsed - elapsed_cancel
162 logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
# Clean a job or a task and log the timing; mirrors the structure of cancel().
#
# obj   -- a Job instance, or otherwise an object exposing `qcg_task`.
# proxy -- value wrapped in a pyqcg Credential and attached to the remote object.
#
# NOTE(review): the call that performs the clean-up and the timer starts
# (`ts`, `jts`) are on lines not visible in this listing.
165 def clean(obj, proxy):
# Job instances use their `qcg_job`; anything else is treated as a task and
# must provide `qcg_task`.
169 qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
170 qcg_obj.credential = Credential(proxy)
# Time since `jts`, logged as REMOTE.
174 elapsed_clean = time.time() - jts
# LOCAL is the total elapsed time minus the REMOTE interval.
176 elapsed = time.time() - ts
177 elapsed_py = elapsed - elapsed_clean
178 logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_clean, elapsed_py)
# Query the available resources through JobFactory and summarise them.
#
# proxy -- value wrapped in a pyqcg Credential for the query.
#
# Returns a 4-tuple (hosts, storage, applications, modules):
#   hosts        -- list of each resource's `name`
#   storage      -- list of each resource's `storage`
#   applications -- set of all entries found in any resource's `applications`
#   modules      -- set of entries from any resource's `modules` that start
#                   with 'plgrid'
#
# NOTE(review): the timer starts (`ts`, `rts`, `pts`) are on lines not visible
# in this listing.
181 def fetch_resources(proxy):
184 cred = Credential(proxy)
# Materialised into a list because the result is iterated four times below.
# NOTE(review): the meaning of the positional `False` is not visible here -
# check the pyqcg JobFactory.resources signature.
187 resources = list(JobFactory().resources(False, cred))
188 elapsed_query = time.time() - rts
191 hosts = [res.name for res in resources]
192 storage = [res.storage for res in resources]
# Set comprehensions remove duplicates that occur across resources.
193 applications = {m for res in resources for m in res.applications}
194 modules = {m for res in resources for m in res.modules if m.startswith('plgrid')}
# Time since `pts`, logged as PROC.
195 elapsed_pp = time.time() - pts
197 elapsed = time.time() - ts
198 logger.info('(%.3f) HOSTS = %d, APPS = %d, MODULES = %d, QUERY = %.3f, PROC = %.3f',
199 elapsed, len(hosts), len(applications), len(modules), elapsed_query, elapsed_pp)
201 return hosts, storage, applications, modules