1 from datetime import datetime
2 from Queue import Queue, Empty
3 from itertools import chain
6 from threading import Event
7 from urlparse import urlparse, urlunparse
9 from django.utils.http import urlunquote
10 from django.utils.timezone import localtime, UTC
class FTPError(Exception):
    """Error raised when an FTP operation reports a failure.

    Attributes:
        message: Short description of the failure (first positional argument).
        verbose: Optional extended error text; ``None`` when not supplied.
    """

    def __init__(self, message, verbose=None, *args, **kwargs):
        """Create the error.

        Args:
            message: Short description of the failure.
            verbose: Optional extended error text kept on ``self.verbose``.
            *args: Extra positional arguments forwarded to ``Exception``.
            **kwargs: Extra keyword arguments forwarded to ``Exception``.
        """
        super(FTPError, self).__init__(message, *args, **kwargs)

        # Callers read ``e.message``.  The implicit ``BaseException.message``
        # is deprecated, is not filled in when extra positional arguments are
        # passed, and does not exist on Python 3, so store it explicitly.
        self.message = message
        self.verbose = verbose
21 def __init__(self, proxy=None, buffer_size=4096):
22 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
26 self._buffer = Buffer(buffer_size)
28 self.attr = HandleAttr()
29 self.op_attr = OperationAttr()
32 self.op_attr.set_authorization(proxy)
34 self.cli = FTPClient(self.attr)
36 # limit size of a queue to 4 MB
37 self.stream = Queue((4 * 2**20) / buffer_size or 1)
41 self.op_attr.destroy()
44 def _read(self, arg, handle, error, buff, length, offset, eof):
46 self.stream.put(str(buff))
49 self.cli.register_read(self._buffer, self._read, None)
51 def _write(self, arg, handle, error, buff, length, offset, eof):
55 chunk = self.stream.get()
62 self._buffer.fill(chunk)
64 self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
66 def _done(self, arg, handle, error):
74 if self._error is not None:
75 match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
77 msg = match.groups()[0] if match else "Unknown error"
79 raise FTPError(msg, self._error)
81 def listing(self, url):
82 self.cli.verbose_list(url, self._done, None, self.op_attr)
83 self.cli.register_read(self._buffer, self._read, None)
88 while not self.stream.empty():
89 result += self.stream.get()
91 return self._parse_mlst(result)
def _parse_mlst(listing):
    """Parse a machine-readable listing into one dictionary per entry.

    Every non-blank line of *listing* is split at its first space into a
    ``;``-separated run of ``fact=value`` pairs and the entry name.  Fact
    names (and the ``Type`` value) are matched case-insensitively and a
    fractional part of the ``Modify`` timestamp is ignored, as both are
    permitted by RFC 3659.

    Args:
        listing: The complete listing text.

    Yields:
        dict: ``name`` (text after the first space), ``type``
        (``'directory'`` when the ``Type`` fact ends with ``dir``, otherwise
        ``'file'``), ``size`` (the ``Size`` fact as ``int``) and ``date``
        (the ``Modify`` fact parsed as UTC and passed through ``localtime``).

    Raises:
        ValueError: If a line has no space separator, or ``Size``/``Modify``
            cannot be parsed.
        KeyError: If the ``Type``, ``Size`` or ``Modify`` fact is missing.
    """
    for item in listing.strip().splitlines():
        # we may receive an empty string when there are multiple consecutive
        # newlines in the listing
        if not item.strip():
            continue

        attrs, name = item.split(' ', 1)

        attrs_dict = {}
        for attr in attrs.split(';'):
            # A piece without '=' (e.g. the empty one after the trailing ';')
            # is stored with an empty value.
            key, _, value = attr.partition('=')
            attrs_dict[key.lower()] = value

        # time-val is YYYYMMDDHHMMSS[.sss]; only whole seconds are kept.
        modified = attrs_dict['modify'].partition('.')[0]

        yield {
            'name': name,
            'type': 'directory' if attrs_dict['type'].lower().endswith('dir') else 'file',
            'size': int(attrs_dict['size']),
            'date': localtime(datetime.strptime(modified, "%Y%m%d%H%M%S").replace(tzinfo=UTC())),
        }
117 self.cli.get(url, self._done, None, self.op_attr)
118 self.cli.register_read(self._buffer, self._read, None)
122 yield self.stream.get(timeout=0.1)
124 if self._end.wait(0):
130 self.cli.put(url, self._done, None, self.op_attr)
131 self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
133 def move(self, src, dst):
134 self.cli.move(src, dst, self._done, None, self.op_attr)
139 data = self.listing(url).next()
141 if data['name'] == '.':
142 data['name'] = os.path.basename(os.path.normpath(url))
146 def exists(self, url):
147 self.cli.exists(url, self._done, None, self.op_attr)
151 except FTPError as e:
152 if 'No such file or directory' in e.message:
158 def delete(self, url):
159 self.cli.delete(url, self._done, None, self.op_attr)
163 def rmdir(self, url):
164 self.cli.rmdir(url, self._done, None, self.op_attr)
168 def mkdir(self, url, parents=False):
174 parent_url = urlunparse((u.scheme, u.netloc, os.path.dirname(os.path.normpath(u.path)), '', '', ''))
176 self.mkdir(parent_url, parents=True)
178 self.cli.mkdir(url, self._done, None, self.op_attr)
def match_ext(archive, *extensions):
    """Tell whether *archive* ends with any of the given *extensions*.

    Args:
        archive: Archive file name or path.
        *extensions: Candidate suffixes such as ``'.tar.gz'``.

    Returns:
        bool: ``True`` if at least one suffix matches, ``False`` otherwise
        (including when no extensions are given).  The comparison is
        case-sensitive.
    """
    # ``extensions`` is already a tuple, which str.endswith accepts directly.
    return archive.endswith(extensions)
def compress(self, server, path, files, archive):
    """Configure an archiving command as the disk stack and start a ``get``.

    The command is chosen from the extension of *archive*:
    ``.tar.gz``/``.tgz`` -> ``tar cvzf``, ``.tar.bz2``/``.tbz`` -> ``tar cvjf``
    and ``.zip`` -> ``jar cvMf``.  It is written to the operation attributes
    as a ``popen:argv=`` specification whose ``#``-separated fields are the
    command followed by its arguments.

    Args:
        server: URL handed unchanged to ``self.get``.
        path: Directory the file names are relative to (passed after ``-C``).
        files: Iterable of file names to put into the archive.
        archive: Path of the archive to create.

    Returns:
        Whatever ``self.get(server)`` returns.

    Raises:
        ValueError: If the archive extension is not recognised, or if
            ``self._check_disk_stack_args`` rejects one of the arguments.
    """
    # Materialise once so tuples and one-shot iterables work as well as lists.
    files = list(files)
    self._check_disk_stack_args(path, archive, *files)

    if self.match_ext(archive, '.tar.gz', '.tgz'):
        cmd, args = 'tar', ['cvzf', archive, '-C', path] + files
    elif self.match_ext(archive, '.tar.bz2', '.tbz'):
        cmd, args = 'tar', ['cvjf', archive, '-C', path] + files
    elif self.match_ext(archive, '.zip'):
        # jar takes a separate "-C <dir> <file>" triple for every entry.
        cmd, args = 'jar', (['cvMf', archive] + list(chain.from_iterable(('-C', path, f) for f in files)))
    else:
        raise ValueError('Unknown archive type: {}'.format(archive))

    self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + args))

    return self.get(server)
def extract(self, server, archive, dst):
    """Configure an unpacking command as the disk stack and start a ``get``.

    The command is chosen from the extension of *archive*:
    ``.tar.gz``/``.tgz`` -> ``tar xvzf``, ``.tar.bz2``/``.tbz`` -> ``tar xvjf``
    and ``.zip`` -> ``unzip``.  It is written to the operation attributes as
    a ``popen:argv=`` specification whose ``#``-separated fields are the
    command followed by its arguments.

    Args:
        server: URL handed unchanged to ``self.get``.
        archive: Path of the archive to unpack.
        dst: Directory to unpack into.

    Returns:
        Whatever ``self.get(server)`` returns.

    Raises:
        ValueError: If the archive extension is not recognised, or if
            ``self._check_disk_stack_args`` rejects one of the arguments.
    """
    self._check_disk_stack_args(archive, dst)

    if self.match_ext(archive, '.tar.gz', '.tgz'):
        cmd, args = 'tar', ('xvzf', archive, '-C', dst)
    elif self.match_ext(archive, '.tar.bz2', '.tbz'):
        cmd, args = 'tar', ('xvjf', archive, '-C', dst)
    elif self.match_ext(archive, '.zip'):
        cmd, args = 'unzip', (archive, '-d', dst)
    else:
        raise ValueError('Unknown archive type: {}'.format(archive))

    self.op_attr.set_disk_stack('#'.join(("popen:argv=", cmd) + args))

    return self.get(server)
222 def _check_disk_stack_args(*args):
223 for char in ['#', ',', ';', '%23', '%3B']:
226 raise ValueError('Unsupported character `{}` in `{}`!'.format(urlunquote(char), urlunquote(arg)))