action buttons
[qcg-portal.git] / filex / ftp.py
1 from Queue import Queue, Empty
2 import re
3 from threading import Event
4
5 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
6
7
8 class FTPException(Exception):
9     pass
10
11
12 class FTPOperation:
13     def __init__(self, proxy=None):
14         self.end = Event()
15         self.attr = HandleAttr()
16         self.op_attr = OperationAttr()
17
18         if proxy is not None:
19             self.op_attr.set_authorization(proxy)
20
21         self.buffer = Buffer(4096)
22         self.cli = FTPClient(self.attr)
23
24         self.stream = Queue()
25         self.error = None
26
27     def __del__(self):
28         self.attr.destroy()
29         self.op_attr.destroy()
30         self.cli.destroy()
31
32     def report_error(self):
33         if self.error is not None:
34             match = re.search(r'A system call failed: (.*)$', self.error.replace('\r\n', '\n'), re.MULTILINE)
35
36             msg = match.groups()[0] if match else "Unknown error"
37
38             raise FTPException(msg)
39
40     def _read(self, arg, handle, error, buff, length, offset, eof):
41         if not error:
42             self.stream.put(str(buff))
43
44             if not eof:
45                 self.cli.register_read(self.buffer, self._read, None)
46
47     def _done(self, arg, handle, error):
48         self.error = error
49         self.end.set()
50
51     def listing(self, url):
52         self.cli.verbose_list(url, self._done, None, self.op_attr)
53         self.cli.register_read(self.buffer, self._read, None)
54
55         self.end.wait()
56         self.end.clear()
57
58         self.report_error()
59
60         result = ''
61         while not self.stream.empty():
62             result += self.stream.get()
63
64         return result
65
66     def get(self, url):
67         self.cli.get(url, self._done, None, self.op_attr)
68         self.cli.register_read(self.buffer, self._read, None)
69
70         while True:
71             try:
72                 yield self.stream.get(timeout=0.1)
73             except Empty:
74                 if self.end.wait(0):
75                     break
76
77         self.end.clear()
78
79         self.report_error()