username = username_from_dn(qcg_job.user_dn)
if user is not None:
if user.username != username:
- raise ValueError('Username does not match!')
+ raise ValueError('Username does not match ({} vs. {})!'.format(repr(user.username), repr(username)))
attrs['owner'] = user
else:
attrs['owner'] = User.objects.get(username=username)
return attrs
- @classmethod
- def from_qcg(cls, qcg_job):
- job = cls(**cls.qcg_map(qcg_job))
- job._job = qcg_job
-
- return job
-
class Task(models.Model):
STATUS_CHOICES = list(enumerate(field for field in dir(TaskStatus) if not field.startswith('__')))
@property
def qcg_task(self):
- if self._qcg_task is None:
- self._qcg_task = QcgTask(EPRUtils.deserialize_epr(self.epr))
+ if self._task is None:
+ self._task = QcgTask(EPRUtils.deserialize_epr(self.epr))
- return self._qcg_task
+ return self._task
@staticmethod
- def qcg_map(qcg_task, jobs=None):
+ def qcg_map(qcg_task, job=None):
attrs = get_attributes(qcg_task, ('task_id', 'status_description', 'note', 'description', 'submission_time',
'start_time', 'finish_time', 'reserved_time_slot', 'purged'))
attrs['type'] = Task.TYPE_CHOICES_REVERSED[qcg_task.type]
attrs['proxy_lifetime'] = now() + qcg_task.proxy_lifetime
- if jobs is not None and qcg_task.job_id in jobs:
- attrs['job'] = jobs[qcg_task.job_id]
+ if job is not None:
+ if qcg_task.job_id != job.job_id:
+ raise ValueError('Job id does not match ({} vs. {})!'.format(repr(qcg_task.job_id), repr(job.job_id)))
+ attrs['job'] = job
else:
attrs['job'] = Job.objects.get(job_id=qcg_task.job_id)
return attrs
- @classmethod
- def from_qcg(cls, qcg_task):
- task = cls(**cls.qcg_map(qcg_task))
- task._task = qcg_task
-
- return task
-
@property
def reserved_time_slot(self):
if self.reserved_time_start or self.reserved_time_finish:
from pyqcg.service import Registry
from pyqcg.utils import Credential, TimePeriod, JobStatus, TaskStatus
+from qcg.models import User, Job, Task, Allocation, NodeInfo
+
logger = logging.getLogger(__name__)
def update_user_data(user, proxy):
ts = time.time()
QCG.start()
- from qcg.models import User, Job, Task, Allocation, NodeInfo
- credential = Credential(proxy)
- registry = Registry(credential)
+ registry = Registry(Credential(proxy))
# put lock on user record (hopefully..?)
user = User.objects.select_for_update().get(pk=user.pk)
+ last_update = user.last_update
- changed_filter = {'changed': TimePeriod(after=user.last_update)}
+ changed_filter = {'changed': TimePeriod(after=last_update)}
###################################
# Jobs
jobs_cache = SimpleLazyObject(lambda: {j.job_id: j for j in Job.objects.filter(owner=user)})
task_count = 0
for qcg_task in tasks:
- params = Task.qcg_map(qcg_task, jobs_cache)
+ params = Task.qcg_map(qcg_task, jobs_cache[qcg_task.job_id])
task_id = params.pop('task_id')
task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params)
elapsed_jobs = jte - jts
elapsed_tasks = tte - tts
elapsed_py = elapsed - elapsed_jobs - elapsed_tasks
- logger.info('(%.3f) USER = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
- elapsed, user, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
+ logger.info('(%.3f) USER = %s, LAST_UPDATE = %s, JOBS = %s (%.3f), TASKS = %s (%.3f), TIME = %.3f',
+ elapsed, user, last_update, jobs_count, elapsed_jobs, task_count, elapsed_tasks, elapsed_py)
+
+
+@transaction.atomic
+def update_job(job, proxy):
+ if job.get_status_display() in [JobStatus.FINISHED, JobStatus.FAILED, JobStatus.CANCELED]:
+ return
+
+ ts = time.time()
+ QCG.start()
+
+ credential = Credential(proxy)
+ qcg_job = job.qcg_job
+ qcg_job.credential = credential
+
+ jts = time.time()
+ qcg_job.refresh_information()
+ elapsed_job = time.time() - jts
+
+ for name, val in job.qcg_map(qcg_job, job.owner).iteritems():
+ if getattr(job, name) != val:
+ setattr(job, name, val)
+
+ job.save()
+
+ elapsed_tasks = 0
+ for task in job.tasks.all():
+ qcg_task = task.qcg_task
+ qcg_task.credential = credential
+
+ tts = time.time()
+ qcg_task.refresh_information()
+ elapsed_tasks += time.time() - tts
+
+ for name, val in task.qcg_map(qcg_task, task.job).iteritems():
+ if getattr(task, name) != val:
+ setattr(task, name, val)
+
+ task.save()
+ task.allocations.all().delete()
+
+ for qcg_alloc in qcg_task.allocations:
+ alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc))
+
+ for qcg_node in qcg_alloc.nodes:
+ alloc.nodes.create(**NodeInfo.qcg_map(qcg_node))
+
+ elapsed = time.time() - ts
+ elapsed_py = elapsed - elapsed_job - elapsed_tasks
+ logger.info('(%.3f) JOB = %s (%.3f), TASKS = %d (%.3f), TIME = %.3f',
+ elapsed, job.job_id, elapsed_job, job.tasks.count(), elapsed_tasks, elapsed_py)
def submit_job(params, proxy):
from filex.views import make_url
from qcg.forms import FiltersForm, ColumnsForm, JobDescriptionForm, EnvFormSet
from qcg.utils import paginator_context
-from qcg.service import update_user_data, submit_job
+from qcg.service import update_user_data, submit_job, update_job
def index(request):
def job_details(request, job_id):
job = get_object_or_404(request.user.jobs.prefetch_related('tasks'), job_id=job_id)
+ update_job(job, request.session['proxy'])
+
return render(request, 'qcg/job.html', {'job': job})
task = get_object_or_404(request.user.tasks.select_related('job').prefetch_related('allocations'),
job__job_id=job_id, task_id=task_id)
+ update_job(task.job, request.session['proxy'])
+
return render(request, 'qcg/task.html', {'task': task})