f950aacc9a9c4057e71fe296d7525692166e6f96
[qcg-portal.git] / qcg / service.py
1 import logging
2 import time
3
4 from django.db import transaction
5 from django.utils.functional import SimpleLazyObject
6 from django.utils.timezone import now
7 from pyqcg import QCG
8 from pyqcg.description import JobDescription
9 from pyqcg.service import Registry
10 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
11
12 from qcg.models import User, Job, Task, Allocation, NodeInfo
13
14
# Module-level logger named after this module; used for the sync timing reports below.
logger = logging.getLogger(name=__name__)
16
17
@transaction.atomic
def update_user_data(user, proxy):
    """Synchronise a user's QCG jobs and tasks into the local database.

    Only jobs and tasks that the QCG registry reports as changed after
    ``user.last_update`` are fetched. Jobs are upserted by ``job_id``. Tasks
    are upserted by ``(job.job_id, task_id)``, and their allocations (with
    node info) are deleted and recreated from the registry data.

    :param user: local ``User`` to synchronise; it is re-read under a row lock.
    :param proxy: proxy credential, wrapped in pyqcg's ``Credential``.
    """
    ts = time.time()
    QCG.start()

    registry = Registry(Credential(proxy))

    # Re-read the user with SELECT ... FOR UPDATE so that concurrent syncs of
    # the same user are serialised for the duration of this transaction
    # (on database backends that support row locks).
    user = User.objects.select_for_update().get(pk=user.pk)
    last_update = user.last_update

    # Take the new watermark *before* querying the registry. Anything that
    # changes while the queries below are running is then picked up again on
    # the next sync, instead of falling between the old and new watermark.
    # Re-fetched items are harmless: jobs and tasks are upserted below.
    # NOTE(review): assumes the portal and QCG clocks are reasonably in sync.
    sync_started = now()

    changed_filter = {'changed': TimePeriod(after=last_update)}

    ###################################
    # Jobs
    ###################################
    jts = time.time()
    # Ask for every status: all non-dunder attributes of JobStatus.
    jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
                         **changed_filter)
    jte = time.time()

    jobs_count = 0
    for qcg_job in jobs:
        params = Job.qcg_map(qcg_job, user)
        job_id = params.pop('job_id')

        Job.objects.update_or_create(job_id=job_id, defaults=params)
        jobs_count += 1

    ###################################
    # Tasks
    ###################################
    tts = time.time()
    tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
                           **changed_filter)
    tte = time.time()

    # Map job_id -> Job for this user's jobs. It is built lazily on the first
    # task, i.e. after the job upserts above, so newly created jobs are
    # included. A task whose job is not stored for this user raises KeyError.
    jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
    task_count = 0
    for qcg_task in tasks:
        params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
        task_id = params.pop('task_id')

        # NOTE(review): the ``job__job_id`` lookup is not used when creating;
        # presumably ``params`` carries the ``job`` FK from Task.qcg_map.
        task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)

        # Allocations are rebuilt from scratch instead of being diffed.
        if not created:
            task.allocations.all().delete()

        for qcg_alloc in qcg_task.allocations:
            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))

            for qcg_node in qcg_alloc.nodes:
                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))

        task_count += 1

    # Store the watermark taken before the queries. The row lock itself is
    # released when the surrounding atomic transaction commits.
    user.last_update = sync_started
    user.save()

    elapsed = time.time() - ts
    elapsed_jobs = jte - jts
    elapsed_tasks = tte - tts
    # Time spent outside the two registry queries (local processing and DB writes).
    elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
    logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
                elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
84
85
@transaction.atomic
def update_job(job, proxy):
    """Refresh one job and all of its locally stored tasks from QCG.

    Returns immediately if the job's status display value is FINISHED,
    FAILED or CANCELED. Otherwise the job and each of its tasks are
    refreshed from QCG and saved. Each task's allocations (with node info)
    are deleted and recreated from the refreshed data.

    :param job: local ``Job`` instance to refresh.
    :param proxy: proxy credential, wrapped in pyqcg's ``Credential``.
    """
    # NOTE(review): this compares the choice *display* value with JobStatus
    # constants; confirm the display labels match those constants.
    if job.get_status_display() in [JobStatus.FINISHED, JobStatus.FAILED, JobStatus.CANCELED]:
        return

    ts = time.time()
    QCG.start()

    credential = Credential(proxy)
    qcg_job = job.qcg_job
    qcg_job.credential = credential

    jts = time.time()
    qcg_job.refresh_information()
    elapsed_job = time.time() - jts

    # Copy only the fields whose values differ from the refreshed QCG data.
    for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
        if getattr(job, name) != val:
            setattr(job, name, val)

    job.save()

    elapsed_tasks = 0
    # Count while iterating instead of issuing a separate COUNT query for the log.
    tasks_count = 0
    for task in job.tasks.all():
        qcg_task = task.qcg_task
        qcg_task.credential = credential

        tts = time.time()
        qcg_task.refresh_information()
        elapsed_tasks += time.time() - tts

        for name, val in task.qcg_map(qcg_task, task.job).iteritems():
            if getattr(task, name) != val:
                setattr(task, name, val)

        task.save()
        # Allocations are rebuilt from scratch instead of being diffed.
        task.allocations.all().delete()

        for qcg_alloc in qcg_task.allocations:
            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))

            for qcg_node in qcg_alloc.nodes:
                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))

        tasks_count += 1

    elapsed = time.time() - ts
    # Time spent outside the QCG refresh calls (local processing and DB writes).
    elapsed_py = elapsed - elapsed_job - elapsed_tasks
    logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
                elapsed, job.job_id, elapsed_job, tasks_count, elapsed_tasks, elapsed_py)
134
135
def submit_job(params, proxy):
    """Build a QCG job description from ``params`` and submit it.

    Each supported key whose value is truthy is applied to the description.
    Keys that are missing or falsy are left unset.

    :param params: mapping of job settings (e.g. a form's cleaned data).
        If ``watch_output`` is truthy, ``watch_output_pattern`` must also be
        present.
    :param proxy: proxy credential, wrapped in pyqcg's ``Credential``.
    :returns: the ``job_id`` of the submitted job.
    """
    QCG.start()
    desc = JobDescription(Credential(proxy))

    # Settings copied verbatim onto JobDescription attributes of the same name.
    direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
                  'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
                  'preprocess', 'postprocess', 'persistent')

    for name in direct_map:
        value = params.get(name)
        if value:
            setattr(desc, name, value)

    # Settings that go through dedicated setters on the description.
    application = params.get('application')
    if application:
        desc.set_application(*application)
    nodes = params.get('nodes')
    if nodes:
        desc.set_nodes(*nodes)
    reservation = params.get('reservation')
    if reservation:
        desc.set_reservation(reservation)
    watch_output = params.get('watch_output')
    if watch_output:
        # The pattern belongs to watch_output, so it is required alongside it.
        desc.set_watch_output(watch_output, params['watch_output_pattern'])
    # TODO: script
    # TODO: monitoring

    job = desc.submit()

    return job.job_id
165
166     return job.job_id