Merge branch 'devel'
[qcg-portal.git] / filex / ftp.py
1 from datetime import datetime
2 from Queue import Queue, Empty
3 from itertools import chain
4 import os
5 import re
6 from threading import Event
7 from urlparse import urlparse, urlunparse
8
9 from django.utils.http import urlquote
10 from django.utils.timezone import localtime, UTC
11
12
13 class FTPError(Exception):
14     def __init__(self, message, verbose=None, *args, **kwargs):
15         super(FTPError, self).__init__(message, *args, **kwargs)
16
17         self.verbose = verbose
18
19
20 class FTPOperation:
21     def __init__(self, proxy=None, buffer_size=4096):
22         from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
23
24         self._end = Event()
25         self._error = None
26         self._buffer = Buffer(buffer_size)
27
28         self.attr = HandleAttr()
29         self.op_attr = OperationAttr()
30
31         if proxy is not None:
32             self.op_attr.set_authorization(proxy)
33
34         self.cli = FTPClient(self.attr)
35
36         # limit size of a queue to 4 MB
37         self.stream = Queue((4 * 2**20) / buffer_size or 1)
38
39     def __del__(self):
40         self.attr.destroy()
41         self.op_attr.destroy()
42         self.cli.destroy()
43
44     def _read(self, arg, handle, error, buff, length, offset, eof):
45         if not error:
46             self.stream.put(str(buff))
47
48             if not eof:
49                 self.cli.register_read(self._buffer, self._read, None)
50
51     def _write(self, arg, handle, error, buff, length, offset, eof):
52         if eof or error:
53             return
54
55         chunk = self.stream.get()
56
57         if chunk is None:
58             size = 0
59             eof = True
60         else:
61             size = len(chunk)
62             self._buffer.fill(chunk)
63
64         self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
65
66     def _done(self, arg, handle, error):
67         self._error = error
68         self._end.set()
69
70     def wait(self):
71         self._end.wait()
72         self._end.clear()
73
74         if self._error is not None:
75             match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
76
77             msg = match.groups()[0] if match else "Unknown error"
78
79             raise FTPError(msg, self._error)
80
81     def listing(self, url):
82         self.cli.verbose_list(url, self._done, None, self.op_attr)
83         self.cli.register_read(self._buffer, self._read, None)
84
85         self.wait()
86
87         result = ''
88         while not self.stream.empty():
89             result += self.stream.get()
90
91         return self._parse_mlst(result)
92
93     @staticmethod
94     def _parse_mlst(listing):
95         for item in listing.strip().splitlines():
96             # we may receive empty string when there are multiple consecutive newlines in listing
97             if item:
98                 attrs, name = item.split(' ', 1)
99
100                 attrs_dict = {}
101                 for attr in attrs.split(';'):
102                     try:
103                         key, value = attr.split('=', 1)
104                     except ValueError:
105                         key, value = attr, ''
106
107                     attrs_dict[key] = value
108
109                 yield {
110                     'name': name,
111                     'type': 'directory' if attrs_dict['Type'].endswith('dir') else 'file',
112                     'size': int(attrs_dict['Size']),
113                     'date': localtime(datetime.strptime(attrs_dict['Modify'], "%Y%m%d%H%M%S").replace(tzinfo=UTC())),
114                 }
115
116     def get(self, url):
117         self.cli.get(url, self._done, None, self.op_attr)
118         self.cli.register_read(self._buffer, self._read, None)
119
120         while True:
121             try:
122                 yield self.stream.get(timeout=0.1)
123             except Empty:
124                 if self._end.wait(0):
125                     break
126
127         self.wait()
128
129     def put(self, url):
130         self.cli.put(url, self._done, None, self.op_attr)
131         self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
132
133     def move(self, src, dst):
134         self.cli.move(src, dst, self._done, None, self.op_attr)
135
136         self.wait()
137
138     def info(self, url):
139         data = self.listing(url).next()
140
141         if data['name'] == '.':
142             data['name'] = os.path.basename(os.path.normpath(url))
143
144         return data
145
146     def exists(self, url):
147         self.cli.exists(url, self._done, None, self.op_attr)
148
149         try:
150             self.wait()
151         except FTPError as e:
152             if 'No such file or directory' in e.message:
153                 return False
154             raise
155         else:
156             return True
157
158     def delete(self, url):
159         self.cli.delete(url, self._done, None, self.op_attr)
160
161         self.wait()
162
163     def rmdir(self, url):
164         self.cli.rmdir(url, self._done, None, self.op_attr)
165
166         self.wait()
167
168     def mkdir(self, url, parents=False):
169         if parents:
170             if self.exists(url):
171                 return
172
173             u = urlparse(url)
174             parent_url = urlunparse((u.scheme, u.netloc, os.path.dirname(os.path.normpath(u.path)), '', '', ''))
175
176             self.mkdir(parent_url, parents=True)
177
178         self.cli.mkdir(url, self._done, None, self.op_attr)
179
180         self.wait()
181
182     @staticmethod
183     def match_ext(archive, *extensions):
184         for ext in extensions:
185             if archive.endswith(ext):
186                 return True
187         return False
188
189     def compress(self, server, path, files, archive):
190         self._check_disk_stack_args(*([path, archive] + files))
191
192         if self.match_ext(archive, '.tar.gz', '.tgz'):
193             cmd, args = 'tar', ['czf', archive, '-C', path] + files
194         elif self.match_ext(archive, '.tar.bz2', '.tbz'):
195             cmd, args = 'tar', ['cjf', archive, '-C', path] + files
196         elif self.match_ext(archive, '.zip'):
197             # zip doesn't support unicode file names
198             for arg in files:
199                 try:
200                     arg.encode('ascii')
201                 except UnicodeEncodeError as e:
202                     raise ValueError(u'Unsupported character `{}` in `{}`!'.format(arg[e.start:e.start + 1], arg))
203
204             cmd, args = 'jar', (['cMf', archive] + list(chain.from_iterable(('-C', path, f) for f in files)))
205         else:
206             raise ValueError('Unknown archive type: {}'.format(archive))
207
208         self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + map(urlquote, args)))
209
210         return self.get(server)
211
212     def extract(self, server, archive, dst):
213         self._check_disk_stack_args(*[archive, dst])
214
215         if self.match_ext(archive, '.tar.gz', '.tgz'):
216             cmd, args = 'tar', ('xzf', archive, '-C', dst)
217         elif self.match_ext(archive, '.tar.bz2', '.tbz'):
218             cmd, args = 'tar', ('xjf', archive, '-C', dst)
219         elif self.match_ext(archive, '.zip'):
220             cmd, args = 'unzip', ('-qo', archive, '-d', dst)
221         else:
222             raise ValueError('Unknown archive type: {}'.format(archive))
223
224         self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + map(urlquote, args)))
225
226         return self.get(server)
227
228     @staticmethod
229     def _check_disk_stack_args(*args):
230         for char in ['#', ';']:
231             for arg in args:
232                 if char in arg:
233                     raise ValueError(u'Unsupported character `{}` in `{}`!'.format(char, arg))