+
+ def info(self, url):
+ data = self.listing(url).next()
+
+ if data['name'] == '.':
+ data['name'] = os.path.basename(os.path.normpath(url))
+
+ return data
+
+ def exists(self, url):
+ self.cli.exists(url, self._done, None, self.op_attr)
+
+ try:
+ self.wait()
+ except FTPError as e:
+ if 'No such file or directory' in e.message:
+ return False
+ raise
+ else:
+ return True
+
+ def delete(self, url):
+ self.cli.delete(url, self._done, None, self.op_attr)
+
+ self.wait()
+
+ def rmdir(self, url):
+ self.cli.rmdir(url, self._done, None, self.op_attr)
+
+ self.wait()
+
+ def mkdir(self, url, parents=False):
+ if parents:
+ if self.exists(url):
+ return
+
+ u = urlparse(url)
+ parent_url = urlunparse((u.scheme, u.netloc, os.path.dirname(os.path.normpath(u.path)), '', '', ''))
+
+ self.mkdir(parent_url, parents=True)
+
+ self.cli.mkdir(url, self._done, None, self.op_attr)
+
+ self.wait()
+
+ @staticmethod
+ def match_ext(archive, *extensions):
+ for ext in extensions:
+ if archive.endswith(ext):
+ return True
+ return False
+
+ def compress(self, server, path, files, archive):
+ self._check_disk_stack_args(*([path, archive] + files))
+
+ if self.match_ext(archive, '.tar.gz', '.tgz'):
+ cmd, args = 'tar', ['czf', archive, '-C', path] + files
+ elif self.match_ext(archive, '.tar.bz2', '.tbz'):
+ cmd, args = 'tar', ['cjf', archive, '-C', path] + files
+ elif self.match_ext(archive, '.zip'):
+ # zip doesn't support unicode file names
+ for arg in files:
+ try:
+ arg.encode('ascii')
+ except UnicodeEncodeError as e:
+ raise ValueError(u'Unsupported character `{}` in `{}`!'.format(arg[e.start:e.start + 1], arg))
+
+ cmd, args = 'jar', (['cMf', archive] + list(chain.from_iterable(('-C', path, f) for f in files)))
+ else:
+ raise ValueError('Unknown archive type: {}'.format(archive))
+
+ self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + map(urlquote, args)))
+
+ return self.get(server)
+
+ def extract(self, server, archive, dst):
+ self._check_disk_stack_args(*[archive, dst])
+
+ if self.match_ext(archive, '.tar.gz', '.tgz'):
+ cmd, args = 'tar', ('xzf', archive, '-C', dst)
+ elif self.match_ext(archive, '.tar.bz2', '.tbz'):
+ cmd, args = 'tar', ('xjf', archive, '-C', dst)
+ elif self.match_ext(archive, '.zip'):
+ cmd, args = 'unzip', ('-qo', archive, '-d', dst)
+ else:
+ raise ValueError('Unknown archive type: {}'.format(archive))
+
+ self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + map(urlquote, args)))
+
+ return self.get(server)
+
+ @staticmethod
+ def _check_disk_stack_args(*args):
+ for char in ['#', ';']:
+ for arg in args:
+ if char in arg:
+ raise ValueError(u'Unsupported character `{}` in `{}`!'.format(char, arg))