enhancements in selecting file from gridftp & moved submitting jobs to separate file
[qcg-portal.git] / qcg / service.py
1 import logging
2 import time
3
4 from django.db import transaction
5 from django.utils.timezone import now
6 from pyqcg import QCG
7 from pyqcg.description import JobDescription
8 from pyqcg.service import Registry
9 from pyqcg.utils import Credential, TimePeriod
10
11
12 logger = logging.getLogger(__name__)
13
14
@transaction.atomic
def update_user_data(user, proxy):
    """Synchronise the user's jobs and tasks from the QCG registry into the database.

    Only records reported by the registry as changed after ``user.last_update``
    are fetched. Each job and task is written with ``update_or_create``; the
    allocations (and their nodes) of an already existing task are deleted and
    recreated from the registry data. Everything runs in one transaction.

    :param user: user whose data is refreshed; re-read by ``pk`` under a row lock
    :param proxy: value passed to ``Credential`` to authenticate against the registry
    """
    ts = time.time()
    QCG.start()
    # Imported locally rather than at module level (presumably to avoid a
    # circular import between qcg.models and this module -- TODO confirm).
    from qcg.models import User, Job, Task, Allocation, NodeInfo

    credential = Credential(proxy)
    registry = Registry(credential)

    # Lock the user's row so that concurrent updates for the same user are
    # serialised; the lock is held until the surrounding transaction ends.
    user = User.objects.select_for_update().get(pk=user.pk)

    # Remember when this synchronisation started *before* asking the registry.
    # Storing a timestamp taken after the queries would skip anything that
    # changed while we were processing the results; storing the start time
    # only makes the next run re-fetch a small overlap, which is harmless
    # because all writes below are update_or_create.
    sync_started = now()

    changed_filter = {'changed': TimePeriod(after=user.last_update)}

    ###################################
    # Jobs
    ###################################
    jts = time.time()
    jobs = registry.jobs(**changed_filter)
    jte = time.time()

    jobs_count = 0
    for qcg_job in jobs:
        params = Job.qcg_map(qcg_job, user)
        job_id = params.pop('job_id')

        Job.objects.update_or_create(job_id=job_id, defaults=params)
        jobs_count += 1

    ###################################
    # Tasks
    ###################################
    tts = time.time()
    tasks = registry.tasks(**changed_filter)
    tte = time.time()

    # Built after the jobs loop so it also contains the jobs stored above.
    jobs_cache = {j.job_id: j for j in Job.objects.filter(owner=user)}
    task_count = 0
    for qcg_task in tasks:
        params = Task.qcg_map(qcg_task, jobs_cache)
        task_id = params.pop('task_id')

        task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)

        # An existing task gets its allocations rebuilt from scratch.
        if not created:
            task.allocations.all().delete()

        for qcg_alloc in qcg_task.allocations:
            alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))

            for qcg_node in qcg_alloc.nodes:
                alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))

        task_count += 1

    # Record the point in time up to which the user's data is known to be
    # complete (see sync_started above).
    user.last_update = sync_started
    user.save()

    elapsed = time.time() - ts
    elapsed_jobs = jte - jts
    elapsed_tasks = tte - tts
    # Time spent outside the two registry calls (mapping and database work).
    elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
    logger.info('(%.3f) USER = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
                elapsed, user, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
80
81
def submit_job(params, proxy):
    """Build a QCG job description from ``params``.

    Every key read below must be present in ``params`` (a missing key raises
    ``KeyError``); values that are falsy are left unset on the description.

    NOTE: the actual submission is still disabled (see the end of the
    function), so nothing is submitted and the function returns ``None``.

    :param params: mapping with the job parameters
    :param proxy: value passed to ``Credential`` for the job description
    """
    QCG.start()
    description = JobDescription(Credential(proxy))

    # Parameters copied as-is onto the attribute of the same name.
    plain_attributes = (
        'env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
        'wall_time', 'memory', 'memory_per_slot', 'modules', 'native', 'notify', 'preprocess', 'postprocess',
        'persistent',
    )
    for attribute in plain_attributes:
        value = params[attribute]
        if value:
            setattr(description, attribute, value)

    # Parameters that go through dedicated setters.
    application = params['application']
    if application:
        description.set_application(*application)

    nodes = params['nodes']
    if nodes:
        description.set_nodes(*nodes)

    reservation = params['reservation']
    if reservation:
        description.set_reservation(reservation)

    watch_output = params['watch_output']
    if watch_output:
        description.set_watch_output(watch_output, params['watch_output_pattern'])

    # TODO script
    # TODO executable
    # TODO input
    # TODO stage_in
    # TODO stage_out
    # TODO monitoring

    # Submission is intentionally disabled for now:
    # job = desc.submit()
    # return job.job_id