class FTPOperation:
- def __init__(self, proxy=None):
- self.end = Event()
+ def __init__(self, proxy=None, buffer_size=4096):
+ self._end = Event()
+ self._error = None
+ self._buffer = Buffer(buffer_size)
+
self.attr = HandleAttr()
self.op_attr = OperationAttr()
if proxy is not None:
self.op_attr.set_authorization(proxy)
- self.buffer = Buffer(4096)
self.cli = FTPClient(self.attr)
- self.stream = Queue()
- self.error = None
+ # limit size of a queue to 4 MB
+ self.stream = Queue((4 * 2**20) / buffer_size or 1)
def __del__(self):
self.attr.destroy()
self.op_attr.destroy()
self.cli.destroy()
- def report_error(self):
- if self.error is not None:
- match = re.search(r'A system call failed: (.*)$', self.error.replace('\r\n', '\n'), re.MULTILINE)
-
- msg = match.groups()[0] if match else "Unknown error"
-
- raise FTPException(msg)
-
def _read(self, arg, handle, error, buff, length, offset, eof):
if not error:
self.stream.put(str(buff))
if not eof:
- self.cli.register_read(self.buffer, self._read, None)
+ self.cli.register_read(self._buffer, self._read, None)
+
+ def _write(self, arg, handle, error, buff, length, offset, eof):
+ if eof or error:
+ return
+
+ chunk = self.stream.get()
+
+ if chunk is None:
+ size = 0
+ eof = True
+ else:
+ size = len(chunk)
+ self._buffer.fill(chunk)
+
+ self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
def _done(self, arg, handle, error):
- self.error = error
- self.end.set()
+ self._error = error
+ self._end.set()
+
+ def wait(self):
+ self._end.wait()
+ self._end.clear()
+
+ if self._error is not None:
+ match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
+
+ msg = match.groups()[0] if match else "Unknown error"
+
+ raise FTPException(msg)
def listing(self, url):
self.cli.verbose_list(url, self._done, None, self.op_attr)
- self.cli.register_read(self.buffer, self._read, None)
-
- self.end.wait()
- self.end.clear()
+ self.cli.register_read(self._buffer, self._read, None)
- self.report_error()
+ self.wait()
result = ''
while not self.stream.empty():
def get(self, url):
self.cli.get(url, self._done, None, self.op_attr)
- self.cli.register_read(self.buffer, self._read, None)
+ self.cli.register_read(self._buffer, self._read, None)
while True:
try:
yield self.stream.get(timeout=0.1)
except Empty:
- if self.end.wait(0):
+ if self._end.wait(0):
break
- self.end.clear()
+ self.wait()
- self.report_error()
+ def put(self, url):
+ self.cli.put(url, self._done, None, self.op_attr)
+ self.cli.register_write(self._buffer, 0, 0, False, self._write, None)