+from datetime import datetime
from Queue import Queue, Empty
+from itertools import chain
+import os
import re
from threading import Event
+from urlparse import urlparse, urlunparse
-from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
+from django.utils.http import urlquote
+from django.utils.timezone import localtime, UTC
class FTPError(Exception):
    """Error raised when an FTP operation fails.

    ``message`` is the short description and becomes the first positional
    argument of the underlying ``Exception``. ``verbose`` optionally carries
    a longer description and is stored on the instance as ``self.verbose``.
    Remaining positional and keyword arguments are forwarded unchanged to
    ``Exception.__init__``.
    """

    def __init__(self, message, verbose=None, *args, **kwargs):
        base_args = (message,) + args
        super(FTPError, self).__init__(*base_args, **kwargs)

        # Kept separately so callers can show the short message and still
        # reach the longer text when they need it.
        self.verbose = verbose
class FTPOperation:
    """Blocking wrapper around one asynchronous ``gridftp`` client handle.

    Every operation is started on the underlying ``FTPClient`` with
    ``_done`` as its completion callback. ``wait`` blocks until that callback
    fires and converts a reported error into ``FTPError``. Data is exchanged
    with the read/write callbacks through the bounded queue ``stream``.
    """

    # Extracts the system error text from the error string handed to ``_done``.
    _SYSCALL_ERROR_RE = re.compile(r'A system call failed: (.*)$', re.MULTILINE)

    def __init__(self, proxy=None, buffer_size=4096):
        """Create the client handle, its attributes and the transfer buffer.

        :param proxy: optional credential passed to
            ``OperationAttr.set_authorization``; skipped when ``None``.
        :param buffer_size: size passed to ``Buffer``; also used to derive
            the maximum number of chunks held in ``stream``.
        """
        # Function-scope import: the module itself can be imported without
        # the gridftp bindings being importable.
        from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr

        self._end = Event()
        self._error = None
        self._buffer = Buffer(buffer_size)

        self.attr = HandleAttr()
        self.op_attr = OperationAttr()

        if proxy is not None:
            self.op_attr.set_authorization(proxy)

        self.cli = FTPClient(self.attr)

        # limit size of a queue to 4 MB (always at least one slot)
        self.stream = Queue((4 * 2**20) // buffer_size or 1)

    def __del__(self):
        # __init__ may have failed before every handle was created, so only
        # destroy the ones that actually exist.
        for name in ('attr', 'op_attr', 'cli'):
            handle = getattr(self, name, None)
            if handle is not None:
                handle.destroy()

    def _read(self, arg, handle, error, buff, length, offset, eof):
        """Read callback: queue the received data and re-arm the read."""
        if error:
            return

        # NOTE(review): ``length`` is not used; this assumes ``str(buff)``
        # yields exactly the bytes received -- confirm against the binding.
        self.stream.put(str(buff))

        if not eof:
            self.cli.register_read(self._buffer, self._read, None)

    def _write(self, arg, handle, error, buff, length, offset, eof):
        """Write callback: send the next chunk taken from ``stream``.

        A ``None`` item in ``stream`` marks the end of the data and is sent
        as an empty write with ``eof`` set.
        """
        if eof or error:
            return

        chunk = self.stream.get()

        if chunk is None:
            size = 0
            eof = True
        else:
            size = len(chunk)
            self._buffer.fill(chunk)

        # NOTE(review): the offset received by the callback is passed on
        # unchanged (not advanced by ``length``) -- confirm this is what the
        # binding expects for the transfer mode in use.
        self.cli.register_write(self._buffer, size, offset, eof, self._write, None)

    def _done(self, arg, handle, error):
        """Completion callback: record the outcome and wake up ``wait``."""
        self._error = error
        self._end.set()

    def wait(self):
        """Block until the running operation completes.

        :raises FTPError: when the completion callback reported an error. The
            message is the text following ``A system call failed: `` in the
            error string (``Unknown error`` if absent); the full error string
            is passed as ``verbose``.
        """
        self._end.wait()
        self._end.clear()

        if self._error is not None:
            match = self._SYSCALL_ERROR_RE.search(self._error.replace('\r\n', '\n'))

            msg = match.group(1) if match else "Unknown error"

            raise FTPError(msg, self._error)

    def _drain(self):
        """Yield chunks from ``stream`` until the running operation completes."""
        while not self._end.is_set():
            try:
                yield self.stream.get(timeout=0.1)
            except Empty:
                continue

        # Completion can be signalled just after a timed-out ``get``; whatever
        # was queued in between must still be delivered.
        while True:
            try:
                yield self.stream.get_nowait()
            except Empty:
                return

    def listing(self, url):
        """Run a verbose list of ``url`` and return an iterator of entries.

        The transfer itself happens before this method returns; only the
        parsing of the collected text is lazy. See ``_parse_mlst`` for the
        shape of each entry.

        :raises FTPError: when the operation fails.
        """
        self.cli.verbose_list(url, self._done, None, self.op_attr)
        self.cli.register_read(self._buffer, self._read, None)

        # ``stream`` is bounded, so it has to be consumed while the operation
        # is running; otherwise a long listing would block the read callback.
        chunks = list(self._drain())

        self.wait()

        return self._parse_mlst(''.join(chunks))

    @staticmethod
    def _parse_mlst(listing):
        """Parse listing text into dicts with name, type, size and date.

        Each non-blank line is ``<attrs> <name>`` where ``<attrs>`` is a
        ``;``-separated list of ``key=value`` facts. The ``Type``, ``Size``
        and ``Modify`` facts are required.
        """
        for item in listing.strip().splitlines():
            # we may receive empty string when there are multiple consecutive newlines in listing
            if not item.strip():
                continue

            attrs, name = item.split(' ', 1)

            attrs_dict = {}
            for attr in attrs.split(';'):
                key, sep, value = attr.partition('=')
                attrs_dict[key] = value

            # ``Modify`` is parsed as UTC and converted with ``localtime``.
            modified = datetime.strptime(attrs_dict['Modify'], "%Y%m%d%H%M%S")

            yield {
                'name': name,
                'type': 'directory' if attrs_dict['Type'].endswith('dir') else 'file',
                'size': int(attrs_dict['Size']),
                'date': localtime(modified.replace(tzinfo=UTC())),
            }

    def get(self, url):
        """Generator yielding the content of ``url`` chunk by chunk.

        The transfer starts when the generator is first advanced.

        :raises FTPError: after the last chunk, when the operation failed.
        """
        self.cli.get(url, self._done, None, self.op_attr)
        self.cli.register_read(self._buffer, self._read, None)

        for chunk in self._drain():
            yield chunk

        self.wait()

    def put(self, url):
        """Start an upload to ``url``.

        This method does not wait for completion. ``_write`` takes the data
        from ``stream``: put chunks there, then ``None`` to finish.
        """
        self.cli.put(url, self._done, None, self.op_attr)
        # An empty first write arms ``_write``, which then pulls real chunks.
        self.cli.register_write(self._buffer, 0, 0, False, self._write, None)

    def move(self, src, dst):
        """Move ``src`` to ``dst`` and wait for completion."""
        self.cli.move(src, dst, self._done, None, self.op_attr)

        self.wait()

    def info(self, url):
        """Return the first listing entry for ``url``.

        An entry named ``.`` is renamed to the last path component of ``url``.
        """
        data = next(self.listing(url))

        if data['name'] == '.':
            data['name'] = os.path.basename(os.path.normpath(url))

        return data

    def exists(self, url):
        """Return whether ``url`` exists.

        :raises FTPError: for any failure other than
            ``No such file or directory``.
        """
        self.cli.exists(url, self._done, None, self.op_attr)

        try:
            self.wait()
        except FTPError as e:
            # ``args[0]`` is the short message given to FTPError.
            if e.args and 'No such file or directory' in e.args[0]:
                return False
            raise
        else:
            return True

    def delete(self, url):
        """Delete the file at ``url`` and wait for completion."""
        self.cli.delete(url, self._done, None, self.op_attr)

        self.wait()

    def rmdir(self, url):
        """Remove the directory at ``url`` and wait for completion."""
        self.cli.rmdir(url, self._done, None, self.op_attr)

        self.wait()

    def mkdir(self, url, parents=False):
        """Create the directory ``url``.

        :param parents: when true, do nothing if ``url`` already exists and
            create missing parent directories first.
        """
        if parents:
            if self.exists(url):
                return

            u = urlparse(url)
            current = os.path.normpath(u.path)
            parent = os.path.dirname(current)

            # Stop at the top of the tree: a path that is its own parent
            # would otherwise recurse forever.
            if parent and parent != current:
                parent_url = urlunparse((u.scheme, u.netloc, parent, '', '', ''))

                self.mkdir(parent_url, parents=True)

        self.cli.mkdir(url, self._done, None, self.op_attr)

        self.wait()

    @staticmethod
    def match_ext(archive, *extensions):
        """Return True when ``archive`` ends with one of ``extensions``."""
        return archive.endswith(extensions)

    @staticmethod
    def _popen_disk_stack(cmd, args):
        """Build the ``popen`` disk stack string for ``cmd`` with ``args``."""
        return '#'.join(["popen:argv=", cmd] + [urlquote(arg) for arg in args])

    def compress(self, server, path, files, archive):
        """Set up a disk stack that archives ``files`` and return ``get(server)``.

        The command is chosen from the extension of ``archive``
        (``.tar.gz``/``.tgz``, ``.tar.bz2``/``.tbz`` or ``.zip``) and is run
        relative to ``path``.

        :raises ValueError: for an unknown archive type, an argument
            containing ``#`` or ``;``, or a non-ASCII name with ``.zip``.
        """
        files = list(files)

        self._check_disk_stack_args(path, archive, *files)

        if self.match_ext(archive, '.tar.gz', '.tgz'):
            cmd, args = 'tar', ['czf', archive, '-C', path] + files
        elif self.match_ext(archive, '.tar.bz2', '.tbz'):
            cmd, args = 'tar', ['cjf', archive, '-C', path] + files
        elif self.match_ext(archive, '.zip'):
            # zip doesn't support unicode file names
            for arg in files:
                try:
                    arg.encode('ascii')
                except UnicodeEncodeError as e:
                    raise ValueError(u'Unsupported character `{}` in `{}`!'.format(arg[e.start:e.start + 1], arg))

            cmd, args = 'jar', (['cMf', archive] + list(chain.from_iterable(('-C', path, f) for f in files)))
        else:
            raise ValueError('Unknown archive type: {}'.format(archive))

        # NOTE(review): the disk stack stays set on ``op_attr`` afterwards --
        # confirm later operations on this object are not affected.
        self.op_attr.set_disk_stack(self._popen_disk_stack(cmd, args))

        return self.get(server)

    def extract(self, server, archive, dst):
        """Set up a disk stack that unpacks ``archive`` and return ``get(server)``.

        The command is chosen from the extension of ``archive`` and unpacks
        into ``dst``.

        :raises ValueError: for an unknown archive type or an argument
            containing ``#`` or ``;``.
        """
        self._check_disk_stack_args(archive, dst)

        if self.match_ext(archive, '.tar.gz', '.tgz'):
            cmd, args = 'tar', ('xzf', archive, '-C', dst)
        elif self.match_ext(archive, '.tar.bz2', '.tbz'):
            cmd, args = 'tar', ('xjf', archive, '-C', dst)
        elif self.match_ext(archive, '.zip'):
            cmd, args = 'unzip', ('-qo', archive, '-d', dst)
        else:
            raise ValueError('Unknown archive type: {}'.format(archive))

        # NOTE(review): the disk stack stays set on ``op_attr`` afterwards --
        # confirm later operations on this object are not affected.
        self.op_attr.set_disk_stack(self._popen_disk_stack(cmd, args))

        return self.get(server)

    @staticmethod
    def _check_disk_stack_args(*args):
        """Reject arguments containing ``#`` or ``;``.

        ``#`` is the separator used when the disk stack string is built.

        :raises ValueError: naming the offending character and argument.
        """
        for char in ['#', ';']:
            for arg in args:
                if char in arg:
                    raise ValueError(u'Unsupported character `{}` in `{}`!'.format(char, arg))