From e70a3ed3dfc6556ddad1bda7accb446ee5136604 Mon Sep 17 00:00:00 2001 From: Maciej Tronowski Date: Mon, 27 Apr 2015 13:52:55 +0200 Subject: [PATCH] update job attributes in job/task details view --- qcg/models.py | 30 ++++++++------------------ qcg/service.py | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++------ qcg/views.py | 6 +++++- 3 files changed, 72 insertions(+), 29 deletions(-) diff --git a/qcg/models.py b/qcg/models.py index 0bc0cb2..f452874 100644 --- a/qcg/models.py +++ b/qcg/models.py @@ -71,20 +71,13 @@ class Job(models.Model): username = username_from_dn(qcg_job.user_dn) if user is not None: if user.username != username: - raise ValueError('Username does not match!') + raise ValueError('Username does not match ({} vs. {})!'.format(repr(user.username), repr(username))) attrs['owner'] = user else: attrs['owner'] = User.objects.get(username=username) return attrs - @classmethod - def from_qcg(cls, qcg_job): - job = cls(**cls.qcg_map(qcg_job)) - job._job = qcg_job - - return job - class Task(models.Model): STATUS_CHOICES = list(enumerate(field for field in dir(TaskStatus) if not field.startswith('__'))) @@ -128,13 +121,13 @@ class Task(models.Model): @property def qcg_task(self): - if self._qcg_task is None: - self._qcg_task = QcgTask(EPRUtils.deserialize_epr(self.epr)) + if self._task is None: + self._task = QcgTask(EPRUtils.deserialize_epr(self.epr)) - return self._qcg_task + return self._task @staticmethod - def qcg_map(qcg_task, jobs=None): + def qcg_map(qcg_task, job=None): attrs = get_attributes(qcg_task, ('task_id', 'status_description', 'note', 'description', 'submission_time', 'start_time', 'finish_time', 'reserved_time_slot', 'purged')) @@ -143,20 +136,15 @@ class Task(models.Model): attrs['type'] = Task.TYPE_CHOICES_REVERSED[qcg_task.type] attrs['proxy_lifetime'] = now() + qcg_task.proxy_lifetime - if jobs is not None and qcg_task.job_id in jobs: - attrs['job'] = jobs[qcg_task.job_id] + if job is not None: + if qcg_task.job_id != job.job_id: + raise ValueError('Job id does not match ({} vs. {})!'.format(repr(qcg_task.job_id), repr(job.job_id))) + attrs['job'] = job else: attrs['job'] = Job.objects.get(job_id=qcg_task.job_id) return attrs - @classmethod - def from_qcg(cls, qcg_task): - task = cls(**cls.qcg_map(qcg_task)) - task._task = qcg_task - - return task - @property def reserved_time_slot(self): if self.reserved_time_start or self.reserved_time_finish: diff --git a/qcg/service.py b/qcg/service.py index 81b4ab4..f950aac 100644 --- a/qcg/service.py +++ b/qcg/service.py @@ -9,6 +9,8 @@ from pyqcg.description import JobDescription from pyqcg.service import Registry from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus +from qcg.models import User, Job, Task, Allocation, NodeInfo + logger = logging.getLogger(__name__) @@ -17,15 +19,14 @@ logger = logging.getLogger(__name__) def update_user_data(user, proxy): ts = time.time() QCG.start() - from qcg.models import User, Job, Task, Allocation, NodeInfo - credential = Credential(proxy) - registry = Registry(credential) + registry = Registry(Credential(proxy)) # put lock on user record (hopefully..?) user = User.objects.select_for_update().get(pk=user.pk) + last_update = user.last_update - changed_filter = {'changed': TimePeriod(after=user.last_update)} + changed_filter = {'changed': TimePeriod(after=last_update)} ################################### # Jobs @@ -54,7 +55,7 @@ def update_user_data(user, proxy): jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)}) task_count = 0 for qcg_task in tasks: - params = Task.qcg_map(qcg_task, jobs_cache) + params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id]) task_id = params.pop('task_id') task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params) @@ -78,8 +79,58 @@ def update_user_data(user, proxy): elapsed_jobs = jte - jts elapsed_tasks = tte - tts elapsed_py = elapsed - elapsed_jobs - elapsed_tasks - logger.info('(%.3f) USER = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f', - elapsed, user, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py) + logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f', + elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py) + + +@transaction.atomic +def update_job(job, proxy): + if job.get_status_display() in [JobStatus.FINISHED, JobStatus.FAILED, JobStatus.CANCELED]: + return + + ts = time.time() + QCG.start() + + credential = Credential(proxy) + qcg_job = job.qcg_job + qcg_job.credential = credential + + jts = time.time() + qcg_job.refresh_information() + elapsed_job = time.time() - jts + + for name, val in job.qcg_map(qcg_job, job.owner).iteritems(): + if getattr(job, name) != val: + setattr(job, name, val) + + job.save() + + elapsed_tasks = 0 + for task in job.tasks.all(): + qcg_task = task.qcg_task + qcg_task.credential = credential + + tts = time.time() + qcg_task.refresh_information() + elapsed_tasks += time.time() - tts + + for name, val in task.qcg_map(qcg_task, task.job).iteritems(): + if getattr(task, name) != val: + setattr(task, name, val) + + task.save() + task.allocations.all().delete() + + for qcg_alloc in qcg_task.allocations: + alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc)) + + for qcg_node in qcg_alloc.nodes: + alloc.nodes.create(**NodeInfo.qcg_map(qcg_node)) + + elapsed = time.time() - ts + elapsed_py = elapsed - elapsed_job - elapsed_tasks + logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f', + elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py) def submit_job(params, proxy): diff --git a/qcg/views.py b/qcg/views.py index 09812a4..8905339 100644 --- a/qcg/views.py +++ b/qcg/views.py @@ -21,7 +21,7 @@ from filex.ftp import FTPOperation, FTPError from filex.views import make_url from qcg.forms import FiltersForm, ColumnsForm, JobDescriptionForm, EnvFormSet from qcg.utils import paginator_context -from qcg.service import update_user_data, submit_job +from qcg.service import update_user_data, submit_job, update_job def index(request): @@ -143,6 +143,8 @@ def jobs_list(request): def job_details(request, job_id): job = get_object_or_404(request.user.jobs.prefetch_related('tasks'), job_id=job_id) + update_job(job, request.session['proxy']) + return render(request, 'qcg/job.html', {'job': job}) @@ -151,6 +153,8 @@ def task_details(request, job_id, task_id): task = get_object_or_404(request.user.tasks.select_related('job').prefetch_related('allocations'), job__job_id=job_id, task_id=task_id) + update_job(task.job, request.session['proxy']) + return render(request, 'qcg/task.html', {'task': task}) -- 1.7.9.5