add jquery.fileupload and humanize.duration
[qcg-portal.git] / filex / ftp.py
1 from Queue import Queue, Empty
2 import re
3 from threading import Event
4
5 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
6
7
8 class FTPException(Exception):
9     pass
10
11
12 class FTPOperation:
13     def __init__(self, proxy=None, buffer_size=4096):
14         self._end = Event()
15         self._error = None
16         self._buffer = Buffer(buffer_size)
17
18         self.attr = HandleAttr()
19         self.op_attr = OperationAttr()
20
21         if proxy is not None:
22             self.op_attr.set_authorization(proxy)
23
24         self.cli = FTPClient(self.attr)
25
26         # limit size of a queue to 4 MB
27         self.stream = Queue((4 * 2**20) / buffer_size or 1)
28
29     def __del__(self):
30         self.attr.destroy()
31         self.op_attr.destroy()
32         self.cli.destroy()
33
34     def _read(self, arg, handle, error, buff, length, offset, eof):
35         if not error:
36             self.stream.put(str(buff))
37
38             if not eof:
39                 self.cli.register_read(self._buffer, self._read, None)
40
41     def _write(self, arg, handle, error, buff, length, offset, eof):
42         if eof or error:
43             return
44
45         chunk = self.stream.get()
46
47         if chunk is None:
48             size = 0
49             eof = True
50         else:
51             size = len(chunk)
52             self._buffer.fill(chunk)
53
54         self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
55
56     def _done(self, arg, handle, error):
57         self._error = error
58         self._end.set()
59
60     def wait(self):
61         self._end.wait()
62         self._end.clear()
63
64         if self._error is not None:
65             match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
66
67             msg = match.groups()[0] if match else "Unknown error"
68
69             raise FTPException(msg)
70
71     def listing(self, url):
72         self.cli.verbose_list(url, self._done, None, self.op_attr)
73         self.cli.register_read(self._buffer, self._read, None)
74
75         self.wait()
76
77         result = ''
78         while not self.stream.empty():
79             result += self.stream.get()
80
81         return result
82
83     def get(self, url):
84         self.cli.get(url, self._done, None, self.op_attr)
85         self.cli.register_read(self._buffer, self._read, None)
86
87         while True:
88             try:
89                 yield self.stream.get(timeout=0.1)
90             except Empty:
91                 if self._end.wait(0):
92                     break
93
94         self.wait()
95
96     def put(self, url):
97         self.cli.put(url, self._done, None, self.op_attr)
98         self.cli.register_write(self._buffer, 0, 0, False, self._write, None)