fetch resources info and set choices in job submission form
[qcg-portal.git] / qcg / service.py
1 import logging
2 import time
3
4 from django.db import transaction
5 from django.utils.functional import SimpleLazyObject
6 from django.utils.timezone import now
7 from pyqcg import QCG
8 from pyqcg.service import Registry, JobFactory
9 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
10
11 from qcg.models import User, Job, Task, Allocation, NodeInfo
12
13
14 logger = logging.getLogger(__name__)
15
16
17 @transaction.atomic
18 def update_user_data(user, proxy):
19     ts = time.time()
20     QCG.start()
21
22     registry = Registry(Credential(proxy))
23
24     # put lock on user record (hopefully..?)
25     user = User.objects.select_for_update().get(pk=user.pk)
26     last_update = user.last_update
27
28     changed_filter = {'changed': TimePeriod(after=last_update)}
29
30     ###################################
31     # Jobs
32     ###################################
33     jts = time.time()
34     jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
35                          **changed_filter)
36     jte = time.time()
37
38     jobs_count = 0
39     for qcg_job in jobs:
40         params = Job.qcg_map(qcg_job, user)
41         job_id = params.pop('job_id')
42
43         Job.objects.update_or_create(job_id=job_id, defaults=params)
44         jobs_count += 1
45
46     ###################################
47     # Tasks
48     ###################################
49     tts = time.time()
50     tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
51                            **changed_filter)
52     tte = time.time()
53
54     jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
55     task_count = 0
56     for qcg_task in tasks:
57         params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
58         task_id = params.pop('task_id')
59
60         task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
61
62         if not created:
63             task.allocations.all().delete()
64
65         for qcg_alloc in qcg_task.allocations:
66             alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
67
68             for qcg_node in qcg_alloc.nodes:
69                 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
70
71         task_count += 1
72
73     # release user lock
74     user.last_update = now()
75     user.save()
76
77     elapsed = time.time() - ts
78     elapsed_jobs = jte - jts
79     elapsed_tasks = tte - tts
80     elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
81     logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
82                 elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
83
84
85 @transaction.atomic
86 def update_job(job, proxy):
87     ts = time.time()
88     QCG.start()
89
90     credential = Credential(proxy)
91     qcg_job = job.qcg_job
92     qcg_job.credential = credential
93
94     jts = time.time()
95     qcg_job.refresh_information()
96     elapsed_job = time.time() - jts
97
98     for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
99         if getattr(job, name) != val:
100             setattr(job, name, val)
101
102     job.save()
103
104     elapsed_tasks = 0
105     for task in job.tasks.all():
106         qcg_task = task.qcg_task
107         qcg_task.credential = credential
108
109         tts = time.time()
110         qcg_task.refresh_information()
111         elapsed_tasks += time.time() - tts
112
113         for name, val in task.qcg_map(qcg_task, task.job).iteritems():
114             if getattr(task, name) != val:
115                 setattr(task, name, val)
116
117         task.save()
118         task.allocations.all().delete()
119
120         for qcg_alloc in qcg_task.allocations:
121             alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
122
123             for qcg_node in qcg_alloc.nodes:
124                 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
125
126     elapsed = time.time() - ts
127     elapsed_py = elapsed - elapsed_job - elapsed_tasks
128     logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
129                 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
130
131
132 def submit(obj, proxy):
133     ts = time.time()
134     QCG.start()
135     cred = Credential(proxy)
136
137     jts = time.time()
138     result = obj.submit(cred)
139     elapsed_submit = time.time() - jts
140
141     elapsed = time.time() - ts
142     elapsed_py = elapsed - elapsed_submit
143     logger.info('(%.3f) JOB = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, result.job_id, elapsed_submit, elapsed_py)
144
145     return result
146
147
148 def cancel(obj, proxy):
149     ts = time.time()
150     QCG.start()
151
152     qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
153     qcg_obj.credential = Credential(proxy)
154
155     jts = time.time()
156     qcg_obj.cancel()
157     elapsed_cancel = time.time() - jts
158
159     elapsed = time.time() - ts
160     elapsed_py = elapsed - elapsed_cancel
161     logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
162
163
164 def clean(obj, proxy):
165     ts = time.time()
166     QCG.start()
167
168     qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
169     qcg_obj.credential = Credential(proxy)
170
171     jts = time.time()
172     qcg_obj.clean()
173     elapsed_clean = time.time() - jts
174
175     elapsed = time.time() - ts
176     elapsed_py = elapsed - elapsed_clean
177     logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', elapsed, obj, elapsed_clean, elapsed_py)
178
179
180 def fetch_resources(proxy):
181     ts = time.time()
182     QCG.start()
183     cred = Credential(proxy)
184
185     rts = time.time()
186     resources = list(JobFactory().resources(False, cred))
187     elapsed_query = time.time() - rts
188
189     pts = time.time()
190     hosts = [res.name for res in resources]
191     storage = [res.storage for res in resources]
192     applications = {m for res in resources for m in res.applications}
193     modules = {m for res in resources for m in res.modules if m.startswith('plgrid')}
194     elapsed_pp = time.time() - pts
195
196     elapsed = time.time() - ts
197     logger.info('(%.3f) HOSTS = %d, APPS = %d, MODULES = %d, QUERY = %.3f, PROC = %.3f',
198                 elapsed, len(hosts), len(applications), len(modules), elapsed_query, elapsed_pp)
199
200     return hosts, storage, applications, modules