5 from django.db import transaction
6 from django.utils.functional import SimpleLazyObject
7 from django.utils.timezone import now
9 from pyqcg.description import JobDescription
10 from pyqcg.service import Registry
11 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
13 from filex.ftp import FTPOperation
14 from qcg.constants import QCG_DATA_URL
15 from qcg.models import User, Job, Task, Allocation, NodeInfo
16 from qcg.utils import random_id, chunks
19 logger = logging.getLogger(__name__)
23 def update_user_data(user, proxy):
27 registry = Registry(Credential(proxy))
29 # put lock on user record (hopefully..?)
30 user = User.objects.select_for_update().get(pk=user.pk)
31 last_update = user.last_update
33 changed_filter = {'changed': TimePeriod(after=last_update)}
35 ###################################
37 ###################################
39 jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
45 params = Job.qcg_map(qcg_job, user)
46 job_id = params.pop('job_id')
48 Job.objects.update_or_create(job_id=job_id, defaults=params)
51 ###################################
53 ###################################
55 tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
59 jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
61 for qcg_task in tasks:
62 params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
63 task_id = params.pop('task_id')
65 task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
68 task.allocations.all().delete()
70 for qcg_alloc in qcg_task.allocations:
71 alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
73 for qcg_node in qcg_alloc.nodes:
74 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
79 user.last_update = now()
82 elapsed = time.time() - ts
83 elapsed_jobs = jte - jts
84 elapsed_tasks = tte - tts
85 elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
86 logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
87 elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
91 def update_job(job, proxy):
92 if job.terminated and job.purged:
98 credential = Credential(proxy)
100 qcg_job.credential = credential
103 qcg_job.refresh_information()
104 elapsed_job = time.time() - jts
106 for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
107 if getattr(job, name) != val:
108 setattr(job, name, val)
113 for task in job.tasks.all():
114 qcg_task = task.qcg_task
115 qcg_task.credential = credential
118 qcg_task.refresh_information()
119 elapsed_tasks += time.time() - tts
121 for name, val in task.qcg_map(qcg_task, task.job).iteritems():
122 if getattr(task, name) != val:
123 setattr(task, name, val)
126 task.allocations.all().delete()
128 for qcg_alloc in qcg_task.allocations:
129 alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
131 for qcg_node in qcg_alloc.nodes:
132 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
134 elapsed = time.time() - ts
135 elapsed_py = elapsed - elapsed_job - elapsed_tasks
136 logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
137 elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
140 def submit_job(params, proxy):
142 desc = JobDescription(Credential(proxy))
144 direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
145 'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
146 'preprocess', 'postprocess', 'persistent')
148 for name in direct_map:
150 setattr(desc, name, params[name])
152 if params['application']:
153 desc.set_application(*params['application'])
154 desc.stage_in += [params['master_file']]
155 desc.arguments.insert(0, os.path.basename(params['master_file']))
157 ftp = FTPOperation(proxy)
159 ftp.mkdir(QCG_DATA_URL, parents=True)
160 url = os.path.join(QCG_DATA_URL, 'script.{}.sh'.format(random_id()))
163 for chunk in chunks(params['script'], 4096):
164 ftp.stream.put(chunk)
168 desc.executable = url
170 desc.set_nodes(*params['nodes'])
171 if params['reservation']:
172 desc.set_reservation(params['reservation'])
173 if params['watch_output']:
174 desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
182 def cancel(obj, proxy):
186 qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
187 qcg_obj.credential = Credential(proxy)
191 elapsed_cancel = time.time() - jts
193 elapsed = time.time() - ts
194 elapsed_py = elapsed - elapsed_cancel
195 logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_cancel, elapsed_py)
198 def clean(obj, proxy):
202 qcg_obj = obj.qcg_job if isinstance(obj, Job) else obj.qcg_task
203 qcg_obj.credential = Credential(proxy)
207 elapsed_clean = time.time() - jts
209 elapsed = time.time() - ts
210 elapsed_py = elapsed - elapsed_clean
211 logger.info('(%.3f) OBJ = %s (%.3f), TIME = %.3f', elapsed, obj, elapsed_clean, elapsed_py)