put operation for uploading files
authorMaciej Tronowski <mtro@man.poznan.pl>
Wed, 1 Apr 2015 15:26:42 +0000 (17:26 +0200)
committerMaciej Tronowski <mtro@man.poznan.pl>
Wed, 1 Apr 2015 15:26:42 +0000 (17:26 +0200)
filex/ftp.py

index 8c6abde..2d5be20 100644 (file)
@@ -10,52 +10,69 @@ class FTPException(Exception):
 
 
 class FTPOperation:
-    def __init__(self, proxy=None):
-        self.end = Event()
+    def __init__(self, proxy=None, buffer_size=4096):
+        self._end = Event()
+        self._error = None
+        self._buffer = Buffer(buffer_size)
+
         self.attr = HandleAttr()
         self.op_attr = OperationAttr()
 
         if proxy is not None:
             self.op_attr.set_authorization(proxy)
 
-        self.buffer = Buffer(4096)
         self.cli = FTPClient(self.attr)
 
-        self.stream = Queue()
-        self.error = None
+        # limit size of a queue to 4 MB
+        self.stream = Queue((4 * 2**20) / buffer_size or 1)
 
     def __del__(self):
         self.attr.destroy()
         self.op_attr.destroy()
         self.cli.destroy()
 
-    def report_error(self):
-        if self.error is not None:
-            match = re.search(r'A system call failed: (.*)$', self.error.replace('\r\n', '\n'), re.MULTILINE)
-
-            msg = match.groups()[0] if match else "Unknown error"
-
-            raise FTPException(msg)
-
     def _read(self, arg, handle, error, buff, length, offset, eof):
         if not error:
             self.stream.put(str(buff))
 
             if not eof:
-                self.cli.register_read(self.buffer, self._read, None)
+                self.cli.register_read(self._buffer, self._read, None)
+
+    def _write(self, arg, handle, error, buff, length, offset, eof):
+        if eof or error:
+            return
+
+        chunk = self.stream.get()
+
+        if chunk is None:
+            size = 0
+            eof = True
+        else:
+            size = len(chunk)
+            self._buffer.fill(chunk)
+
+        self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
 
     def _done(self, arg, handle, error):
-        self.error = error
-        self.end.set()
+        self._error = error
+        self._end.set()
+
+    def wait(self):
+        self._end.wait()
+        self._end.clear()
+
+        if self._error is not None:
+            match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
+
+            msg = match.groups()[0] if match else "Unknown error"
+
+            raise FTPException(msg)
 
     def listing(self, url):
         self.cli.verbose_list(url, self._done, None, self.op_attr)
-        self.cli.register_read(self.buffer, self._read, None)
-
-        self.end.wait()
-        self.end.clear()
+        self.cli.register_read(self._buffer, self._read, None)
 
-        self.report_error()
+        self.wait()
 
         result = ''
         while not self.stream.empty():
@@ -65,15 +82,17 @@ class FTPOperation:
 
     def get(self, url):
         self.cli.get(url, self._done, None, self.op_attr)
-        self.cli.register_read(self.buffer, self._read, None)
+        self.cli.register_read(self._buffer, self._read, None)
 
         while True:
             try:
                 yield self.stream.get(timeout=0.1)
             except Empty:
-                if self.end.wait(0):
+                if self._end.wait(0):
                     break
 
-        self.end.clear()
+        self.wait()
 
-        self.report_error()
+    def put(self, url):
+        self.cli.put(url, self._done, None, self.op_attr)
+        self.cli.register_write(self._buffer, 0, 0, False, self._write, None)