c0b3e25d4d54419800d2433ef4f3a028d6cc0f53
[qcg-portal.git] / filex / ftp.py
1 from datetime import datetime
2 from Queue import Queue, Empty
3 import re
4 from threading import Event
5
6 from django.utils.timezone import localtime, UTC
7 from gridftp import FTPClient, Buffer, HandleAttr, OperationAttr
8
9
10 class FTPException(Exception):
11     pass
12
13
14 class FTPOperation:
15     def __init__(self, proxy=None, buffer_size=4096):
16         self._end = Event()
17         self._error = None
18         self._buffer = Buffer(buffer_size)
19
20         self.attr = HandleAttr()
21         self.op_attr = OperationAttr()
22
23         if proxy is not None:
24             self.op_attr.set_authorization(proxy)
25
26         self.cli = FTPClient(self.attr)
27
28         # limit size of a queue to 4 MB
29         self.stream = Queue((4 * 2**20) / buffer_size or 1)
30
31     def __del__(self):
32         self.attr.destroy()
33         self.op_attr.destroy()
34         self.cli.destroy()
35
36     def _read(self, arg, handle, error, buff, length, offset, eof):
37         if not error:
38             self.stream.put(str(buff))
39
40             if not eof:
41                 self.cli.register_read(self._buffer, self._read, None)
42
43     def _write(self, arg, handle, error, buff, length, offset, eof):
44         if eof or error:
45             return
46
47         chunk = self.stream.get()
48
49         if chunk is None:
50             size = 0
51             eof = True
52         else:
53             size = len(chunk)
54             self._buffer.fill(chunk)
55
56         self.cli.register_write(self._buffer, size, offset, eof, self._write, None)
57
58     def _done(self, arg, handle, error):
59         self._error = error
60         self._end.set()
61
62     def wait(self):
63         self._end.wait()
64         self._end.clear()
65
66         if self._error is not None:
67             match = re.search(r'A system call failed: (.*)$', self._error.replace('\r\n', '\n'), re.MULTILINE)
68
69             msg = match.groups()[0] if match else "Unknown error"
70
71             raise FTPException(msg)
72
73     def listing(self, url):
74         self.cli.verbose_list(url, self._done, None, self.op_attr)
75         self.cli.register_read(self._buffer, self._read, None)
76
77         self.wait()
78
79         result = ''
80         while not self.stream.empty():
81             result += self.stream.get()
82
83         return self._parse_mlst(result)
84
85     @staticmethod
86     def _parse_mlst(listing):
87         data = []
88
89         for item in listing.strip().splitlines():
90             # we may receive empty string when there are multiple consecutive newlines in listing
91             if item:
92                 attrs, name = item.split(' ', 1)
93
94                 attrs = dict((attr.split('=') for attr in attrs.split(';') if attr))
95
96                 date = localtime(datetime.strptime(attrs['Modify'], "%Y%m%d%H%M%S").replace(tzinfo=UTC()))
97
98                 data.append({
99                     'name': name,
100                     'type': 'file' if attrs['Type'] == 'file' else 'directory',
101                     'size': int(attrs['Size']),
102                     'date': date,
103                 })
104
105         return data
106
107     def get(self, url):
108         self.cli.get(url, self._done, None, self.op_attr)
109         self.cli.register_read(self._buffer, self._read, None)
110
111         while True:
112             try:
113                 yield self.stream.get(timeout=0.1)
114             except Empty:
115                 if self._end.wait(0):
116                     break
117
118         self.wait()
119
120     def put(self, url):
121         self.cli.put(url, self._done, None, self.op_attr)
122         self.cli.register_write(self._buffer, 0, 0, False, self._write, None)
123
124     def move(self, src, dst):
125         self.cli.move(src, dst, self._done, None, self.op_attr)
126
127         self.wait()