job template model
[qcg-portal.git] / filex / ftp.py
index 8c6abde..3ccd8c6 100644 (file)
+from datetime import datetime
 from Queue import Queue, Empty
+from itertools import chain
+import os
 import re
 from threading import Event
+from urlparse import urlparse, urlunparse
 
-from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
+from django.utils.http import urlunquote
+from django.utils.timezone import localtime, UTC
 
 
-class FTPException(Exception):
-    pass
+class FTPError(Exception):
+    def __init__(self, message, verbose=None, *args, **kwargs):
+        super(FTPError, self).__init__(message, *args, **kwargs)
+
+        self.verbose = verbose
 
 
 class FTPOperation:
-    def __init__(self, proxy=None):
-        self.end = Event()
+    def __init__(self, proxy=None, buffer_size=4096):
+        from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
+
+        self._end = Event()
+        self._error = None
+        self._buffer = Buffer(buffer_size)
+
         self.attr = HandleAttr()
         self.op_attr = OperationAttr()
 
         if proxy is not None:
             self.op_attr.set_authorization(proxy)
 
-        self.buffer = Buffer(4096)
         self.cli = FTPClient(self.attr)
 
-        self.stream = Queue()
-        self.error = None
+        # limit size of a queue to 4 MB
+        self.stream = Queue((4 * 2**20) / buffer_size or 1)
 
     def __del__(self):
         self.attr.destroy()
         self.op_attr.destroy()
         self.cli.destroy()
 
-    def report_error(self):
-        if self.error is not None:
-            match = re.search(r'A system call failed: (.*)$', self.error.replace('\r\n', '\n'), re.MULTILINE)
-
-            msg = match.groups()[0] if match else "Unknown error"
-
-            raise FTPException(msg)
-
     def _read(self, arg, handle, error, buff, length, offset, eof):
         if not error:
             self.stream.put(str(buff))
 
             if not eof:
-                self.cli.register_read(self.buffer, self._read, None)
+                self.cli.register_read(self._buffer, self._read, None)
+
+    def _write(self, arg, handle, error, buff, length, offset, eof):
+        if eof or error:
+            return
+
+        chunk = self.stream.get()
+
+        if chunk is None:
+            size = 0
+            eof = True
+        else:
+            size = len(chunk)
+            self._buffer.fill(chunk)
+
+        self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
 
     def _done(self, arg, handle, error):
-        self.error = error
-        self.end.set()
+        self._error = error
+        self._end.set()
+
+    def wait(self):
+        self._end.wait()
+        self._end.clear()
+
+        if self._error is not None:
+            match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
+
+            msg = match.groups()[0] if match else "Unknown error"
+
+            raise FTPError(msg, self._error)
 
     def listing(self, url):
         self.cli.verbose_list(url, self._done, None, self.op_attr)
-        self.cli.register_read(self.buffer, self._read, None)
-
-        self.end.wait()
-        self.end.clear()
+        self.cli.register_read(self._buffer, self._read, None)
 
-        self.report_error()
+        self.wait()
 
         result = ''
         while not self.stream.empty():
             result += self.stream.get()
 
-        return result
+        return self._parse_mlst(result)
+
+    @staticmethod
+    def _parse_mlst(listing):
+        for item in listing.strip().splitlines():
+            # we may receive empty string when there are multiple consecutive newlines in listing
+            if item:
+                attrs, name = item.split(' ', 1)
+
+                attrs_dict = {}
+                for attr in attrs.split(';'):
+                    try:
+                        key, value = attr.split('=', 1)
+                    except ValueError:
+                        key, value = attr, ''
+
+                    attrs_dict[key] = value
+
+                yield {
+                    'name': name,
+                    'type': 'directory' if attrs_dict['Type'].endswith('dir') else 'file',
+                    'size': int(attrs_dict['Size']),
+                    'date': localtime(datetime.strptime(attrs_dict['Modify'], "%Y%m%d%H%M%S").replace(tzinfo=UTC())),
+                }
 
     def get(self, url):
         self.cli.get(url, self._done, None, self.op_attr)
-        self.cli.register_read(self.buffer, self._read, None)
+        self.cli.register_read(self._buffer, self._read, None)
 
         while True:
             try:
                 yield self.stream.get(timeout=0.1)
             except Empty:
-                if self.end.wait(0):
+                if self._end.wait(0):
                     break
 
-        self.end.clear()
+        self.wait()
+
+    def put(self, url):
+        self.cli.put(url, self._done, None, self.op_attr)
+        self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
+
+    def move(self, src, dst):
+        self.cli.move(src, dst, self._done, None, self.op_attr)
+
+        self.wait()
+
+    def info(self, url):
+        data = self.listing(url).next()
+
+        if data['name'] == '.':
+            data['name'] = os.path.basename(os.path.normpath(url))
+
+        return data
+
+    def exists(self, url):
+        self.cli.exists(url, self._done, None, self.op_attr)
+
+        try:
+            self.wait()
+        except FTPError as e:
+            if 'No such file or directory' in e.message:
+                return False
+            raise
+        else:
+            return True
+
+    def delete(self, url):
+        self.cli.delete(url, self._done, None, self.op_attr)
+
+        self.wait()
+
+    def rmdir(self, url):
+        self.cli.rmdir(url, self._done, None, self.op_attr)
+
+        self.wait()
+
+    def mkdir(self, url, parents=False):
+        if parents:
+            if self.exists(url):
+                return
+
+            u = urlparse(url)
+            parent_url = urlunparse((u.scheme, u.netloc, os.path.dirname(os.path.normpath(u.path)), '', '', ''))
+
+            self.mkdir(parent_url, parents=True)
+
+        self.cli.mkdir(url, self._done, None, self.op_attr)
+
+        self.wait()
+
+    @staticmethod
+    def match_ext(archive, *extensions):
+        for ext in extensions:
+            if archive.endswith(ext):
+                return True
+        return False
+
+    def compress(self, server, path, files, archive):
+        self._check_disk_stack_args(*([path, archive] + files))
+
+        if self.match_ext(archive, '.tar.gz', '.tgz'):
+            cmd, args = 'tar', ['cvzf', archive, '-C', path] + files
+        elif self.match_ext(archive, '.tar.bz2', '.tbz'):
+            cmd, args = 'tar', ['cvjf', archive, '-C', path] + files
+        elif self.match_ext(archive, '.zip'):
+            cmd, args = 'jar', (['cvMf', archive] + list(chain.from_iterable(('-C', path, f) for f in files)))
+        else:
+            raise ValueError('Unknown archive type: {}'.format(archive))
+
+        self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + args))
+
+        return self.get(server)
+
+    def extract(self, server, archive, dst):
+        self._check_disk_stack_args(*[archive, dst])
+
+        if self.match_ext(archive, '.tar.gz', '.tgz'):
+            cmd, args = 'tar', ('xvzf', archive, '-C', dst)
+        elif self.match_ext(archive, '.tar.bz2', '.tbz'):
+            cmd, args = 'tar', ('xvjf', archive, '-C', dst)
+        elif self.match_ext(archive, '.zip'):
+            cmd, args = 'unzip', (archive, '-d', dst)
+        else:
+            raise ValueError('Unknown archive type: {}'.format(archive))
+
+        self.op_attr.set_disk_stack('#'.join(("popen:argv=", cmd) + args))
+
+        return self.get(server)
 
-        self.report_error()
+    @staticmethod
+    def _check_disk_stack_args(*args):
+        for char in ['#', ',', ';', '%23', '%3B']:
+            for arg in args:
+                if char in arg:
+                    raise ValueError('Unsupported character `{}` in `{}`!'.format(urlunquote(char), urlunquote(arg)))