--- /dev/null
+from Queue import Queue, Empty
+import re
+from threading import Event
+
+from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
+
+
+class FTPException(Exception):
+ pass
+
+
+class FTPOperation:
+ def __init__(self, proxy=None):
+ self.end = Event()
+ self.attr = HandleAttr()
+ self.op_attr = OperationAttr()
+
+ if proxy is not None:
+ self.op_attr.set_authorization(proxy)
+
+ self.buffer = Buffer(4096)
+ self.cli = FTPClient(self.attr)
+
+ self.stream = Queue()
+ self.error = None
+
+ def __del__(self):
+ self.attr.destroy()
+ self.op_attr.destroy()
+ self.cli.destroy()
+
+ def report_error(self):
+ if self.error is not None:
+ match = re.search(r'A system call failed: (.*)$', self.error.replace('\r\n', '\n'), re.MULTILINE)
+
+ msg = match.groups()[0] if match else "Unknown error"
+
+ raise FTPException(msg)
+
+ def _read(self, arg, handle, error, buff, length, offset, eof):
+ if not error:
+ self.stream.put(str(buff))
+
+ if not eof:
+ self.cli.register_read(self.buffer, self._read, None)
+
+ def _done(self, arg, handle, error):
+ self.error = error
+ self.end.set()
+
+ def listing(self, url):
+ self.cli.verbose_list(url, self._done, None, self.op_attr)
+ self.cli.register_read(self.buffer, self._read, None)
+
+ self.end.wait()
+ self.end.clear()
+
+ self.report_error()
+
+ result = ''
+ while not self.stream.empty():
+ result += self.stream.get()
+
+ return result
+
+ def get(self, url):
+ self.cli.get(url, self._done, None, self.op_attr)
+ self.cli.register_read(self.buffer, self._read, None)
+
+ while True:
+ try:
+ yield self.stream.get(timeout=0.1)
+ except Empty:
+ if self.end.wait(0):
+ break
+
+ self.end.clear()
+
+ self.report_error()