1 from datetime import datetime
2 from Queue import Queue, Empty
3 from itertools import chain
6 from threading import Event
8 from django.utils.http import urlunquote
9 from django.utils.timezone import localtime, UTC
class FTPError(Exception):
    """Exception carrying a short message plus an optional verbose text.

    ``message`` (and any extra positional/keyword arguments) is handed to
    the base ``Exception`` initializer; ``verbose`` is kept on the instance
    as the ``verbose`` attribute and defaults to ``None``.
    """

    def __init__(self, message, verbose=None, *args, **kwargs):
        parent = super(FTPError, self)
        parent.__init__(message, *args, **kwargs)

        # Stored separately so callers can show the short message and still
        # reach the longer description.
        self.verbose = verbose
# Constructor: creates the gridftp buffer, handle/operation attributes, the
# FTP client and the bounded queue (self.stream) the other methods use.
# NOTE(review): the embedded line numbers jump (21 -> 25, 28 -> 31, ...), so
# some statements of this method are not visible in this view. Attributes
# used elsewhere (self._end, self._error) are presumably set in those
# lines -- confirm against the full file.
20 def __init__(self, proxy=None, buffer_size=4096):
# gridftp is imported inside the constructor rather than at module level.
21 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
# Single transfer buffer of buffer_size, passed to register_read/register_write.
25 self._buffer = Buffer(buffer_size)
27 self.attr = HandleAttr()
28 self.op_attr = OperationAttr()
# NOTE(review): proxy defaults to None; whether this call is guarded by a
# check on proxy cannot be seen here (embedded lines 29-30 are missing).
31 self.op_attr.set_authorization(proxy)
33 self.cli = FTPClient(self.attr)
35 # limit size of a queue to 4 MB
# maxsize is the number of buffer_size chunks that fit in 4 * 2**20 bytes;
# `or 1` substitutes 1 when that quotient is falsy (0).
# NOTE(review): the file imports the Python 2 `Queue` module, where `/` on
# ints is integer division; under Python 3 this would be a float -- confirm.
36 self.stream = Queue((4 * 2**20) / buffer_size or 1)
40 self.op_attr.destroy()
# Read callback: this is the function passed to self.cli.register_read.
# Visible effects: the received buffer is converted with str() and put on
# self.stream, and the callback registers itself again with self._buffer.
# NOTE(review): embedded lines 44 and 46-47 are missing from this view, so
# any error/eof conditions guarding these two statements cannot be seen.
43 def _read(self, arg, handle, error, buff, length, offset, eof):
# Queue.put blocks while the bounded queue is full.
45 self.stream.put(str(buff))
48 self.cli.register_read(self._buffer, self._read, None)
# Write callback: this is the function passed to self.cli.register_write.
# Visible effects: takes the next chunk from self.stream, copies it into
# self._buffer and registers the next write with this same callback.
# NOTE(review): embedded lines 51-53, 55-60 and 62 are missing from this
# view; `size` is used below but its assignment is not visible, and how
# `offset`/`eof` are updated cannot be seen -- confirm against the full file.
50 def _write(self, arg, handle, error, buff, length, offset, eof):
# Queue.get() without arguments blocks until an item is available.
54 chunk = self.stream.get()
61 self._buffer.fill(chunk)
63 self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
# Completion callback: passed as the "done" argument to the self.cli
# operations (verbose_list, get, put, move, delete, rmdir, mkdir).
# NOTE(review): embedded lines 66-72 are missing from this view, so how
# self._error is populated from `error` is not visible. Whether the raise
# below runs inside this callback or in a separate method whose header is
# hidden also cannot be told from here.
65 def _done(self, arg, handle, error):
73 if self._error is not None:
# CRLF is normalised to LF first; with re.MULTILINE `$` matches at each line
# end, so the group captures the rest of the line that follows
# 'A system call failed: '.
# NOTE(review): no `import re` is visible among the imports shown -- confirm
# it exists in the omitted lines.
74 match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
# Fall back to a generic message when the pattern is not found.
76 msg = match.groups()[0] if match else "Unknown error"
# Short message goes to the exception text, the full error text to `verbose`.
78 raise FTPError(msg, self._error)
# Requests a verbose listing of `url` and returns the parsed entries.
# Visible flow: start verbose_list with self._done as completion callback,
# register the read callback, drain self.stream into `result`, then hand
# the text to self._parse_mlst.
# NOTE(review): embedded lines 83-86 and 89 are missing from this view; the
# initialisation of `result` and any wait for completion before the drain
# loop are not visible -- confirm against the full file.
80 def listing(self, url):
81 self.cli.verbose_list(url, self._done, None, self.op_attr)
82 self.cli.register_read(self._buffer, self._read, None)
# Drains only what is currently queued; empty() does not wait for more data.
87 while not self.stream.empty():
88 result += self.stream.get()
90 return self._parse_mlst(result)
# Parses listing text into per-entry data with (at least) the keys 'type',
# 'size' and 'date'.
# Each non-empty line is split at the first space into a ';'-separated
# attribute string and the entry name; attributes are 'key=value' pairs.
# NOTE(review): there is no `self` parameter although the method is called
# as self._parse_mlst(...); presumably a decorator sits on an omitted line.
# Embedded lines 96, 98-99, 101, 103, 105, 107-109 and 113+ are also missing,
# so the empty-line guard, the creation of attrs_dict, the branch choosing
# between the two `key, value` assignments and the way entries are
# produced (yield/return) are not visible -- confirm against the full file.
93 def _parse_mlst(listing):
94 for item in listing.strip().splitlines():
95 # we may receive empty string when there are multiple consecutive newlines in listing
# Split only once so names containing spaces stay intact.
97 attrs, name = item.split(' ', 1)
100 for attr in attrs.split(';'):
102 key, value = attr.split('=', 1)
# Alternative assignment for an attribute without a value part.
104 key, value = attr, ''
106 attrs_dict[key] = value
# Any 'Type' value ending in 'dir' is reported as a directory.
110 'type': 'directory' if attrs_dict['Type'].endswith('dir') else 'file',
111 'size': int(attrs_dict['Size']),
# 'Modify' is parsed as YYYYMMDDHHMMSS, tagged as UTC, then converted with
# django's localtime().
112 'date': localtime(datetime.strptime(attrs_dict['Modify'], "%Y%m%d%H%M%S").replace(tzinfo=UTC())),
116 self.cli.get(url, self._done, None, self.op_attr)
117 self.cli.register_read(self._buffer, self._read, None)
121 yield self.stream.get(timeout=0.1)
123 if self._end.wait(0):
129 self.cli.put(url, self._done, None, self.op_attr)
130 self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
# Starts a move of `src` to `dst` through self.cli, with self._done as the
# completion callback and self.op_attr as operation attributes.
# NOTE(review): the lines following the call (embedded 134+) are missing from
# this view; any waiting or error handling is not visible.
132 def move(self, src, dst):
133 self.cli.move(src, dst, self._done, None, self.op_attr)
138 data = self.listing(url).next()
140 if data['name'] == '.':
141 data['name'] = os.path.basename(os.path.normpath(url))
# Starts deletion of `url` through self.cli, with self._done as the
# completion callback and self.op_attr as operation attributes.
# NOTE(review): the lines following the call (embedded 147+) are missing from
# this view; any waiting or error handling is not visible.
145 def delete(self, url):
146 self.cli.delete(url, self._done, None, self.op_attr)
# Starts removal of the directory `url` through self.cli, with self._done as
# the completion callback and self.op_attr as operation attributes.
# NOTE(review): the lines following the call (embedded 152+) are missing from
# this view; any waiting or error handling is not visible.
150 def rmdir(self, url):
151 self.cli.rmdir(url, self._done, None, self.op_attr)
# Starts creation of the directory `url` through self.cli, with self._done as
# the completion callback and self.op_attr as operation attributes.
# NOTE(review): the lines following the call (embedded 157+) are missing from
# this view; any waiting or error handling is not visible.
155 def mkdir(self, url):
156 self.cli.mkdir(url, self._done, None, self.op_attr)
def match_ext(archive, *extensions):
    """Tell whether ``archive`` ends with any of the given ``extensions``.

    :param archive: file name to test
    :param extensions: candidate suffixes, e.g. ``'.tar.gz'``, ``'.tgz'``
    :returns: ``True`` if at least one suffix matches, otherwise ``False``
        (also ``False`` when no extensions are given)

    NOTE(review): callers use ``self.match_ext(archive, ...)`` while this
    signature has no ``self``; presumably a decorator on the line above
    (not visible in this view) makes that work -- confirm.
    """
    # An explicit boolean result: the callers branch on it with if/elif.
    return any(archive.endswith(ext) for ext in extensions)
def compress(self, server, path, files, archive):
    """Set up an archiving command as disk stack and start a ``get``.

    The tool is chosen from the ``archive`` file name: ``.tar.gz``/``.tgz``
    and ``.tar.bz2``/``.tbz`` use ``tar``, ``.zip`` uses ``jar``. The command
    line is joined with ``#`` after a ``popen:argv=`` prefix and installed
    with ``self.op_attr.set_disk_stack``.

    :param server: passed unchanged to ``self.get``
    :param path: directory handed to the archiver with ``-C``
    :param files: names to put into the archive; any iterable is accepted
        and copied into a list
    :param archive: archive file name; its extension selects the tool
    :returns: the result of ``self.get(server)``
    :raises ValueError: if the extension of ``archive`` is not recognised
        (``self._check_disk_stack_args`` may raise as well)
    """
    # Copy once: the list concatenations below would raise TypeError for a
    # tuple or generator, and a generator must not be consumed twice.
    files = list(files)

    self._check_disk_stack_args(*([path, archive] + files))

    if self.match_ext(archive, '.tar.gz', '.tgz'):
        cmd, args = 'tar', ['cvzf', archive, '-C', path] + files
    elif self.match_ext(archive, '.tar.bz2', '.tbz'):
        cmd, args = 'tar', ['cvjf', archive, '-C', path] + files
    elif self.match_ext(archive, '.zip'):
        # jar takes a separate "-C <path> <file>" triple for every file.
        cmd, args = 'jar', (['cvMf', archive] + list(chain.from_iterable(('-C', path, f) for f in files)))
    else:
        raise ValueError('Unknown archive type: {}'.format(archive))

    self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + args))

    return self.get(server)
def extract(self, server, archive, dst):
    """Set up an unpacking command as disk stack and start a ``get``.

    The tool is chosen from the ``archive`` file name: ``.tar.gz``/``.tgz``
    and ``.tar.bz2``/``.tbz`` use ``tar``, ``.zip`` uses ``unzip``. The
    command line is joined with ``#`` after a ``popen:argv=`` prefix and
    installed with ``self.op_attr.set_disk_stack``.

    :param server: passed unchanged to ``self.get``
    :param archive: archive file name; its extension selects the tool
    :param dst: directory the archive is unpacked into
    :returns: the result of ``self.get(server)``
    :raises ValueError: if the extension of ``archive`` is not recognised
        (``self._check_disk_stack_args`` may raise as well)
    """
    self._check_disk_stack_args(archive, dst)

    # (suffixes, command, arguments), tried in this order; first match wins.
    unpackers = (
        (('.tar.gz', '.tgz'), 'tar', ('xvzf', archive, '-C', dst)),
        (('.tar.bz2', '.tbz'), 'tar', ('xvjf', archive, '-C', dst)),
        (('.zip',), 'unzip', (archive, '-d', dst)),
    )
    for suffixes, cmd, args in unpackers:
        if self.match_ext(archive, *suffixes):
            break
    else:
        raise ValueError('Unknown archive type: {}'.format(archive))

    disk_stack = '#'.join(("popen:argv=", cmd) + args)
    self.op_attr.set_disk_stack(disk_stack)

    return self.get(server)
200 def _check_disk_stack_args(*args):
201 for char in ['#', ',', ';', '%23', '%3B']:
204 raise ValueError('Unsupported character `{}` in `{}`!'.format(urlunquote(char), urlunquote(arg)))