gridftp manager application filex
[qcg-portal.git] / filex / ftp.py
diff --git a/filex/ftp.py b/filex/ftp.py
new file mode 100644 (file)
index 0000000..8c6abde
--- /dev/null
@@ -0,0 +1,79 @@
+from Queue import Queue, Empty
+import re
+from threading import Event
+
+from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
+
+
+class FTPException(Exception):
+    pass
+
+
+class FTPOperation:
+    def __init__(self, proxy=None):
+        self.end = Event()
+        self.attr = HandleAttr()
+        self.op_attr = OperationAttr()
+
+        if proxy is not None:
+            self.op_attr.set_authorization(proxy)
+
+        self.buffer = Buffer(4096)
+        self.cli = FTPClient(self.attr)
+
+        self.stream = Queue()
+        self.error = None
+
+    def __del__(self):
+        self.attr.destroy()
+        self.op_attr.destroy()
+        self.cli.destroy()
+
+    def report_error(self):
+        if self.error is not None:
+            match = re.search(r'A system call failed: (.*)$', self.error.replace('\r\n', '\n'), re.MULTILINE)
+
+            msg = match.groups()[0] if match else "Unknown error"
+
+            raise FTPException(msg)
+
+    def _read(self, arg, handle, error, buff, length, offset, eof):
+        if not error:
+            self.stream.put(str(buff))
+
+            if not eof:
+                self.cli.register_read(self.buffer, self._read, None)
+
+    def _done(self, arg, handle, error):
+        self.error = error
+        self.end.set()
+
+    def listing(self, url):
+        self.cli.verbose_list(url, self._done, None, self.op_attr)
+        self.cli.register_read(self.buffer, self._read, None)
+
+        self.end.wait()
+        self.end.clear()
+
+        self.report_error()
+
+        result = ''
+        while not self.stream.empty():
+            result += self.stream.get()
+
+        return result
+
+    def get(self, url):
+        self.cli.get(url, self._done, None, self.op_attr)
+        self.cli.register_read(self.buffer, self._read, None)
+
+        while True:
+            try:
+                yield self.stream.get(timeout=0.1)
+            except Empty:
+                if self.end.wait(0):
+                    break
+
+        self.end.clear()
+
+        self.report_error()