1 from datetime import datetime
2 from Queue import Queue, Empty
5 from threading import Event
6 from urlparse import urlparse
8 from django.utils.timezone import localtime, UTC
9 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
# Exception type used by this module to report a failed transfer operation:
# the visible raise site constructs it with a message extracted from the
# recorded error text (or "Unknown error").
# NOTE(review): the class body is not visible in this excerpt.
12 class FTPException(Exception):
# Set up the client state: a transfer buffer, handle and operation attributes,
# the FTPClient itself, and a bounded queue used to pass data chunks between
# the read/write callbacks and the public methods.
#
# proxy       -- value handed to OperationAttr.set_authorization(); defaults
#                to None.
# buffer_size -- size passed to Buffer() and used to derive the queue length;
#                defaults to 4096.
#
# NOTE(review): several lines of this method are not visible in this excerpt,
# so the statements below may be separated by (or nested under) code that is
# not shown here.
17 def __init__(self, proxy=None, buffer_size=4096):
# Single Buffer instance reused for every register_read/register_write call.
20 self._buffer = Buffer(buffer_size)
# Handle-level attributes (given to FTPClient below) and per-operation
# attributes (given to every cli.* operation call).
22 self.attr = HandleAttr()
23 self.op_attr = OperationAttr()
# NOTE(review): possibly guarded by a check on `proxy` on a line that is not
# shown -- confirm before relying on proxy=None reaching set_authorization().
26 self.op_attr.set_authorization(proxy)
28 self.cli = FTPClient(self.attr)
30 # limit size of a queue to 4 MB
# Queue's maxsize counts items, not bytes: the bound is the number of
# buffer_size-sized chunks that fit in 4 * 2**20 bytes. The file uses Python 2
# module names (Queue, urlparse), where `/` on ints is floor division; `or 1`
# keeps the queue bounded to one item when the quotient is 0 (maxsize 0 would
# mean "unbounded"). The 4 MB figure assumes each queued item is at most
# buffer_size bytes -- TODO confirm against _read.
31 self.stream = Queue((4 * 2**20) / buffer_size or 1)
# Calls destroy() on the OperationAttr created in __init__.
# NOTE(review): the enclosing def (and any sibling cleanup calls) are not
# visible in this excerpt; presumably this is part of a teardown method --
# confirm against the full file.
35 self.op_attr.destroy()
# Read callback handed to cli.register_read(): pushes the received data onto
# self.stream and registers itself again for the next read.
#
# The parameters (arg, handle, error, buff, length, offset, eof) are supplied
# by the gridftp library; of these, only `buff` is used in the visible lines.
#
# NOTE(review): lines between the statements below are not visible in this
# excerpt; the put and/or the re-registration may be conditional (e.g. on
# `eof` or `error`) -- confirm against the full file.
38 def _read(self, arg, handle, error, buff, length, offset, eof):
# str(buff) converts the buffer object to a string before queueing it.
# Queue.put() blocks while the bounded queue is full.
40 self.stream.put(str(buff))
# Re-arm the read with the same shared buffer and this method as callback.
43 self.cli.register_read(self._buffer, self._read, None)
# Write callback handed to cli.register_write(): takes the next chunk from
# self.stream, copies it into the shared buffer and registers the next write.
#
# The parameters (arg, handle, error, buff, length, offset, eof) are supplied
# by the gridftp library.
#
# NOTE(review): `size` used below is not defined in the visible lines, and
# `offset`/`eof` may be recomputed on lines that are not shown; the elided
# lines presumably also handle end-of-data -- confirm against the full file.
45 def _write(self, arg, handle, error, buff, length, offset, eof):
# Queue.get() with no arguments blocks until a chunk is available.
49 chunk = self.stream.get()
56 self._buffer.fill(chunk)
58 self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
# Completion callback: passed to every cli.* operation started by this class
# (verbose_list, get, put, move, delete, rmdir).
# NOTE(review): the body is not visible in this excerpt; presumably it records
# `error` in self._error and signals self._end -- confirm against the full
# file.
60 def _done(self, arg, handle, error):
# Turn a recorded operation error into an FTPException.
# NOTE(review): the def this fragment belongs to is not visible in this
# excerpt, and `import re` is not among the visible imports -- verify both.
68 if self._error is not None:
# CRLF is normalised to LF first so that `$` (with re.MULTILINE) matches at
# the end of the "A system call failed: ..." line rather than before a
# stray '\r'.
69 match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
# First capture group is the text after the prefix; fall back to a generic
# message when the error text does not contain the expected line.
71 msg = match.groups()[0] if match else "Unknown error"
73 raise FTPException(msg)
# List the entries at `url`.
#
# Starts a verbose_list operation, collects the data delivered to _read via
# self.stream, and returns the result of _parse_mlst() on the concatenated
# text.
#
# NOTE(review): the lines that initialise `result` and (presumably) wait for
# the operation to finish / check for errors are not visible in this excerpt.
75 def listing(self, url):
76 self.cli.verbose_list(url, self._done, None, self.op_attr)
# Arm the first read; _read is the callback that feeds self.stream.
77 self.cli.register_read(self._buffer, self._read, None)
# Drain every chunk currently queued and join them into one string.
82 while not self.stream.empty():
83 result += self.stream.get()
85 return self._parse_mlst(result)
# Parse listing text into per-entry records.
#
# Each non-empty line is split at the first space into a ';'-separated list of
# key=value attributes and the entry name. The visible record fields are:
#   'type' -- 'directory' when the Type attribute is 'dir', otherwise 'file'
#   'size' -- the Size attribute converted with int()
#   'date' -- the Modify attribute parsed as "%Y%m%d%H%M%S", tagged as UTC and
#             converted with django.utils.timezone.localtime()
#
# NOTE(review): takes no `self`, so this is presumably a @staticmethod (the
# decorator line is not visible). The initialisation of `attrs_dict`, the
# opening of the record literal (including a 'name' key, which another
# fragment in this file reads), and whether records are yielded or collected
# are on lines not shown -- confirm against the full file.
88 def _parse_mlst(listing):
89 for item in listing.strip().splitlines():
90 # we may receive empty string when there are multiple consecutive newlines in listing
# maxsplit=1 keeps spaces inside the entry name intact.
92 attrs, name = item.split(' ', 1)
95 for attr in attrs.split(';'):
# maxsplit=1 keeps any '=' inside the attribute value intact.
97 key, value = attr.split('=', 1)
98 attrs_dict[key] = value
104 'type': 'directory' if attrs_dict['Type'] == 'dir' else 'file',
105 'size': int(attrs_dict['Size']),
# strptime() returns a naive datetime; it is marked as UTC before being
# converted by localtime().
106 'date': localtime(datetime.strptime(attrs_dict['Modify'], "%Y%m%d%H%M%S").replace(tzinfo=UTC())),
# Download fragment: starts a cli.get() of `url` and yields the chunks that
# _read pushes onto self.stream, so the enclosing function is a generator.
# NOTE(review): the def line and the loop / try-except structure around the
# statements below are not visible in this excerpt.
110 self.cli.get(url, self._done, None, self.op_attr)
111 self.cli.register_read(self._buffer, self._read, None)
# Wait at most 0.1 s for the next chunk; Queue.get() raises Queue.Empty on
# timeout (Empty is imported at the top of the file).
115 yield self.stream.get(timeout=0.1)
# NOTE(review): self._end is presumably a threading.Event (Event is imported
# at the top of the file); wait(0) would then poll the flag without blocking.
117 if self._end.wait(0):
# Upload fragment: starts a cli.put() to `url` and arms the first write.
# NOTE(review): the def line and the code that feeds self.stream are not
# visible in this excerpt.
123 self.cli.put(url, self._done, None, self.op_attr)
# By analogy with the register_write() call in _write, the positional
# arguments are (buffer, size, offset, eof, callback, arg): the initial call
# passes size 0 at offset 0 with eof False, and _write is the callback.
124 self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
# Start a move (rename) of `src` to `dst` through the underlying client, with
# _done as completion callback.
# NOTE(review): any following lines (e.g. waiting for completion or raising on
# error) are not visible in this excerpt.
126 def move(self, src, dst):
127 self.cli.move(src, dst, self._done, None, self.op_attr)
# Single-entry lookup fragment: takes the first record produced by
# listing(url) and fixes up its name.
# NOTE(review): the def line and the return statement are not visible in this
# excerpt, and `import os` is not among the visible imports -- verify both.
# .next() is the Python 2 iterator protocol, so listing() must return an
# iterator here.
132 data = self.listing(url).next()
# A name of '.' carries no real name, so derive one from the URL: the last
# path component with trailing slashes stripped, or u'/' when that is empty
# (i.e. the URL path is the root).
134 if data['name'] == '.':
135 data['name'] = os.path.basename(urlparse(url).path.rstrip('/')) or u'/'
# Start deletion of the entry at `url` through the underlying client, with
# _done as completion callback.
# NOTE(review): any following lines (e.g. waiting for completion or raising on
# error) are not visible in this excerpt.
139 def delete(self, url):
140 self.cli.delete(url, self._done, None, self.op_attr)
# Start removal of the directory at `url` through the underlying client, with
# _done as completion callback.
# NOTE(review): any following lines (e.g. waiting for completion or raising on
# error) are not visible in this excerpt.
144 def rmdir(self, url):
145 self.cli.rmdir(url, self._done, None, self.op_attr)