X-Git-Url: http://mmka.chem.univ.gda.pl/gitweb/?a=blobdiff_plain;ds=sidebyside;f=qcg%2Futils.py;h=5555e16b2972a9afaf10a9a536e523b1e9a26a06;hb=5d89371016f27ca592862971f0e4c56a32c497bd;hp=24a5a24395877c077796fbe87026c65435a0fd42;hpb=ea9cb5c97dad92d878bb36abcea1ea7fb6e14b24;p=qcg-portal.git diff --git a/qcg/utils.py b/qcg/utils.py index 24a5a24..5555e16 100644 --- a/qcg/utils.py +++ b/qcg/utils.py @@ -1,7 +1,15 @@ +import os +import string +import random + from django.core.paginator import Paginator from django.utils.formats import date_format from django.utils.timezone import localtime +from pyqcg import QCG +from pyqcg.utils import Credential +from pyqcg.description import JobDescription +from filex.ftp import FTPOperation from qcg import constants @@ -37,3 +45,102 @@ def paginator_context(request, objects, per_page=constants.PER_PAGE): def localtime_str(datetime): return date_format(localtime(datetime), 'DATETIME_FORMAT') + + +def random_id(size=8, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + +def chunks(seq, size): + return (seq[pos:pos + size] for pos in xrange(0, len(seq), size)) + + +def to_job_desc(params, proxy): + QCG.start() + desc = JobDescription(Credential(proxy)) + + direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs', + 'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'notify', + 'preprocess', 'postprocess', 'persistent') + + for name in direct_map: + if params[name]: + setattr(desc, name, params[name]) + + if params['application']: + desc.set_application(*params['application']) + desc.stage_in += [params['master_file']] + desc.arguments.insert(0, os.path.basename(params['master_file'])) + if params['script']: + ftp = FTPOperation(proxy) + + ftp.mkdir(constants.QCG_DATA_URL, parents=True) + url = os.path.join(constants.QCG_DATA_URL, 'script.{}.sh'.format(random_id())) + ftp.put(url) + + for chunk in chunks(params['script'], 4096): + ftp.stream.put(chunk) + ftp.stream.put(None) + ftp.wait() + + desc.executable = url + if params['nodes']: + desc.set_nodes(*params['nodes']) + if params['reservation']: + desc.set_reservation(params['reservation']) + if params['watch_output']: + desc.set_watch_output(params['watch_output'], params['watch_output_pattern']) + # TODO monitoring + + return desc + + +def to_form_data(xml): + # prevent circular import errors + from forms import JobDescriptionForm + + QCG.start() + desc = JobDescription() + desc.xml_description = xml + + direct_map = ('env_variables', 'executable', 'arguments', 'note', 'grant', 'hosts', 'properties', 'queue', 'procs', + 'wall_time', 'memory', 'memory_per_slot', 'modules', 'input', 'stage_in', 'native', 'persistent') + + params = {} + for name in direct_map: + attr = getattr(desc, name) + if isinstance(attr, bool) or attr: + params[name] = attr + + if desc.application is not None: + app_name, app_ver = desc.application + params['application'] = app_name if app_ver is None else app_name + '/' + app_name + stage_in = params['stage_in'] + params['stage_in'], params['master_file'] = stage_in[:-1], stage_in[-1] + params['arguments'] = params['arguments'][1:] + if desc.nodes is not None: + params['nodes'] = ':'.join(map(str, desc.nodes)) + if desc.reservation is not None: + res_id, res_type = desc.reservation + params['reservation'] = res_id + if desc.notify is not None: + params['notify_type'], params['notify_address'] = desc.notify.split(':') + if desc.watch_output is not None: + watch_output, params['watch_output_pattern'] = desc.watch_output + params['watch_output_type'], params['watch_output_address'] = watch_output.split(':') + if desc.preprocess is not None: + if desc.preprocess.startswith('gsiftp://'): + params['preprocess_type'] = JobDescriptionForm.Process.SCRIPT + params['preprocess_script'] = desc.preprocess + else: + params['preprocess_type'] = JobDescriptionForm.Process.CMD + params['preprocess_cmd'] = desc.preprocess + if desc.postprocess is not None: + if desc.postprocess.startswith('gsiftp://'): + params['postprocess_type'] = JobDescriptionForm.Process.SCRIPT + params['postprocess_script'] = desc.postprocess + else: + params['postprocess_type'] = JobDescriptionForm.Process.CMD + params['postprocess_cmd'] = desc.postprocess + + return params