af06c277e07152a463bccf06d78dc57e1726d08c
[qcg-portal.git] / filex / ftp.py
1 from datetime import datetime
2 from Queue import Queue, Empty
3 from itertools import chain
4 import os
5 import re
6 from threading import Event
7
8 from django.utils.http import urlunquote
9 from django.utils.timezone import localtime, UTC
10
11
12 class FTPError(Exception):
13     def __init__(self, message, verbose=None, *args, **kwargs):
14         super(FTPError, self).__init__(message, *args, **kwargs)
15
16         self.verbose = verbose
17
18
19 class FTPOperation:
20     def __init__(self, proxy=None, buffer_size=4096):
21         from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
22
23         self._end = Event()
24         self._error = None
25         self._buffer = Buffer(buffer_size)
26
27         self.attr = HandleAttr()
28         self.op_attr = OperationAttr()
29
30         if proxy is not None:
31             self.op_attr.set_authorization(proxy)
32
33         self.cli = FTPClient(self.attr)
34
35         # limit size of a queue to 4 MB
36         self.stream = Queue((4 * 2**20) / buffer_size or 1)
37
38     def __del__(self):
39         self.attr.destroy()
40         self.op_attr.destroy()
41         self.cli.destroy()
42
43     def _read(self, arg, handle, error, buff, length, offset, eof):
44         if not error:
45             self.stream.put(str(buff))
46
47             if not eof:
48                 self.cli.register_read(self._buffer, self._read, None)
49
50     def _write(self, arg, handle, error, buff, length, offset, eof):
51         if eof or error:
52             return
53
54         chunk = self.stream.get()
55
56         if chunk is None:
57             size = 0
58             eof = True
59         else:
60             size = len(chunk)
61             self._buffer.fill(chunk)
62
63         self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
64
65     def _done(self, arg, handle, error):
66         self._error = error
67         self._end.set()
68
69     def wait(self):
70         self._end.wait()
71         self._end.clear()
72
73         if self._error is not None:
74             match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
75
76             msg = match.groups()[0] if match else "Unknown error"
77
78             raise FTPError(msg, self._error)
79
80     def listing(self, url):
81         self.cli.verbose_list(url, self._done, None, self.op_attr)
82         self.cli.register_read(self._buffer, self._read, None)
83
84         self.wait()
85
86         result = ''
87         while not self.stream.empty():
88             result += self.stream.get()
89
90         return self._parse_mlst(result)
91
92     @staticmethod
93     def _parse_mlst(listing):
94         for item in listing.strip().splitlines():
95             # we may receive empty string when there are multiple consecutive newlines in listing
96             if item:
97                 attrs, name = item.split(' ', 1)
98
99                 attrs_dict = {}
100                 for attr in attrs.split(';'):
101                     try:
102                         key, value = attr.split('=', 1)
103                     except ValueError:
104                         key, value = attr, ''
105
106                     attrs_dict[key] = value
107
108                 yield {
109                     'name': name,
110                     'type': 'directory' if attrs_dict['Type'].endswith('dir') else 'file',
111                     'size': int(attrs_dict['Size']),
112                     'date': localtime(datetime.strptime(attrs_dict['Modify'], "%Y%m%d%H%M%S").replace(tzinfo=UTC())),
113                 }
114
115     def get(self, url):
116         self.cli.get(url, self._done, None, self.op_attr)
117         self.cli.register_read(self._buffer, self._read, None)
118
119         while True:
120             try:
121                 yield self.stream.get(timeout=0.1)
122             except Empty:
123                 if self._end.wait(0):
124                     break
125
126         self.wait()
127
128     def put(self, url):
129         self.cli.put(url, self._done, None, self.op_attr)
130         self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
131
132     def move(self, src, dst):
133         self.cli.move(src, dst, self._done, None, self.op_attr)
134
135         self.wait()
136
137     def info(self, url):
138         data = self.listing(url).next()
139
140         if data['name'] == '.':
141             data['name'] = os.path.basename(os.path.normpath(url))
142
143         return data
144
145     def delete(self, url):
146         self.cli.delete(url, self._done, None, self.op_attr)
147
148         self.wait()
149
150     def rmdir(self, url):
151         self.cli.rmdir(url, self._done, None, self.op_attr)
152
153         self.wait()
154
155     def mkdir(self, url):
156         self.cli.mkdir(url, self._done, None, self.op_attr)
157
158         self.wait()
159
160     @staticmethod
161     def match_ext(archive, *extensions):
162         for ext in extensions:
163             if archive.endswith(ext):
164                 return True
165         return False
166
167     def compress(self, server, path, files, archive):
168         self._check_disk_stack_args(*([path, archive] + files))
169
170         if self.match_ext(archive, '.tar.gz', '.tgz'):
171             cmd, args = 'tar', ['cvzf', archive, '-C', path] + files
172         elif self.match_ext(archive, '.tar.bz2', '.tbz'):
173             cmd, args = 'tar', ['cvjf', archive, '-C', path] + files
174         elif self.match_ext(archive, '.zip'):
175             cmd, args = 'jar', (['cvMf', archive] + list(chain.from_iterable(('-C', path, f) for f in files)))
176         else:
177             raise ValueError('Unknown archive type: {}'.format(archive))
178
179         self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + args))
180
181         return self.get(server)
182
183     def extract(self, server, archive, dst):
184         self._check_disk_stack_args(*[archive, dst])
185
186         if self.match_ext(archive, '.tar.gz', '.tgz'):
187             cmd, args = 'tar', ('xvzf', archive, '-C', dst)
188         elif self.match_ext(archive, '.tar.bz2', '.tbz'):
189             cmd, args = 'tar', ('xvjf', archive, '-C', dst)
190         elif self.match_ext(archive, '.zip'):
191             cmd, args = 'unzip', (archive, '-d', dst)
192         else:
193             raise ValueError('Unknown archive type: {}'.format(archive))
194
195         self.op_attr.set_disk_stack('#'.join(("popen:argv=", cmd) + args))
196
197         return self.get(server)
198
199     @staticmethod
200     def _check_disk_stack_args(*args):
201         for char in ['#', ',', ';', '%23', '%3B']:
202             for arg in args:
203                 if char in arg:
204                     raise ValueError('Unsupported character `{}` in `{}`!'.format(urlunquote(char), urlunquote(arg)))