handling special characters in gridftp urls
[qcg-portal.git] / filex / ftp.py
1 from datetime import datetime
2 from Queue import Queue, Empty
3 from itertools import chain
4 import os
5 import re
6 from threading import Event
7
8 from django.utils.timezone import localtime, UTC
9 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
10
11
12 class FTPException(Exception):
13     pass
14
15
16 class FTPOperation:
17     def __init__(self, proxy=None, buffer_size=4096):
18         self._end = Event()
19         self._error = None
20         self._buffer = Buffer(buffer_size)
21
22         self.attr = HandleAttr()
23         self.op_attr = OperationAttr()
24
25         if proxy is not None:
26             self.op_attr.set_authorization(proxy)
27
28         self.cli = FTPClient(self.attr)
29
30         # limit size of a queue to 4 MB
31         self.stream = Queue((4 * 2**20) / buffer_size or 1)
32
33     def __del__(self):
34         self.attr.destroy()
35         self.op_attr.destroy()
36         self.cli.destroy()
37
38     def _read(self, arg, handle, error, buff, length, offset, eof):
39         if not error:
40             self.stream.put(str(buff))
41
42             if not eof:
43                 self.cli.register_read(self._buffer, self._read, None)
44
45     def _write(self, arg, handle, error, buff, length, offset, eof):
46         if eof or error:
47             return
48
49         chunk = self.stream.get()
50
51         if chunk is None:
52             size = 0
53             eof = True
54         else:
55             size = len(chunk)
56             self._buffer.fill(chunk)
57
58         self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
59
60     def _done(self, arg, handle, error):
61         self._error = error
62         self._end.set()
63
64     def wait(self):
65         self._end.wait()
66         self._end.clear()
67
68         if self._error is not None:
69             # TODO logging
70             print 'GridFTP ERROR:', self._error
71
72             match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
73
74             msg = match.groups()[0] if match else "Unknown error"
75
76             raise FTPException(msg)
77
78     def listing(self, url):
79         self.cli.verbose_list(url, self._done, None, self.op_attr)
80         self.cli.register_read(self._buffer, self._read, None)
81
82         self.wait()
83
84         result = ''
85         while not self.stream.empty():
86             result += self.stream.get()
87
88         return self._parse_mlst(result)
89
90     @staticmethod
91     def _parse_mlst(listing):
92         for item in listing.strip().splitlines():
93             # we may receive empty string when there are multiple consecutive newlines in listing
94             if item:
95                 attrs, name = item.split(' ', 1)
96
97                 attrs_dict = {}
98                 for attr in attrs.split(';'):
99                     try:
100                         key, value = attr.split('=', 1)
101                         attrs_dict[key] = value
102                     except ValueError:
103                         pass
104
105                 yield {
106                     'name': name,
107                     'type': 'directory' if attrs_dict['Type'] == 'dir' else 'file',
108                     'size': int(attrs_dict['Size']),
109                     'date': localtime(datetime.strptime(attrs_dict['Modify'], "%Y%m%d%H%M%S").replace(tzinfo=UTC())),
110                 }
111
112     def get(self, url):
113         self.cli.get(url, self._done, None, self.op_attr)
114         self.cli.register_read(self._buffer, self._read, None)
115
116         while True:
117             try:
118                 yield self.stream.get(timeout=0.1)
119             except Empty:
120                 if self._end.wait(0):
121                     break
122
123         self.wait()
124
125     def put(self, url):
126         self.cli.put(url, self._done, None, self.op_attr)
127         self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
128
129     def move(self, src, dst):
130         self.cli.move(src, dst, self._done, None, self.op_attr)
131
132         self.wait()
133
134     def info(self, url):
135         data = self.listing(url).next()
136
137         if data['name'] == '.':
138             data['name'] = os.path.basename(os.path.normpath(url))
139
140         return data
141
142     def delete(self, url):
143         self.cli.delete(url, self._done, None, self.op_attr)
144
145         self.wait()
146
147     def rmdir(self, url):
148         self.cli.rmdir(url, self._done, None, self.op_attr)
149
150         self.wait()
151
152     def mkdir(self, url):
153         self.cli.mkdir(url, self._done, None, self.op_attr)
154
155         self.wait()
156
157     @staticmethod
158     def match_ext(archive, *extensions):
159         for ext in extensions:
160             if archive.endswith(ext):
161                 return True
162         return False
163
164     def compress(self, server, path, files, archive):
165         if self.match_ext(archive, '.tar.gz', '.tgz'):
166             cmd, args = 'tar', ['cvzf', archive, '-C', path] + files
167         elif self.match_ext(archive, '.tar.bz2', '.tbz'):
168             cmd, args = 'tar', ['cvjf', archive, '-C', path] + files
169         elif self.match_ext(archive, '.zip'):
170             cmd, args = 'jar', (['cvMf', archive] + list(chain.from_iterable(('-C', path, f) for f in files)))
171         else:
172             raise ValueError('Unknown archive type: {}'.format(archive))
173
174         self.op_attr.set_disk_stack('#'.join(["popen:argv=", cmd] + args))
175
176         return self.get(server)
177
178     def extract(self, server, archive, dst):
179         if self.match_ext(archive, '.tar.gz', '.tgz'):
180             cmd, args = 'tar', ('xvzf', archive, '-C', dst)
181         elif self.match_ext(archive, '.tar.bz2', '.tbz'):
182             cmd, args = 'tar', ('xvjf', archive, '-C', dst)
183         elif self.match_ext(archive, '.zip'):
184             cmd, args = 'unzip', (archive, '-d', dst)
185         else:
186             raise ValueError('Unknown archive type: {}'.format(archive))
187
188         self.op_attr.set_disk_stack('#'.join(("popen:argv=", cmd) + args))
189
190         return self.get(server)