f228a332f6b88f6c541a9443928b0a36baa96402
[invirt/packages/invirt-base.git] / python / invirt / common.py
1 from __future__ import with_statement
2
3 import unittest
4 from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
5 import contextlib as clib
6 import subprocess
7
8 class InvirtConfigError(AttributeError):
9     pass
10
11 class struct(object):
12     'A simple namespace object.'
13     def __init__(self, d = {}, __prefix = None, **kwargs):
14         'd is the dictionary or the items-iterable to update my __dict__ with.'
15         self.__dict__.update(d)
16         self.__dict__.update(kwargs)
17         self.__prefix = __prefix
18     def __getattr__(self, key):
19         # XX ideally these would point a frame higher on the stack.
20         prefix = self.__prefix
21         if prefix is not None:
22             raise InvirtConfigError('missing configuration variable %s%s'
23                                     % (prefix, key))
24         else:
25             raise AttributeError("anonymous struct has no member '%s'"
26                                  % (key,))
27
28 def dicts2struct(x, prefix = None):
29     """
30     Given a tree of lists/dicts, perform a deep traversal to transform all the
31     dicts to structs.
32     """
33     if prefix is not None:
34         def newprefix(k): return prefix + str(k) + '.'
35     else:
36         def newprefix(k): return prefix
37     if type(x) == dict:
38         return struct(((k, dicts2struct(v, newprefix(k)))
39                        for k,v in x.iteritems()),
40                       prefix)
41     elif type(x) == list:
42         return [dicts2struct(v, newprefix(i)) for i, v in enumerate(x)]
43     elif x is None:
44         return struct({}, prefix)
45     else:
46         return x
47
48 @clib.contextmanager
49 def lock_file(path, exclusive = True):
50     with clib.closing(file(path, 'w')) as f:
51         if exclusive:
52             locktype = LOCK_EX
53         else:
54             locktype = LOCK_SH
55         flock(f, locktype)
56         try:
57             yield
58         finally:
59             flock(f, LOCK_UN)
60
61 def captureOutput(popen_args, stdin_str=None, *args, **kwargs):
62     """Capture stdout from a command.
63
64     This method will proxy the arguments to subprocess.Popen. It
65     returns the output from the command if the call succeeded and
66     raises an exception if the process returns a non-0 value.
67
68     This is intended to be a variant on the subprocess.check_call
69     function that also allows you access to the output from the
70     command.
71     """
72     if 'stdin' not in kwargs:
73         kwargs['stdin'] = subprocess.PIPE
74     if 'stdout' not in kwargs:
75         kwargs['stdout'] = subprocess.PIPE
76     if 'stderr' not in kwargs:
77         kwargs['stderr'] = subprocess.STDOUT
78     p = subprocess.Popen(popen_args, *args, **kwargs)
79     out, _ = p.communicate(stdin_str)
80     if p.returncode:
81         raise subprocess.CalledProcessError(p.returncode, popen_args, out)
82     return out
83
84 #
85 # Exceptions.
86 #
87
88 class InvalidInput(Exception):
89     """Exception for user-provided input is invalid but maybe in good faith.
90
91     This would include setting memory to negative (which might be a
92     typo) but not setting an invalid boot CD (which requires bypassing
93     the select box).
94     """
95     def __init__(self, err_field, err_value, expl=None):
96         Exception.__init__(self, expl)
97         self.err_field = err_field
98         self.err_value = err_value
99
100 class CodeError(Exception):
101     """Exception for internal errors or bad faith input."""
102     pass
103
104 #
105 # Tests.
106 #
107
108 class common_tests(unittest.TestCase):
109     def test_dicts2structs(self):
110         dicts = {
111                 'atom': 0,
112                 'dict': { 'atom': 'atom', 'list': [1,2,3] },
113                 'list': [ 'atom', {'key': 'value'} ]
114                 }
115         structs = dicts2struct(dicts, '')
116         self.assertEqual(structs.atom,        dicts['atom'])
117         self.assertEqual(structs.dict.atom,   dicts['dict']['atom'])
118         self.assertEqual(structs.dict.list,   dicts['dict']['list'])
119         self.assertEqual(structs.list[0],     dicts['list'][0])
120         self.assertEqual(structs.list[1].key, dicts['list'][1]['key'])
121
122 if __name__ == '__main__':
123     unittest.main()