Update shebang to Python 3
[invirt/packages/invirt-base.git] / python / invirt / common.py
1 import unittest
2 from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
3 import contextlib as clib
4 import subprocess
5
6
7 class InvirtConfigError(AttributeError):
8     pass
9
10 class struct(dict):
11     'A simple namespace object.'
12     def __init__(self, d = {}, __prefix = None, __default=None, **kwargs):
13         super(struct, self).__init__(d)
14         self.__prefix = __prefix
15         self.__default = __default
16         self.update(kwargs)
17     def __getattr__(self, key):
18         try:
19             return self[key]
20         except KeyError:
21             if self.__default is None:
22                 # XX ideally these would point a frame higher on the stack.
23                 prefix = self.__prefix
24                 if prefix is not None:
25                     raise InvirtConfigError('missing configuration variable '
26                                             '%s%s' % (prefix, key))
27                 else:
28                     raise AttributeError("anonymous struct has no member '%s'"
29                                          % (key,))
30             else:
31                 return struct({}, '', self.__default)
32
33 def dicts2struct(x, prefix = None, default = None):
34     """
35     Given a tree of lists/dicts, perform a deep traversal to transform all the
36     dicts to structs.
37     """
38     if prefix is not None:
39         def newprefix(k): return prefix + str(k) + '.'
40     else:
41         def newprefix(k): return prefix
42     if type(x) == dict:
43         return struct(((k, dicts2struct(v, newprefix(k), default))
44                        for k,v in x.iteritems()),
45                       prefix,
46                       default)
47     elif type(x) == list:
48         return [dicts2struct(v, newprefix(i), default)
49                 for i, v in enumerate(x)]
50     elif x is None:
51         return struct({}, prefix, default)
52     else:
53         return x
54
55 @clib.contextmanager
56 def lock_file(path, exclusive = True):
57     with clib.closing(file(path, 'w')) as f:
58         if exclusive:
59             locktype = LOCK_EX
60         else:
61             locktype = LOCK_SH
62         flock(f, locktype)
63         try:
64             yield
65         finally:
66             flock(f, LOCK_UN)
67
68 def captureOutput(popen_args, stdin_str=None, *args, **kwargs):
69     """Capture stdout from a command.
70
71     This method will proxy the arguments to subprocess.Popen. It
72     returns the output from the command if the call succeeded and
73     raises an exception if the process returns a non-0 value.
74
75     This is intended to be a variant on the subprocess.check_call
76     function that also allows you access to the output from the
77     command.
78     """
79     if 'stdin' not in kwargs:
80         kwargs['stdin'] = subprocess.PIPE
81     if 'stdout' not in kwargs:
82         kwargs['stdout'] = subprocess.PIPE
83     if 'stderr' not in kwargs:
84         kwargs['stderr'] = subprocess.PIPE
85     p = subprocess.Popen(popen_args, *args, **kwargs)
86     out, err = p.communicate(stdin_str)
87     if p.returncode:
88         raise subprocess.CalledProcessError(p.returncode, '%s, stdout: %s, stderr: %s' %
89                                             (popen_args, out, err))
90     return out
91
92 #
93 # Exceptions.
94 #
95
96 class InvalidInput(Exception):
97     """Exception for user-provided input is invalid but maybe in good faith.
98
99     This would include setting memory to negative (which might be a
100     typo) but not setting an invalid boot CD (which requires bypassing
101     the select box).
102     """
103     def __init__(self, err_field, err_value, expl=None):
104         Exception.__init__(self, expl)
105         self.err_field = err_field
106         self.err_value = err_value
107
108 class CodeError(Exception):
109     """Exception for internal errors or bad faith input."""
110     pass
111
112 #
113 # Tests.
114 #
115
116 class common_tests(unittest.TestCase):
117     def test_dicts2structs(self):
118         dicts = {
119                 'atom': 0,
120                 'dict': { 'atom': 'atom', 'list': [1,2,3] },
121                 'list': [ 'atom', {'key': 'value'} ]
122                 }
123         structs = dicts2struct(dicts, '')
124         self.assertEqual(structs.atom,        dicts['atom'])
125         self.assertEqual(structs.dict.atom,   dicts['dict']['atom'])
126         self.assertEqual(structs.dict.list,   dicts['dict']['list'])
127         self.assertEqual(structs.list[0],     dicts['list'][0])
128         self.assertEqual(structs.list[1].key, dicts['list'][1]['key'])
129         self.assertEqual(set(structs), set(['atom', 'dict', 'list']))
130
131 if __name__ == '__main__':
132     unittest.main()