1 from datetime import datetime
2 from Queue import Queue, Empty
4 from threading import Event
6 from django.utils.timezone import localtime, UTC
7 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
# Exception type raised by this module when an operation reports an error
# (see the `raise FTPException(msg)` site later in the file).
# NOTE(review): the class body is not visible in this excerpt.
10 class FTPException(Exception):
# Build the client state: the transfer buffer, the handle and operation
# attributes, the FTPClient itself and the bounded queue used to pass data
# chunks between the transfer callbacks and the caller.
#   proxy       -- handed to OperationAttr.set_authorization()
#   buffer_size -- size passed to Buffer(); also determines the queue capacity
# NOTE(review): the embedded line numbers skip (15 -> 18 -> 20 ...), so some
# statements of this method are not visible in this excerpt.
15 def __init__(self, proxy=None, buffer_size=4096):
18 self._buffer = Buffer(buffer_size)
20 self.attr = HandleAttr()
21 self.op_attr = OperationAttr()
# NOTE(review): whether this call is guarded (e.g. only when a proxy is
# given) cannot be told from the visible lines.
24 self.op_attr.set_authorization(proxy)
26 self.cli = FTPClient(self.attr)
28 # cap the queue at roughly 4 MB of buffered chunks
# The slot count is (4 * 2**20) / buffer_size, i.e. 1024 slots for the default
# 4096-byte buffer. `/` binds tighter than `or`, so `or 1` only replaces a
# zero result (buffer_size larger than 4 MiB); Queue treats maxsize <= 0 as
# "unbounded", which would defeat the limit.
# NOTE(review): relies on Python 2 integer division (the module imports
# `Queue`, the Python 2 name).
29 self.stream = Queue((4 * 2**20) / buffer_size or 1)
# Calls destroy() on the operation attributes created in __init__.
# NOTE(review): the `def` line that owns this statement is not visible in this
# excerpt (embedded numbering jumps from 29 to 33) -- presumably a
# destructor/cleanup method; confirm against the full file.
33 self.op_attr.destroy()
# Read callback passed to FTPClient.register_read().
# Pushes the received data, converted with str(), onto self.stream and then
# registers itself again so the next chunk is delivered into the same buffer.
# NOTE(review): lines between 36 and 41 are missing from this excerpt, so any
# eof/error handling or conditions around these two calls cannot be seen.
36 def _read(self, arg, handle, error, buff, length, offset, eof):
# put() uses the default blocking mode, so it waits while the bounded queue
# is full.
38 self.stream.put(str(buff))
41 self.cli.register_read(self._buffer, self._read, None)
# Write callback passed to FTPClient.register_write().
# Takes the next chunk from self.stream, copies it into the shared buffer and
# registers itself again for the following write.
# NOTE(review): lines between 43 and 56 are missing from this excerpt.
43 def _write(self, arg, handle, error, buff, length, offset, eof):
# get() is called without a timeout, so it blocks until a chunk is available.
47 chunk = self.stream.get()
54 self._buffer.fill(chunk)
# NOTE(review): `size` is not assigned on any visible line; `offset` and `eof`
# are parameters here but may be reassigned on the missing lines -- confirm.
56 self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
# Completion callback handed to the FTPClient operations (verbose_list, get,
# put and move all pass self._done).
# NOTE(review): embedded numbering jumps from 58 to 66, so the callback's own
# body is not visible and the error check below may belong to a different
# method; confirm against the full file.
58 def _done(self, arg, handle, error):
66 if self._error is not None:
# Extract the text that follows "A system call failed: " up to the end of that
# line. "\r\n" is normalised to "\n" first: with re.MULTILINE `$` matches only
# before "\n", so a "\r" would otherwise end up in the captured message.
# NOTE(review): `re` is not among the imports visible in this excerpt.
67 match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
# Fall back to a generic message when the pattern is not present.
69 msg = match.groups()[0] if match else "Unknown error"
71 raise FTPException(msg)
# Request a verbose listing of `url` and return it parsed by _parse_mlst().
# NOTE(review): lines 76-79 and 82 are missing from this excerpt; the
# initialisation of `result` and any wait for completion are not visible.
73 def listing(self, url):
# Start the listing; self._done is passed as the completion callback.
74 self.cli.verbose_list(url, self._done, None, self.op_attr)
# Start reading into the shared buffer; _read pushes the data onto self.stream.
75 self.cli.register_read(self._buffer, self._read, None)
# Drain every chunk currently queued and append it to `result`.
80 while not self.stream.empty():
81 result += self.stream.get()
83 return self._parse_mlst(result)
# Parse a listing whose lines look like "fact=value;fact=value;... name".
# The facts read on the visible lines are 'Modify', 'Type' and 'Size'.
# NOTE(review): there is no `self` parameter although the method is invoked as
# self._parse_mlst(result) -- presumably a @staticmethod decorator sits on a
# line not visible here; confirm.
# NOTE(review): the lines that build and return the result (87-88, 97-99,
# 102+) are missing from this excerpt.
86 def _parse_mlst(listing):
89 for item in listing.strip().splitlines():
90 # consecutive newlines in the listing produce empty items
# NOTE(review): the handling of such empty items is on a line not visible here.
# Split on the first space only, so names containing spaces stay intact.
92 attrs, name = item.split(' ', 1)
# Turn "k=v;k=v;" into a dict; empty fragments (e.g. after a trailing ';') are
# skipped.
# NOTE(review): split('=') has no maxsplit, so a fact value containing '='
# would give a 3+-element list and make dict() raise ValueError -- consider
# split('=', 1).
94 attrs = dict((attr.split('=') for attr in attrs.split(';') if attr))
# Parse the Modify fact as YYYYMMDDHHMMSS, mark it as UTC, then convert it with
# Django's localtime().
# NOTE(review): RFC 3659 allows an optional fractional part (".sss") in this
# value, which this strptime format would reject -- verify what the server sends.
96 date = localtime(datetime.strptime(attrs['Modify'], "%Y%m%d%H%M%S").replace(tzinfo=UTC()))
# Any Type other than exactly 'file' is reported as 'directory'.
100 'type': 'file' if attrs['Type'] == 'file' else 'directory',
# Raises KeyError when the entry has no Size fact and ValueError when it is
# not an integer.
101 'size': int(attrs['Size']),
# NOTE(review): the `def` line of this generator is not visible in this excerpt
# (embedded numbering jumps from 101 to 108), and neither are the loop/try
# statements around the lines below.
# Start the transfer; self._done is passed as the completion callback.
108 self.cli.get(url, self._done, None, self.op_attr)
# Start reading into the shared buffer; _read pushes the data onto self.stream.
109 self.cli.register_read(self._buffer, self._read, None)
# Yield chunks to the caller as they arrive. With the 0.1 s timeout get()
# raises Empty instead of blocking indefinitely -- presumably handled on a line
# not shown (Empty is imported at the top of the file).
113 yield self.stream.get(timeout=0.1)
# wait(0) checks the flag without blocking.
# NOTE(review): self._end is presumably a threading.Event set on completion --
# confirm; its assignment is not visible here.
115 if self._end.wait(0):
# NOTE(review): the `def` line that owns these statements is not visible in
# this excerpt (embedded numbering jumps from 115 to 121).
# Start the upload; self._done is passed as the completion callback.
121 self.cli.put(url, self._done, None, self.op_attr)
# Register the first write. The literal 0, 0, False sit in the positions that
# _write fills with size, offset and eof in its own register_write() call.
122 self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
# Ask the client to move `src` to `dst`.
# self._done is passed as the completion callback and self.op_attr as the
# operation attributes.
# NOTE(review): any statements after line 125 (e.g. waiting for completion)
# are not visible in this excerpt.
124 def move(self, src, dst):
125 self.cli.move(src, dst, self._done, None, self.op_attr)