From 8eee3b5df1b14d7c77104e0905e5451f6f4bc5e1 Mon Sep 17 00:00:00 2001 From: Maciej Tronowski Date: Wed, 1 Apr 2015 17:26:42 +0200 Subject: [PATCH] put operation for uploading files --- filex/ftp.py | 69 +++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/filex/ftp.py b/filex/ftp.py index 8c6abde..2d5be20 100644 --- a/filex/ftp.py +++ b/filex/ftp.py @@ -10,52 +10,69 @@ class FTPException(Exception): class FTPOperation: - def __init__(self, proxy=None): - self.end = Event() + def __init__(self, proxy=None, buffer_size=4096): + self._end = Event() + self._error = None + self._buffer = Buffer(buffer_size) + self.attr = HandleAttr() self.op_attr = OperationAttr() if proxy is not None: self.op_attr.set_authorization(proxy) - self.buffer = Buffer(4096) self.cli = FTPClient(self.attr) - self.stream = Queue() - self.error = None + # limit size of a queue to 4 MB + self.stream = Queue((4 * 2**20) / buffer_size or 1) def __del__(self): self.attr.destroy() self.op_attr.destroy() self.cli.destroy() - def report_error(self): - if self.error is not None: - match = re.search(r'A system call failed: (.*)$', self.error.replace('\r\n', '\n'), re.MULTILINE) - - msg = match.groups()[0] if match else "Unknown error" - - raise FTPException(msg) - def _read(self, arg, handle, error, buff, length, offset, eof): if not error: self.stream.put(str(buff)) if not eof: - self.cli.register_read(self.buffer, self._read, None) + self.cli.register_read(self._buffer, self._read, None) + + def _write(self, arg, handle, error, buff, length, offset, eof): + if eof or error: + return + + chunk = self.stream.get() + + if chunk is None: + size = 0 + eof = True + else: + size = len(chunk) + self._buffer.fill(chunk) + + self.cli.register_write(self._buffer, size, offset, eof, self._write, None) def _done(self, arg, handle, error): - self.error = error - self.end.set() + self._error = error + self._end.set() + + def wait(self): + self._end.wait() + self._end.clear() + + if self._error is not None: + match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE) + + msg = match.groups()[0] if match else "Unknown error" + + raise FTPException(msg) def listing(self, url): self.cli.verbose_list(url, self._done, None, self.op_attr) - self.cli.register_read(self.buffer, self._read, None) - - self.end.wait() - self.end.clear() + self.cli.register_read(self._buffer, self._read, None) - self.report_error() + self.wait() result = '' while not self.stream.empty(): @@ -65,15 +82,17 @@ class FTPOperation: def get(self, url): self.cli.get(url, self._done, None, self.op_attr) - self.cli.register_read(self.buffer, self._read, None) + self.cli.register_read(self._buffer, self._read, None) while True: try: yield self.stream.get(timeout=0.1) except Empty: - if self.end.wait(0): + if self._end.wait(0): break - self.end.clear() + self.wait() - self.report_error() + def put(self, url): + self.cli.put(url, self._done, None, self.op_attr) + self.cli.register_write(self._buffer, 0, 0, False, self._write, None) -- 1.7.9.5