reorganize imports
[qcg-portal.git] / qcg / service.py
1 import logging
2 import time
3
4 from django.db import transaction
5 from django.utils.functional import SimpleLazyObject
6 from django.utils.timezone import now
7 from pyqcg import QCG
8 from pyqcg.service import Registry, JobFactory
9 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
10
11 from qcg.models import User, Job, Task, Allocation, NodeInfo
12
13 __all__ = ['update_user_data', 'update_job', 'submit', 'cancel', 'clean', 'fetch_resources']
14
15 logger = logging.getLogger(__name__)
16
17
@transaction.atomic
def update_user_data(user, proxy):
    """Synchronise the local jobs and tasks of ``user`` with the QCG registry.

    Only records reported by the registry as changed after
    ``user.last_update`` are requested.  Jobs are upserted by ``job_id``;
    tasks are upserted by ``(job_id, task_id)`` and their allocations (with
    nodes) are rebuilt from the registry data.  Finally ``user.last_update``
    is advanced and a timing summary is logged.

    :param user: the ``User`` whose data should be refreshed (re-read from
                 the database with a row lock inside this function).
    :param proxy: value handed to ``Credential`` to authenticate against
                  the registry.
    :raises KeyError: if the registry returns a task whose ``job_id`` has no
                      local ``Job`` owned by ``user``.
    """
    ts = time.time()
    QCG.start()

    registry = Registry(Credential(proxy))

    # Lock the user row so concurrent updates for the same user serialise;
    # the lock is held until the surrounding transaction ends.
    user = User.objects.select_for_update().get(pk=user.pk)
    last_update = user.last_update

    # Take the new watermark *before* talking to the registry.  Stamping
    # `now()` after processing would silently skip anything that changed
    # remotely while this function was running.  Records fetched twice are
    # harmless because every write below is an upsert / rebuild.
    sync_started = now()

    changed_filter = {'changed': TimePeriod(after=last_update)}

    ###################################
    # Jobs
    ###################################
    jts = time.time()
    # every public attribute value of JobStatus, i.e. "any status"
    jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).items() if not k.startswith('__')],
                         **changed_filter)
    jte = time.time()

    jobs_count = 0
    for qcg_job in jobs:
        params = Job.qcg_map(qcg_job, user)
        job_id = params.pop('job_id')

        Job.objects.update_or_create(job_id=job_id, defaults=params)
        jobs_count += 1

    ###################################
    # Tasks
    ###################################
    tts = time.time()
    # every public attribute value of TaskStatus, i.e. "any status"
    tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).items() if not k.startswith('__')],
                           **changed_filter)
    tte = time.time()

    # Built lazily on first lookup, so the query is skipped when there are
    # no tasks and it already sees the jobs upserted above.
    jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
    task_count = 0
    for qcg_task in tasks:
        params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
        task_id = params.pop('task_id')

        task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)

        # An existing task may carry stale allocations: drop and rebuild.
        if not created:
            task.allocations.all().delete()

        for qcg_alloc in qcg_task.allocations:
            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))

            for qcg_node in qcg_alloc.nodes:
                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))

        task_count += 1

    # Persist the watermark; the row lock itself is released when the
    # transaction ends, not by this save.
    user.last_update = sync_started
    user.save()

    elapsed = time.time() - ts
    elapsed_jobs = jte - jts
    elapsed_tasks = tte - tts
    elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
    logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
                elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
84
85
@transaction.atomic
def update_job(job, proxy):
    """Refresh a single job and all of its tasks from QCG.

    The job's remote counterpart (``job.qcg_job``) is refreshed and every
    field produced by ``job.qcg_map`` that differs is written back to the
    model.  The same is done for each task of the job, after which the
    task's allocations (with nodes) are deleted and recreated from the
    refreshed remote data.  A timing summary is logged at the end.

    :param job: the local ``Job`` instance to refresh.
    :param proxy: value handed to ``Credential`` to authenticate the remote
                  calls.
    """
    ts = time.time()
    QCG.start()

    credential = Credential(proxy)
    qcg_job = job.qcg_job
    qcg_job.credential = credential

    jts = time.time()
    qcg_job.refresh_information()
    elapsed_job = time.time() - jts

    # `.items()` instead of the Python-2-only `.iteritems()`
    for name, val in job.qcg_map(qcg_job, job.owner).items():
        if getattr(job, name) != val:
            setattr(job, name, val)

    job.save()

    elapsed_tasks = 0
    # Counted while iterating so the log line needs no extra COUNT query.
    task_count = 0
    for task in job.tasks.all():
        task_count += 1

        qcg_task = task.qcg_task
        qcg_task.credential = credential

        tts = time.time()
        qcg_task.refresh_information()
        elapsed_tasks += time.time() - tts

        for name, val in task.qcg_map(qcg_task, task.job).items():
            if getattr(task, name) != val:
                setattr(task, name, val)

        task.save()
        # Allocations are rebuilt from scratch rather than diffed.
        task.allocations.all().delete()

        for qcg_alloc in qcg_task.allocations:
            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))

            for qcg_node in qcg_alloc.nodes:
                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))

    elapsed = time.time() - ts
    elapsed_py = elapsed - elapsed_job - elapsed_tasks
    logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
                elapsed, job.job_id, elapsed_job, task_count, elapsed_tasks, elapsed_py)
131
132
def submit(obj, proxy):
    """Submit ``obj`` through QCG and return whatever ``obj.submit`` returns.

    :param obj: object exposing ``submit(credential)``.
    :param proxy: value handed to ``Credential`` for authentication.
    :return: the result of ``obj.submit``; its ``job_id`` is written to the
             log together with remote/local timings.
    """
    started = time.time()
    QCG.start()
    credential = Credential(proxy)

    # time only the remote call
    remote_started = time.time()
    submitted = obj.submit(credential)
    remote_time = time.time() - remote_started

    total_time = time.time() - started
    local_time = total_time - remote_time
    logger.info('(%.3f) JOB = %s, REMOTE = %.3f, LOCAL = %.3f', total_time, submitted.job_id, remote_time, local_time)

    return submitted
147
148
def cancel(obj, proxy):
    """Cancel the remote counterpart of a job or task.

    :param obj: a ``Job`` (its ``qcg_job`` is used) or any other object
                exposing ``qcg_task``.
    :param proxy: value handed to ``Credential`` for authentication.
    """
    started = time.time()
    QCG.start()

    # pick the remote handle matching the kind of local object
    if isinstance(obj, Job):
        remote = obj.qcg_job
    else:
        remote = obj.qcg_task
    remote.credential = Credential(proxy)

    # time only the remote call
    remote_started = time.time()
    remote.cancel()
    remote_time = time.time() - remote_started

    total_time = time.time() - started
    local_time = total_time - remote_time
    logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', total_time, obj, remote_time, local_time)
163
164
def clean(obj, proxy):
    """Invoke ``clean()`` on the remote counterpart of a job or task.

    :param obj: a ``Job`` (its ``qcg_job`` is used) or any other object
                exposing ``qcg_task``.
    :param proxy: value handed to ``Credential`` for authentication.
    """
    started = time.time()
    QCG.start()

    # pick the remote handle matching the kind of local object
    if isinstance(obj, Job):
        remote = obj.qcg_job
    else:
        remote = obj.qcg_task
    remote.credential = Credential(proxy)

    # time only the remote call
    remote_started = time.time()
    remote.clean()
    remote_time = time.time() - remote_started

    total_time = time.time() - started
    local_time = total_time - remote_time
    logger.info('(%.3f) OBJ = %s, REMOTE = %.3f, LOCAL = %.3f', total_time, obj, remote_time, local_time)
179
180
def fetch_resources(proxy):
    """Query QCG for resources and summarise them.

    :param proxy: value handed to ``Credential`` for authentication.
    :return: a 4-tuple ``(hosts, storage, applications, modules)`` where
             ``hosts`` and ``storage`` are lists with one entry per
             resource (its ``name`` / ``storage``), ``applications`` is the
             set of all applications over all resources, and ``modules`` is
             the set of all modules whose name starts with ``'plgrid'``.
    """
    started = time.time()
    QCG.start()
    credential = Credential(proxy)

    # materialise the result so the query time is measured in full and the
    # resources can be walked several times below
    query_started = time.time()
    resources = list(JobFactory().resources(False, credential))
    query_time = time.time() - query_started

    processing_started = time.time()

    hosts = []
    for resource in resources:
        hosts.append(resource.name)

    storage = []
    for resource in resources:
        storage.append(resource.storage)

    applications = set()
    for resource in resources:
        applications.update(resource.applications)

    modules = set()
    for resource in resources:
        for module in resource.modules:
            if module.startswith('plgrid'):
                modules.add(module)

    processing_time = time.time() - processing_started

    total_time = time.time() - started
    logger.info('(%.3f) HOSTS = %d, APPS = %d, MODULES = %d, QUERY = %.3f, PROC = %.3f',
                total_time, len(hosts), len(applications), len(modules), query_time, processing_time)

    return hosts, storage, applications, modules