X-Git-Url: http://mmka.chem.univ.gda.pl/gitweb/?a=blobdiff_plain;f=qcg%2Futils.py;h=57cf23aa683cc60f73f4b4c0d1508b6462e1b334;hb=586209a600ac5767a48c57cae1f566c0d6aaf48a;hp=76dc318538fd2336547bf56d9957e0d912567c3f;hpb=be46fb1b43775f35da8b49ad28110d16c8692777;p=qcg-portal.git diff --git a/qcg/utils.py b/qcg/utils.py index 76dc318..57cf23a 100644 --- a/qcg/utils.py +++ b/qcg/utils.py @@ -1,12 +1,29 @@ +# coding=utf-8 + +from functools import wraps + +import os +import string +import random + +from django.contrib.auth.decorators import login_required +from django.core.cache import caches from django.core.paginator import Paginator -from django.db import transaction -from django.utils.timezone import now -from pyqcg.service import Registry -from pyqcg.utils import Credential, TimePeriod +from django.utils.formats import date_format +from django.utils.timezone import localtime +from django.views.decorators.cache import cache_control +from pyqcg import QCG +from pyqcg.description import JobDescription +from filex.ftp import FTPOperation from qcg import constants +from django.utils import encoding + +resources_cache = caches['resources'] + + def get_attributes(obj, attrs): return {name: getattr(obj, name) for name in attrs if getattr(obj, name) is not None} @@ -17,51 +34,6 @@ def username_from_dn(dn): return username -@transaction.atomic -def update_user_data(user, proxy): - from qcg.models import User, Job, Task, Allocation, NodeInfo - - credential = Credential(proxy) - registry = Registry(credential) - - # put lock on user record (hopefully..?) - user = User.objects.select_for_update().get(pk=user.pk) - - changed_filter = {'changed': TimePeriod(after=user.last_update)} - - ################################### - # Jobs - ################################### - for qcg_job in registry.jobs(**changed_filter): - params = Job.qcg_map(qcg_job, user) - job_id = params.pop('job_id') - - Job.objects.update_or_create(job_id=job_id, defaults=params) - - ################################### - # Tasks - ################################### - jobs_cache = {j.job_id: j for j in Job.objects.filter(owner=user)} - for qcg_task in registry.tasks(**changed_filter): - params = Task.qcg_map(qcg_task, jobs_cache) - task_id = params.pop('task_id') - - task, created = Task.objects.update_or_create(job__job_id=qcg_task.job_id, task_id=task_id, defaults=params) - - if not created: - task.allocations.all().delete() - - for qcg_alloc in qcg_task.allocations: - alloc = task.allocations.create(**Allocation.qcg_map(qcg_alloc)) - - for qcg_node in qcg_alloc.nodes: - alloc.nodes.create(**NodeInfo.qcg_map(qcg_node)) - - # release user lock - user.last_update = now() - user.save() - - def try_parse_int(s, default=None, base=10): try: return int(s, base) @@ -80,3 +52,175 @@ def paginator_context(request, objects, per_page=constants.PER_PAGE): min(max(page_num + 2, 5), paginator.num_pages) + 1) return {'page': paginator.page(page_num), 'num_pages': paginator.num_pages, 'pages_range': pages_range} + + +def localtime_str(datetime): + return date_format(localtime(datetime), 'DATETIME_FORMAT') + + +def random_id(size=8, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + +def chunks(seq, size): + return (seq[pos:pos + size] for pos in xrange(0, len(seq), size)) + +def generate_md_inputfile(params): + md_input = list() + # Opis pliku wyjsciowego + opis=params['note'][:80] + md_input.append(encoding.smart_str(opis, encoding='ascii', errors='ignore')) + # Dane kontrolne obliczen + md_input.append('SEED=-3059743 PDBREF ONE_LETTER MD EXTCONF RESCALE_MODE=2') + ctl_data='nstep='+str(params['nstep'])+' ntwe='+str(params['ntwe']) + ctl_data+=' ntwx='+str(params['ntwx'])+' dt='+str(params['dt'])+' damax='+str(params['damax'])+'lang=0 tbf' + md_input.append('{:<79}&'.format(ctl_data)) + md_input.append('tau_bath=1.0 t_bath=300 reset_vel=10000 respa ntime_split=1 maxtime_split=512') + # Paramatry pól siłowych + if params['force_field'] == 'GAB': + # Wagi pola GAB + md_input.append('WLONG=1.35279 WSCP=1.59304 WELEC=0.71534 WBOND=1.00000 WANG=1.13873 &') + md_input.append('WSCLOC=0.16258 WTOR=1.98599 WTORD=1.57069 WCORRH=0.42887 WCORR5=0.00000 &') + md_input.append('WCORR6=0.00000 WEL_LOC=0.16036 WTURN3=1.68722 WTURN4=0.66230 WTURN6=0.00000 &') + md_input.append('WVDWPP=0.11371 WHPB=1.00000 &') + md_input.append('CUTOFF=7.00000 WCORR4=0.00000 WSCCOR=0.0') + else: + # Wagi pola E0LLY + md_input.append('WLONG=1.00000 WSCP=1.23315 WELEC=0.84476 WBOND=1.00000 WANG=0.62954 &') + md_input.append('WSCLOC=0.10554 WTOR=1.84316 WTORD=1.26571 WCORRH=0.19212 WCORR5=0.00000 &') + md_input.append('WCORR6=0.00000 WEL_LOC=0.37357 WTURN3=1.40323 WTURN4=0.64673 WTURN6=0.00000 &') + md_input.append('WVDWPP=0.23173 WHPB=1.00000 WSCCOR=0.0 &') + md_input.append('CUTOFF=7.00000 WCORR4=0.00000') + # Plik PDB + md_input.append(params['pdb_file'].split('/')[-1]) + # Sekwencja aminokwasów + md_input.append(len(params['sequence'])) + seq_str=params['sequence'] + while seq_str: + md_input.append(seq_str[:80]) + seq_str=seq_str[80:] + md_input.append(' 0') + md_input.append(' 0') + + return md_input + + +def to_job_desc(params, proxy): + QCG.start() + desc = JobDescription() + + direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs', + 'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify', + 'preprocess', 'postprocess', 'persistent') + + for name in direct_map: + if params[name]: + setattr(desc, name, params[name]) + + if params['application']: + desc.set_application(*params['application']) + desc.stage_in += [params['master_file']] + desc.arguments.insert(0, os.path.basename(params['master_file'])) + if params['script']: + ftp = FTPOperation(proxy) + + ftp.mkdir(constants.QCG_DATA_URL, parents=True) + url = os.path.join(constants.QCG_DATA_URL, 'script.{}.sh'.format(random_id())) + ftp.put(url) + + for chunk in chunks(params['script'], 4096): + ftp.stream.put(chunk) + ftp.stream.put(None) + ftp.wait() + + desc.executable = url + if params['nodes']: + desc.set_nodes(*params['nodes']) + if params['reservation']: + desc.set_reservation(params['reservation']) + if params['watch_output']: + desc.set_watch_output(params['watch_output'], params['watch_output_pattern']) + + # TODO monitoring + + return desc + + +def to_form_data(xml): + # prevent circular import errors + from forms import JobDescriptionForm + + QCG.start() + desc = JobDescription() + desc.xml_description = xml + + direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs', + 'wall_time', 'modules', 'input', 'stage_in', 'native', 'persistent') + + params = {} + for name in direct_map: + attr = getattr(desc, name) + if isinstance(attr, bool) or attr: + params[name] = attr + + if desc.application is not None: + app_name, app_ver = desc.application + params['application'] = app_name if app_ver is None else app_name + '/' + app_name + stage_in = params['stage_in'] + params['stage_in'], params['master_file'] = stage_in[:-1], stage_in[-1] + params['arguments'] = params['arguments'][1:] + if desc.nodes is not None: + params['nodes'] = ':'.join(map(str, desc.nodes)) + if desc.reservation is not None: + res_id, res_type = desc.reservation + params['reservation'] = res_id + if desc.notify is not None: + params['notify_type'], params['notify_address'] = desc.notify.split(':') + if desc.watch_output is not None: + watch_output, params['watch_output_pattern'] = desc.watch_output + params['watch_output_type'], params['watch_output_address'] = watch_output.split(':') + if desc.preprocess is not None: + if desc.preprocess.startswith('gsiftp://'): + params['preprocess_type'] = JobDescriptionForm.Process.SCRIPT + params['preprocess_script'] = desc.preprocess + else: + params['preprocess_type'] = JobDescriptionForm.Process.CMD + params['preprocess_cmd'] = desc.preprocess + if desc.postprocess is not None: + if desc.postprocess.startswith('gsiftp://'): + params['postprocess_type'] = JobDescriptionForm.Process.SCRIPT + params['postprocess_script'] = desc.postprocess + else: + params['postprocess_type'] = JobDescriptionForm.Process.CMD + params['postprocess_cmd'] = desc.postprocess + if desc.memory: + params['memory'] = int(desc.memory) + if desc.memory_per_slot: + params['memory_per_slot'] = int(desc.memory_per_slot) + + return params + + +def restricted(view): + return wraps(view)(cache_control(no_cache=True, must_revalidate=True, no_store=True)(login_required(view))) + + +def cached_resources(proxy): + hosts = resources_cache.get('hosts') + if hosts is None: + # prevent circular import errors + from qcg.service import fetch_resources + + hosts, _, applications, modules = map(make_choices, fetch_resources(proxy)) + resources_cache.set('hosts', hosts) + resources_cache.set('applications', applications) + resources_cache.set('modules', modules) + else: + applications = resources_cache.get('applications') + modules = resources_cache.get('modules') + + return hosts, applications, modules + + +def make_choices(iterable): + return ((None, ''),) + tuple((item, item) for item in sorted(iterable))