1 from Queue import Queue, Empty
3 from threading import Event
5 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
# Exception subclass used by this file to report failed FTP operations; it is
# raised with a one-line message (see the `raise FTPException(msg)` site).
# NOTE(review): the class body is not visible in this view -- confirm it adds
# nothing beyond the built-in Exception behaviour.
8 class FTPException(Exception):
# Build the client state: one reusable transfer buffer, the handle and
# operation attribute objects, the FTPClient itself, and a bounded queue
# (self.stream) through which the read/write callbacks exchange data chunks.
#   proxy       -- value handed to OperationAttr.set_authorization()
#   buffer_size -- size passed to Buffer(); also used to size the queue
13 def __init__(self, proxy=None, buffer_size=4096):
16 self._buffer = Buffer(buffer_size)
18 self.attr = HandleAttr()
19 self.op_attr = OperationAttr()
# NOTE(review): whether this call is guarded (e.g. skipped when proxy is
# None, the default) is not visible in this view -- confirm.
22 self.op_attr.set_authorization(proxy)
24 self.cli = FTPClient(self.attr)
26 # limit size of a queue to 4 MB
# Queue's maxsize counts items, not bytes, so the 4 MB budget is divided by
# buffer_size (one queued chunk per buffer). `or 1` replaces a quotient of 0
# (buffer_size larger than 4 MB) with 1, because a maxsize of 0 would make
# the queue unbounded.
# NOTE(review): relies on Python 2 integer division (the file imports the
# Python 2 `Queue` module); under true division the quotient would be a float.
27 self.stream = Queue((4 * 2**20) / buffer_size or 1)
# NOTE(review): the embedded numbering jumps from 27 to 31 here; this call
# presumably belongs to a separate teardown method whose `def` line is not
# visible -- confirm before treating it as part of __init__.
31 self.op_attr.destroy()
# Read callback (it is the function passed to cli.register_read): pushes the
# received buffer, converted with str(), onto self.stream and then registers
# itself again for the next read.
# Queue.put() without a timeout blocks while the queue is full.
# NOTE(review): `arg`, `handle`, `error`, `length`, `offset` and `eof` are
# unused in the visible lines; any eof/error guard around the two calls below
# is not visible in this view -- confirm.
34 def _read(self, arg, handle, error, buff, length, offset, eof):
# NOTE(review): str(buff) converts the whole buffer and ignores `length`;
# verify that Buffer's string conversion yields only the bytes received.
36 self.stream.put(str(buff))
39 self.cli.register_read(self._buffer, self._read, None)
# Write callback (it is the function passed to cli.register_write): takes the
# next chunk from self.stream, hands it to self._buffer.fill() and registers
# the next write of that buffer.
41 def _write(self, arg, handle, error, buff, length, offset, eof):
# Queue.get() without a timeout blocks until a chunk is available.
45 chunk = self.stream.get()
52 self._buffer.fill(chunk)
# NOTE(review): `size` is not assigned in the visible lines, and `offset` /
# `eof` appear here as the incoming parameter values unless they are
# reassigned in lines not visible in this view -- confirm where they are set.
54 self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
# Completion callback: it is the function passed to cli.verbose_list(),
# cli.get() and cli.put().
56 def _done(self, arg, handle, error):
# NOTE(review): the assignment of self._error is not visible in this view,
# and the embedded numbering jumps from 56 to 64, so the error check below may
# belong to a separate method -- confirm.
64 if self._error is not None:
# CRLF is normalised to LF first: with re.MULTILINE `$` matches before each
# '\n', and '.' would otherwise capture a trailing '\r' into the message.
# The group captures the rest of the line after "A system call failed: ".
# NOTE(review): `re` is used here but no `import re` is visible among the
# file's imports -- confirm it is imported.
65 match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
# Use the captured text, or "Unknown error" when the pattern is not found.
67 msg = match.groups()[0] if match else "Unknown error"
69 raise FTPException(msg)
# Start a verbose listing of `url` and gather the chunks the read callback
# queued on self.stream.
#   url -- location passed to cli.verbose_list()
71 def listing(self, url):
# self._done is registered as the completion callback; the first read into
# self._buffer is armed immediately afterwards.
72 self.cli.verbose_list(url, self._done, None, self.op_attr)
73 self.cli.register_read(self._buffer, self._read, None)
# Drain every chunk currently queued, appending each to `result`.
# NOTE(review): the initialisation of `result`, any wait for the operation to
# finish before draining, and the return statement are not visible in this
# view -- confirm.
78 while not self.stream.empty():
79 result += self.stream.get()
# NOTE(review): the enclosing `def` line is not visible in this view; the
# `yield` below means the enclosing function is a generator.
# Start a download of `url` with self._done as the completion callback and
# arm the first read into self._buffer.
84 self.cli.get(url, self._done, None, self.op_attr)
85 self.cli.register_read(self._buffer, self._read, None)
# Hand the next queued chunk to the consumer. With timeout=0.1, Queue.get()
# raises Empty if nothing arrives within 0.1 s.
# NOTE(review): the surrounding loop and the Empty handling are not visible
# in this view -- confirm.
89 yield self.stream.get(timeout=0.1)
# NOTE(review): the enclosing `def` line is not visible in this view.
# Start an upload to `url` with self._done as the completion callback.
97 self.cli.put(url, self._done, None, self.op_attr)
# Register an initial write of self._buffer with self._write as the callback.
# Going by the argument order used in _write's own register_write call, the
# three literals are size=0, offset=0, eof=False.
98 self.cli.register_write(self._buffer, 0, 0, False, self._write, None)