4 from django.db import transaction
5 from django.utils.timezone import now
7 from pyqcg.description import JobDescription
8 from pyqcg.service import Registry
9 from pyqcg.utils import Credential, TimePeriod
# Module-level logger, named after this module via __name__.
12 logger = logging.getLogger(__name__)
# Synchronise one user's job and task records with the QCG registry.
#
# Using a credential built from `proxy`, asks the registry for jobs and tasks
# changed after `user.last_update`, upserts them into the Job / Task models,
# re-creates allocation and node rows from the registry data, sets
# `user.last_update` to the current time and logs timing figures.
#
# NOTE(review): this excerpt does not show every line of the function (the
# embedded line numbers skip). Names such as `qcg_job`, `ts`, `jts`, `jte`,
# `tts`, `tte`, `jobs_count` and `task_count` are used below but are bound on
# lines that are not visible here; no `user.save()` or return is visible either.
16 def update_user_data(user, proxy):
# Models are imported inside the function rather than at module level
# (presumably to avoid a circular import with qcg.models -- TODO confirm).
19 from qcg.models import User, Job, Task, Allocation, NodeInfo
# Registry client authenticated with a credential made from the proxy.
21 credential = Credential(proxy)
22 registry = Registry(credential)
24 # put lock on user record (hopefully..?)
# Re-reads the user with SELECT ... FOR UPDATE and rebinds the local `user`,
# so later attribute changes apply to this fresh instance, not the caller's.
# NOTE(review): select_for_update() only works inside a transaction; no
# atomic block or decorator is visible in this excerpt -- confirm one exists.
25 user = User.objects.select_for_update().get(pk=user.pk)
# Same "changed after last update" filter is passed as keyword arguments to
# both registry.jobs() and registry.tasks() below.
27 changed_filter = {'changed': TimePeriod(after=user.last_update)}
29 ###################################
31 ###################################
33 jobs = registry.jobs(**changed_filter)
# `qcg_job` is presumably the loop variable of an iteration over `jobs` on a
# line not shown here -- TODO confirm.
# Job.qcg_map() yields the field values; 'job_id' is popped out to serve as
# the lookup key and the remainder is used as the upsert defaults.
38 params = Job.qcg_map(qcg_job, user)
39 job_id = params.pop('job_id')
41 Job.objects.update_or_create(job_id=job_id, defaults=params)
44 ###################################
46 ###################################
48 tasks = registry.tasks(**changed_filter)
# Lookup table job_id -> Job for this user's jobs, handed to Task.qcg_map().
# In the visible statement order it is built after the job upserts above.
51 jobs_cache = {j.job_id: j for j in Job.objects.filter(owner=user)}
53 for qcg_task in tasks:
54 params = Task.qcg_map(qcg_task, jobs_cache)
55 task_id = params.pop('task_id')
# A task is identified by its parent job's job_id plus its own task_id.
# NOTE(review): when creating, Django does not use lookups containing '__',
# so the `job` foreign key has to come from `params` -- presumably
# Task.qcg_map() resolves it from jobs_cache; confirm.
57 task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
# Existing allocations are removed and then re-created from the registry
# data. NOTE(review): a line before this delete is not shown; it may make
# the delete conditional on `created` -- confirm.
60 task.allocations.all().delete()
62 for qcg_alloc in qcg_task.allocations:
63 alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
# Each allocation gets one NodeInfo row per node reported by the registry.
65 for qcg_node in qcg_alloc.nodes:
66 alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
# NOTE(review): the timestamp is taken after the registry queries, so a
# change made between those queries and this point could fall before the next
# run's filter and be missed -- consider capturing the time before querying.
71 user.last_update = now()
# Timing breakdown: total time, time attributed to the jobs and tasks phases,
# and the remainder (total minus both phases).
74 elapsed = time.time() - ts
75 elapsed_jobs = jte - jts
76 elapsed_tasks = tte - tts
77 elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
78 logger.info('(%.3f) USER = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
79 elapsed, user, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
# Build a QCG JobDescription from `params`, using a credential made from
# `proxy`.
#
# NOTE(review): this excerpt does not show every line of the function (the
# embedded line numbers skip and the `direct_map` tuple is cut off). In the
# visible lines the actual submission (`desc.submit()`) is commented out and
# no return statement is shown.
82 def submit_job(params, proxy):
86 desc = JobDescription(Credential(proxy))
# Names listed in direct_map are copied one-to-one onto the description:
# desc.<name> = params[name]. The tuple continues on a line not shown here.
88 direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs',
89 'wall_time', 'memory', 'memory_per_slot', 'modules', 'native', 'notify', 'preprocess', 'postprocess',
92 for name in direct_map:
# NOTE(review): a line between the loop header and this setattr is not shown;
# it may guard the assignment -- confirm.
94 setattr(desc, name, params[name])
# The remaining fields go through dedicated setters instead of setattr.
# `application` and `nodes` values are unpacked into positional arguments.
96 if params['application']:
97 desc.set_application(*params['application'])
99 desc.set_nodes(*params['nodes'])
100 if params['reservation']:
101 desc.set_reservation(params['reservation'])
# watch_output is passed together with its pattern.
102 if params['watch_output']:
103 desc.set_watch_output(params['watch_output'], params['watch_output_pattern'])
# Leftover debugging aids and the disabled submit call.
111 # for prop in direct_map + ('application', 'nodes', 'env_variables', 'reservation', 'watch_output'):
112 # print prop, type(getattr(desc, prop)), repr(getattr(desc, prop))
114 # print desc.xml_description
116 # job = desc.submit()