job templates: populating submit form with template's attributes
[qcg-portal.git] / qcg / service.py
1 import logging
2 import time
3
4 from django.db import transaction
5 from django.utils.functional import SimpleLazyObject
6 from django.utils.timezone import now
7 from pyqcg import QCG
8 from pyqcg.service import Registry
9 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
10
11 from qcg.models import User, Job, Task, Allocation, NodeInfo
12
13
14 logger = logging.getLogger(__name__)
15
16
17 @transaction.atomic
18 def update_user_data(user, proxy):
19     ts = time.time()
20     QCG.start()
21
22     registry = Registry(Credential(proxy))
23
24     # put lock on user record (hopefully..?)
25     user = User.objects.select_for_update().get(pk=user.pk)
26     last_update = user.last_update
27
28     changed_filter = {'changed': TimePeriod(after=last_update)}
29
30     ###################################
31     # Jobs
32     ###################################
33     jts = time.time()
34     jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
35                          **changed_filter)
36     jte = time.time()
37
38     jobs_count = 0
39     for qcg_job in jobs:
40         params = Job.qcg_map(qcg_job, user)
41         job_id = params.pop('job_id')
42
43         Job.objects.update_or_create(job_id=job_id, defaults=params)
44         jobs_count += 1
45
46     ###################################
47     # Tasks
48     ###################################
49     tts = time.time()
50     tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
51                            **changed_filter)
52     tte = time.time()
53
54     jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
55     task_count = 0
56     for qcg_task in tasks:
57         params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
58         task_id = params.pop('task_id')
59
60         task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
61
62         if not created:
63             task.allocations.all().delete()
64
65         for qcg_alloc in qcg_task.allocations:
66             alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
67
68             for qcg_node in qcg_alloc.nodes:
69                 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
70
71         task_count += 1
72
73     # release user lock
74     user.last_update = now()
75     user.save()
76
77     elapsed = time.time() - ts
78     elapsed_jobs = jte - jts
79     elapsed_tasks = tte - tts
80     elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
81     logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
82                 elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
83
84
85 @transaction.atomic
86 def update_job(job, proxy):
87     if job.terminated and job.purged:
88         return
89
90     ts = time.time()
91     QCG.start()
92
93     credential = Credential(proxy)
94     qcg_job = job.qcg_job
95     qcg_job.credential = credential
96
97     jts = time.time()
98     qcg_job.refresh_information()
99     elapsed_job = time.time() - jts
100
101     for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
102         if getattr(job, name) != val:
103             setattr(job, name, val)
104
105     job.save()
106
107     elapsed_tasks = 0
108     for task in job.tasks.all():
109         qcg_task = task.qcg_task
110         qcg_task.credential = credential
111
112         tts = time.time()
113         qcg_task.refresh_information()
114         elapsed_tasks += time.time() - tts
115
116         for name, val in task.qcg_map(qcg_task, task.job).iteritems():
117             if getattr(task, name) != val:
118                 setattr(task, name, val)
119
120         task.save()
121         task.allocations.all().delete()
122
123         for qcg_alloc in qcg_task.allocations:
124             alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
125
126             for qcg_node in qcg_alloc.nodes:
127                 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
128
129     elapsed = time.time() - ts
130     elapsed_py = elapsed - elapsed_job - elapsed_tasks
131     logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
132                 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
133
134
135 def cancel(obj, proxy):
136     ts = time.time()
137     QCG.start()
138
139     qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
140     qcg_obj.credential = Credential(proxy)
141
142     jts = time.time()
143     qcg_obj.cancel()
144     elapsed_cancel = time.time() - jts
145
146     elapsed = time.time() - ts
147     elapsed_py = elapsed - elapsed_cancel
148     logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
149
150
151 def clean(obj, proxy):
152     ts = time.time()
153     QCG.start()
154
155     qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
156     qcg_obj.credential = Credential(proxy)
157
158     jts = time.time()
159     qcg_obj.clean()
160     elapsed_clean = time.time() - jts
161
162     elapsed = time.time() - ts
163     elapsed_py = elapsed - elapsed_clean
164     logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_clean, elapsed_py)