submitting jobs: handling user-defined script
[qcg-portal.git] / qcg / service.py
1 import logging
2 import os
3 import time
4
5 from django.db import transaction
6 from django.utils.functional import SimpleLazyObject
7 from django.utils.timezone import now
8 from pyqcg import QCG
9 from pyqcg.description import JobDescription
10 from pyqcg.service import Registry
11 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
12
13 from filex.ftp import FTPOperation
14 from qcg.constants import QCG_DATA_URL
15 from qcg.models import User, Job, Task, Allocation, NodeInfo
16 from qcg.utils import random_id, chunks
17
18
# Module logger; the service functions below report their timings through it.
logger = logging.getLogger(__name__)
20
21
@transaction.atomic
def update_user_data(user, proxy):
    """Synchronise a user's jobs and tasks from the QCG registry into the database.

    Only jobs and tasks that the registry reports as changed after
    ``user.last_update`` are fetched. Jobs are upserted by ``job_id``; tasks are
    upserted by ``(job_id, task_id)`` and their allocations (with nodes) are
    replaced by the ones just fetched. Finally ``user.last_update`` is advanced
    so that the next call only fetches newer changes.

    Runs inside a single transaction and locks the user row with
    ``select_for_update``, so concurrent calls for the same user are serialised
    until this transaction ends.

    :param user: ``User`` whose data is refreshed (only its ``pk`` is used; the
        row is re-read under the lock).
    :param proxy: the user's proxy credential, wrapped in ``Credential``.
    """
    ts = time.time()
    QCG.start()

    registry = Registry(Credential(proxy))

    # Lock the user row until the transaction ends, so two updates for the
    # same user cannot process the same time window concurrently.
    user = User.objects.select_for_update().get(pk=user.pk)
    last_update = user.last_update

    # Take the new sync point *before* querying the registry. Taking it after
    # the queries (a plain now() at the end) leaves a gap: anything changed
    # between a query and the end of this function would be older than the
    # next call's ``after=`` filter and would never be fetched. Fetching an
    # item twice is harmless because all writes below are upserts/replacements.
    update_time = now()

    changed_filter = {'changed': TimePeriod(after=last_update)}

    ###################################
    # Jobs
    ###################################
    jts = time.time()
    jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
                         **changed_filter)
    jte = time.time()

    jobs_count = 0
    for qcg_job in jobs:
        params = Job.qcg_map(qcg_job, user)
        job_id = params.pop('job_id')

        Job.objects.update_or_create(job_id=job_id, defaults=params)
        jobs_count += 1

    ###################################
    # Tasks
    ###################################
    tts = time.time()
    tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
                           **changed_filter)
    tte = time.time()

    # Built lazily, i.e. after the jobs above were saved and only if at least
    # one task is returned. NOTE(review): raises KeyError if the registry
    # reports a task whose job is not stored for this user.
    jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
    task_count = 0
    for qcg_task in tasks:
        params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
        task_id = params.pop('task_id')

        task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)

        # Allocations are not diffed: existing ones are dropped and recreated.
        if not created:
            task.allocations.all().delete()

        for qcg_alloc in qcg_task.allocations:
            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))

            for qcg_node in qcg_alloc.nodes:
                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))

        task_count += 1

    # Record the sync point. The row lock taken above is released when the
    # surrounding transaction commits, not by this save.
    user.last_update = update_time
    user.save()

    elapsed = time.time() - ts
    elapsed_jobs = jte - jts
    elapsed_tasks = tte - tts
    elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
    logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
                elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
88
89
@transaction.atomic
def update_job(job, proxy):
    """Refresh one job and all of its tasks from QCG and store the result.

    Jobs that are both terminated and purged are left untouched. Otherwise the
    job and each of its tasks are re-fetched using the given proxy credential.
    Attributes whose value differs from the fetched one are copied onto the
    model instances, which are then saved. Every task's allocations (with their
    nodes) are replaced by the freshly fetched ones.

    :param job: ``Job`` model instance to refresh.
    :param proxy: the user's proxy credential, wrapped in ``Credential``.
    """
    if job.terminated and job.purged:
        return

    started = time.time()
    QCG.start()

    def copy_changed(instance, values):
        # Assign only the attributes whose current value differs.
        for attr, value in values.iteritems():
            if getattr(instance, attr) != value:
                setattr(instance, attr, value)

    cred = Credential(proxy)

    remote_job = job.qcg_job
    remote_job.credential = cred

    fetch_start = time.time()
    remote_job.refresh_information()
    job_time = time.time() - fetch_start

    copy_changed(job, job.qcg_map(remote_job, job.owner))
    job.save()

    tasks_time = 0
    for task in job.tasks.all():
        remote_task = task.qcg_task
        remote_task.credential = cred

        fetch_start = time.time()
        remote_task.refresh_information()
        tasks_time += time.time() - fetch_start

        copy_changed(task, task.qcg_map(remote_task, task.job))
        task.save()

        # Allocations are not diffed: drop the stored ones and rebuild them.
        task.allocations.all().delete()
        for remote_alloc in remote_task.allocations:
            alloc = task.allocations.create(**Allocation.qcg_map(remote_alloc))
            for remote_node in remote_alloc.nodes:
                alloc.nodes.create(**NodeInfo.qcg_map(remote_node))

    total = time.time() - started
    local_time = total - job_time - tasks_time
    logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
                total, job.job_id, job_time, job.tasks.count(), tasks_time, local_time)
138
139
def _upload_script(script, proxy):
    """Upload *script* to a new, uniquely named file under ``QCG_DATA_URL``.

    :param script: script contents, sent in 4096-character chunks.
    :param proxy: the user's proxy credential used for the transfer.
    :returns: URL of the uploaded file.
    """
    ftp = FTPOperation(proxy)

    ftp.mkdir(QCG_DATA_URL, parents=True)
    url = os.path.join(QCG_DATA_URL, 'script.{}.sh'.format(random_id()))
    ftp.put(url)

    try:
        for chunk in chunks(script, 4096):
            ftp.stream.put(chunk)
    finally:
        # ``None`` is put after the last chunk, apparently as the end-of-stream
        # marker. Send it even if producing or sending a chunk fails, so the
        # transfer is not left waiting for more data.
        # NOTE(review): confirm the sentinel semantics against filex.ftp.
        ftp.stream.put(None)

    ftp.wait()
    return url


def submit_job(params, proxy):
    """Build a QCG job description from *params* and submit it.

    :param params: mapping of job settings; every key read below must be
        present (presumably the submit form's cleaned data — TODO confirm).
        Falsy values are treated as "not set".
    :param proxy: the user's proxy credential, wrapped in ``Credential``.
    :returns: ``job_id`` of the submitted job.
    """
    QCG.start()
    desc = JobDescription(Credential(proxy))

    # Settings copied verbatim onto the description when set.
    direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
                  'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
                  'preprocess', 'postprocess', 'persistent')

    for name in direct_map:
        if params[name]:
            setattr(desc, name, params[name])

    if params['application']:
        desc.set_application(*params['application'])
        # The master file is staged in and passed (by base name) as the first argument.
        desc.stage_in += [params['master_file']]
        desc.arguments.insert(0, os.path.basename(params['master_file']))
    if params['script']:
        # A user-defined script is uploaded first and then run as the executable,
        # overriding any 'executable' set above.
        desc.executable = _upload_script(params['script'], proxy)
    if params['nodes']:
        desc.set_nodes(*params['nodes'])
    if params['reservation']:
        desc.set_reservation(params['reservation'])
    if params['watch_output']:
        desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
    # TODO monitoring

    job = desc.submit()

    return job.job_id
180
181
def cancel(obj, proxy):
    """Cancel a QCG job or task and log how long it took.

    :param obj: ``Job`` model instance; any other object is treated as a task
        and its ``qcg_task`` is used.
    :param proxy: the user's proxy credential, wrapped in ``Credential``.
    """
    started = time.time()
    QCG.start()

    if isinstance(obj, Job):
        remote = obj.qcg_job
    else:
        remote = obj.qcg_task
    remote.credential = Credential(proxy)

    call_start = time.time()
    remote.cancel()
    call_time = time.time() - call_start

    total = time.time() - started
    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', total, obj, call_time, total - call_time)
196
197
def clean(obj, proxy):
    """Clean a QCG job or task and log how long it took.

    :param obj: ``Job`` model instance; any other object is treated as a task
        and its ``qcg_task`` is used.
    :param proxy: the user's proxy credential, wrapped in ``Credential``.
    """
    started = time.time()
    QCG.start()

    if isinstance(obj, Job):
        target = obj.qcg_job
    else:
        target = obj.qcg_task
    target.credential = Credential(proxy)

    call_start = time.time()
    target.clean()
    call_time = time.time() - call_start

    total = time.time() - started
    overhead = total - call_time
    logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', total, obj, call_time, overhead)