2b09d3e65f841a1a831cdba8c9945c227bc82e21
[qcg-portal.git] / filex / ftp.py
1 from datetime import datetime
2 from Queue import Queue, Empty
3 from itertools import chain
4 import os
5 import re
6 from threading import Event
7
8 from django.utils.http import urlunquote
9 from django.utils.timezone import localtime, UTC
10 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
11
12
13 class FTPError(Exception):
14     def __init__(self, message, verbose=None, *args, **kwargs):
15         super(FTPError, self).__init__(message, *args, **kwargs)
16
17         self.verbose = verbose
18
19
20 class FTPOperation:
21     def __init__(self, proxy=None, buffer_size=4096):
22         self._end = Event()
23         self._error = None
24         self._buffer = Buffer(buffer_size)
25
26         self.attr = HandleAttr()
27         self.op_attr = OperationAttr()
28
29         if proxy is not None:
30             self.op_attr.set_authorization(proxy)
31
32         self.cli = FTPClient(self.attr)
33
34         # limit size of a queue to 4 MB
35         self.stream = Queue((4 * 2**20) / buffer_size or 1)
36
37     def __del__(self):
38         self.attr.destroy()
39         self.op_attr.destroy()
40         self.cli.destroy()
41
42     def _read(self, arg, handle, error, buff, length, offset, eof):
43         if not error:
44             self.stream.put(str(buff))
45
46             if not eof:
47                 self.cli.register_read(self._buffer, self._read, None)
48
49     def _write(self, arg, handle, error, buff, length, offset, eof):
50         if eof or error:
51             return
52
53         chunk = self.stream.get()
54
55         if chunk is None:
56             size = 0
57             eof = True
58         else:
59             size = len(chunk)
60             self._buffer.fill(chunk)
61
62         self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
63
64     def _done(self, arg, handle, error):
65         self._error = error
66         self._end.set()
67
68     def wait(self):
69         self._end.wait()
70         self._end.clear()
71
72         if self._error is not None:
73             match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
74
75             msg = match.groups()[0] if match else "Unknown error"
76
77             raise FTPError(msg, self._error)
78
79     def listing(self, url):
80         self.cli.verbose_list(url, self._done, None, self.op_attr)
81         self.cli.register_read(self._buffer, self._read, None)
82
83         self.wait()
84
85         result = ''
86         while not self.stream.empty():
87             result += self.stream.get()
88
89         return self._parse_mlst(result)
90
91     @staticmethod
92     def _parse_mlst(listing):
93         for item in listing.strip().splitlines():
94             # we may receive empty string when there are multiple consecutive newlines in listing
95             if item:
96                 attrs, name = item.split(' ', 1)
97
98                 attrs_dict = {}
99                 for attr in attrs.split(';'):
100                     try:
101                         key, value = attr.split('=', 1)
102                     except ValueError:
103                         key, value = attr, ''
104
105                     attrs_dict[key] = value
106
107                 yield {
108                     'name': name,
109                     'type': 'directory' if attrs_dict['Type'].endswith('dir') else 'file',
110                     'size': int(attrs_dict['Size']),
111                     'date': localtime(datetime.strptime(attrs_dict['Modify'], "%Y%m%d%H%M%S").replace(tzinfo=UTC())),
112                 }
113
114     def get(self, url):
115         self.cli.get(url, self._done, None, self.op_attr)
116         self.cli.register_read(self._buffer, self._read, None)
117
118         while True:
119             try:
120                 yield self.stream.get(timeout=0.1)
121             except Empty:
122                 if self._end.wait(0):
123                     break
124
125         self.wait()
126
127     def put(self, url):
128         self.cli.put(url, self._done, None, self.op_attr)
129         self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
130
131     def move(self, src, dst):
132         self.cli.move(src, dst, self._done, None, self.op_attr)
133
134         self.wait()
135
136     def info(self, url):
137         data = self.listing(url).next()
138
139         if data['name'] == '.':
140             data['name'] = os.path.basename(os.path.normpath(url))
141
142         return data
143
144     def delete(self, url):
145         self.cli.delete(url, self._done, None, self.op_attr)
146
147         self.wait()
148
149     def rmdir(self, url):
150         self.cli.rmdir(url, self._done, None, self.op_attr)
151
152         self.wait()
153
154     def mkdir(self, url):
155         self.cli.mkdir(url, self._done, None, self.op_attr)
156
157         self.wait()
158
159     @staticmethod
160     def match_ext(archive, *extensions):
161         for ext in extensions:
162             if archive.endswith(ext):
163                 return True
164         return False
165
166     def compress(self, server, path, files, archive):
167         self._check_disk_stack_args(*([path, archive] + files))
168
169         if self.match_ext(archive, '.tar.gz', '.tgz'):
170             cmd, args = 'tar', ['cvzf', archive, '-C', path] + files
171         elif self.match_ext(archive, '.tar.bz2', '.tbz'):
172             cmd, args = 'tar', ['cvjf', archive, '-C', path] + files
173         elif self.match_ext(archive, '.zip'):
174             cmd, args = 'jar', (['cvMf', archive] + list(chain.from_iterable(('-C', path, f) for f in files)))
175         else:
176             raise ValueError('Unknown archive type: {}'.format(archive))
177
178         self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + args))
179
180         return self.get(server)
181
182     def extract(self, server, archive, dst):
183         self._check_disk_stack_args(*[archive, dst])
184
185         if self.match_ext(archive, '.tar.gz', '.tgz'):
186             cmd, args = 'tar', ('xvzf', archive, '-C', dst)
187         elif self.match_ext(archive, '.tar.bz2', '.tbz'):
188             cmd, args = 'tar', ('xvjf', archive, '-C', dst)
189         elif self.match_ext(archive, '.zip'):
190             cmd, args = 'unzip', (archive, '-d', dst)
191         else:
192             raise ValueError('Unknown archive type: {}'.format(archive))
193
194         self.op_attr.set_disk_stack('#'.join(("popen:argv=", cmd) + args))
195
196         return self.get(server)
197
198     @staticmethod
199     def _check_disk_stack_args(*args):
200         for char in ['#', ',', ';', '%23', '%3B']:
201             for arg in args:
202                 if char in arg:
203                     raise ValueError('Unsupported character `{}` in `{}`!'.format(urlunquote(char), urlunquote(arg)))