81b4ab44ad6eb25c5adb31b38ef9cbcf5e8c7354
[qcg-portal.git] / qcg / service.py
1 import logging
2 import time
3
4 from django.db import transaction
5 from django.utils.functional import SimpleLazyObject
6 from django.utils.timezone import now
7 from pyqcg import QCG
8 from pyqcg.description import JobDescription
9 from pyqcg.service import Registry
10 from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
11
12
13 logger = logging.getLogger(__name__)
14
15
16 @transaction.atomic
17 def update_user_data(user, proxy):
18     ts = time.time()
19     QCG.start()
20     from qcg.models import User, Job, Task, Allocation, NodeInfo
21
22     credential = Credential(proxy)
23     registry = Registry(credential)
24
25     # put lock on user record (hopefully..?)
26     user = User.objects.select_for_update().get(pk=user.pk)
27
28     changed_filter = {'changed': TimePeriod(after=user.last_update)}
29
30     ###################################
31     # Jobs
32     ###################################
33     jts = time.time()
34     jobs = registry.jobs(stats=[v for k, v in vars(JobStatus).iteritems() if not k.startswith('__')],
35                          **changed_filter)
36     jte = time.time()
37
38     jobs_count = 0
39     for qcg_job in jobs:
40         params = Job.qcg_map(qcg_job, user)
41         job_id = params.pop('job_id')
42
43         Job.objects.update_or_create(job_id=job_id, defaults=params)
44         jobs_count += 1
45
46     ###################################
47     # Tasks
48     ###################################
49     tts = time.time()
50     tasks = registry.tasks(stats=[v for k, v in vars(TaskStatus).iteritems() if not k.startswith('__')],
51                            **changed_filter)
52     tte = time.time()
53
54     jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
55     task_count = 0
56     for qcg_task in tasks:
57         params = Task.qcg_map(qcg_task, jobs_cache)
58         task_id = params.pop('task_id')
59
60         task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
61
62         if not created:
63             task.allocations.all().delete()
64
65         for qcg_alloc in qcg_task.allocations:
66             alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
67
68             for qcg_node in qcg_alloc.nodes:
69                 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
70
71         task_count += 1
72
73     # release user lock
74     user.last_update = now()
75     user.save()
76
77     elapsed = time.time() - ts
78     elapsed_jobs = jte - jts
79     elapsed_tasks = tte - tts
80     elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
81     logger.info('(%.3f) USER = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
82                 elapsed, user, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
83
84
85 def submit_job(params, proxy):
86     QCG.start()
87     desc = JobDescription(Credential(proxy))
88
89     direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
90                   'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify',
91                   'preprocess', 'postprocess', 'persistent')
92
93     for name in direct_map:
94         if params[name]:
95             setattr(desc, name, params[name])
96
97     if params['application']:
98         desc.set_application(*params['application'])
99     if params['nodes']:
100         desc.set_nodes(*params['nodes'])
101     if params['reservation']:
102         desc.set_reservation(params['reservation'])
103     if params['watch_output']:
104         desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
105     # TODO script
106     # TODO monitoring
107
108     # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'):
109     #     print prop, type(getattr(desc, prop)), repr(getattr(desc, prop))
110
111     # print desc.xml_description
112
113     job = desc.submit()
114
115     return job.job_id