44cb42ee4407307ac3cbc78e30d4c79af018c28d
[qcg-portal.git] / qcg / service.py
1 import logging
2 import os
3 import time
4
5 from django.db import transaction
6 from django.utils.functional import SimpleLazyObject
7 from django.utils.timezone import now
8 from pyqcg import QCG
9 from pyqcg.description import JobDescription
10 from pyqcg.service import Registry
11 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
12
13 from qcg.models import User, Job, Task, Allocation, NodeInfo
14
15
# Module-level logger, named after this module; used for the timing reports
# emitted by the service functions in this file.
logger = logging.getLogger(__name__)
17
18
def _status_values(status_cls):
    """Return the values of every attribute of *status_cls* whose name does not start with ``__``."""
    return [value for name, value in vars(status_cls).items() if not name.startswith('__')]


@transaction.atomic
def update_user_data(user, proxy):
    """Pull the user's changed jobs and tasks from the QCG registry into the local database.

    Only items the registry reports as changed after ``user.last_update`` are
    requested. Each returned job is written with ``update_or_create``; each
    returned task is written the same way and its allocations (with their
    nodes) are rebuilt from the registry data. Finally ``user.last_update`` is
    advanced and a timing summary is logged.

    The whole function runs inside one database transaction.

    :param user: ``User`` to refresh; it is re-read from the database with
        ``select_for_update()`` and that fresh instance is the one saved.
    :param proxy: value passed to ``Credential`` to authenticate registry calls.
    :raises KeyError: if a returned task refers to a job id that is not among
        the jobs owned by the user in the local database.
    """
    ts = time.time()
    QCG.start()

    registry = Registry(Credential(proxy))

    # SELECT ... FOR UPDATE on the user row. The lock is held until the
    # surrounding transaction ends (on database backends that support it).
    user = User.objects.select_for_update().get(pk=user.pk)
    last_update = user.last_update

    # Take the new watermark *before* asking the registry. If it were taken
    # after processing, anything that changed remotely while this function was
    # running would fall before the stored watermark and be skipped by the
    # next "changed after last_update" query. Taking it early can only cause
    # an item to be fetched twice, which update_or_create handles.
    sync_started = now()

    changed_filter = {'changed': TimePeriod(after=last_update)}

    ###################################
    # Jobs
    ###################################
    jts = time.time()
    jobs = registry.jobs(stats=_status_values(JobStatus), **changed_filter)
    jte = time.time()

    jobs_count = 0
    for qcg_job in jobs:
        params = Job.qcg_map(qcg_job, user)
        # job_id is the lookup key; the remaining mapped values are the defaults.
        job_id = params.pop('job_id')

        Job.objects.update_or_create(job_id=job_id, defaults=params)
        jobs_count += 1

    ###################################
    # Tasks
    ###################################
    tts = time.time()
    tasks = registry.tasks(stats=_status_values(TaskStatus), **changed_filter)
    tte = time.time()

    # Built lazily, so the query is skipped when no task was returned. It is
    # evaluated after the job loop above, so jobs created there are included.
    jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
    task_count = 0
    for qcg_task in tasks:
        params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
        task_id = params.pop('task_id')

        task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)

        # Allocations are not diffed: an existing task gets its old ones
        # dropped and the current set recreated from the registry data.
        if not created:
            task.allocations.all().delete()

        for qcg_alloc in qcg_task.allocations:
            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))

            for qcg_node in qcg_alloc.nodes:
                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))

        task_count += 1

    # Store the watermark captured before the registry queries. The row lock
    # itself is released when the transaction commits, not by this save().
    user.last_update = sync_started
    user.save()

    elapsed = time.time() - ts
    elapsed_jobs = jte - jts
    elapsed_tasks = tte - tts
    # Time spent outside the two registry calls.
    elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
    logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
                elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
85
86
@transaction.atomic
def update_job(job, proxy):
    """Refresh a single job and all of its tasks from QCG.

    Does nothing when the job is both terminated and purged. Otherwise the
    job's remote information is refreshed and mapped back onto the model, and
    the same is done for every task of the job; each task's allocations (with
    their nodes) are deleted and recreated from the refreshed data. A timing
    summary is logged at the end.

    The whole function runs inside one database transaction.

    :param job: ``Job`` instance to refresh; it is modified and saved in place.
    :param proxy: value passed to ``Credential`` to authenticate the QCG calls.
    """
    if job.terminated and job.purged:
        return

    ts = time.time()
    QCG.start()

    # One credential object is shared by the job and all of its tasks.
    credential = Credential(proxy)
    qcg_job = job.qcg_job
    qcg_job.credential = credential

    jts = time.time()
    qcg_job.refresh_information()
    elapsed_job = time.time() - jts

    # Assign only the attributes whose mapped value differs from the current one.
    for name, val in job.qcg_map(qcg_job, job.owner).items():
        if getattr(job, name) != val:
            setattr(job, name, val)

    job.save()

    elapsed_tasks = 0
    # Counted while iterating so the log line needs no extra COUNT query.
    task_count = 0
    for task in job.tasks.all():
        qcg_task = task.qcg_task
        qcg_task.credential = credential

        tts = time.time()
        qcg_task.refresh_information()
        elapsed_tasks += time.time() - tts

        for name, val in task.qcg_map(qcg_task, task.job).items():
            if getattr(task, name) != val:
                setattr(task, name, val)

        task.save()
        # Allocations are not diffed: drop the old ones and recreate the
        # current set from the refreshed task.
        task.allocations.all().delete()

        for qcg_alloc in qcg_task.allocations:
            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))

            for qcg_node in qcg_alloc.nodes:
                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))

        task_count += 1

    elapsed = time.time() - ts
    # Time spent outside the remote refresh calls.
    elapsed_py = elapsed - elapsed_job - elapsed_tasks
    logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
                elapsed, job.job_id, elapsed_job, task_count, elapsed_tasks, elapsed_py)
135
136
def submit_job(params, proxy):
    """Build a QCG job description from *params*, submit it and return the job id.

    :param params: mapping that must contain every key read below; entries
        with a falsy value are left unset on the description.
    :param proxy: value passed to ``Credential`` for the job description.
    :returns: ``job_id`` of the object returned by ``JobDescription.submit()``.
    """
    QCG.start()
    description = JobDescription(Credential(proxy))

    # Fields copied onto the description unchanged, under the same name.
    plain_fields = (
        'env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
        'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
        'preprocess', 'postprocess', 'persistent',
    )
    for field in plain_fields:
        if not params[field]:
            continue
        setattr(description, field, params[field])

    # The remaining fields go through dedicated setters.
    if params['application']:
        description.set_application(*params['application'])
        # The master file is staged in and its base name becomes the first argument.
        master_file = params['master_file']
        description.stage_in += [master_file]
        description.arguments.insert(0, os.path.basename(master_file))

    if params['nodes']:
        description.set_nodes(*params['nodes'])

    if params['reservation']:
        description.set_reservation(params['reservation'])

    if params['watch_output']:
        description.set_watch_output(params['watch_output'], params['watch_output_pattern'])

    # TODO script
    # TODO monitoring

    submitted = description.submit()
    return submitted.job_id
170
171
def cancel(obj, proxy):
    """Call ``cancel()`` on the QCG object behind *obj* and log how long it took.

    :param obj: a ``Job`` (its ``qcg_job`` is used) or any other object
        exposing ``qcg_task``.
    :param proxy: value passed to ``Credential`` and attached to the QCG object.
    """
    started = time.time()
    QCG.start()

    if isinstance(obj, Job):
        remote = obj.qcg_job
    else:
        remote = obj.qcg_task
    remote.credential = Credential(proxy)

    call_started = time.time()
    remote.cancel()
    elapsed_cancel = time.time() - call_started

    total = time.time() - started
    # Logged values: total, object, remote call time, time outside the remote call.
    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', total, obj, elapsed_cancel, total - elapsed_cancel)
186
187
def clean(obj, proxy):
    """Call ``clean()`` on the QCG object behind *obj* and log how long it took.

    :param obj: a ``Job`` (its ``qcg_job`` is used) or any other object
        exposing ``qcg_task``.
    :param proxy: value passed to ``Credential`` and attached to the QCG object.
    """
    started = time.time()
    QCG.start()

    if isinstance(obj, Job):
        remote = obj.qcg_job
    else:
        remote = obj.qcg_task
    remote.credential = Credential(proxy)

    call_started = time.time()
    remote.clean()
    elapsed_clean = time.time() - call_started

    total = time.time() - started
    # Logged values: total, object, remote call time, time outside the remote call.
    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', total, obj, elapsed_clean, total - elapsed_clean)