e4f7c25f61171044419a247723f92cb2626342c6
[invirt/packages/invirt-base.git] / python / invirt / common.py
1 from __future__ import with_statement
2
3 import unittest
4 from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
5 import contextlib as clib
6 import subprocess
7
8 class InvirtConfigError(AttributeError):
9     pass
10
11 class struct(dict):
12     'A simple namespace object.'
13     def __init__(self, d = {}, __prefix = None, **kwargs):
14         super(struct, self).__init__(d)
15         self.__prefix = __prefix
16         self.update(kwargs)
17     def __getattr__(self, key):
18         try:
19             return self[key]
20         except KeyError:
21             # XX ideally these would point a frame higher on the stack.
22             prefix = self.__prefix
23             if prefix is not None:
24                 raise InvirtConfigError('missing configuration variable %s%s'
25                                         % (prefix, key))
26             else:
27                 raise AttributeError("anonymous struct has no member '%s'"
28                                      % (key,))
29
30 def dicts2struct(x, prefix = None):
31     """
32     Given a tree of lists/dicts, perform a deep traversal to transform all the
33     dicts to structs.
34     """
35     if prefix is not None:
36         def newprefix(k): return prefix + str(k) + '.'
37     else:
38         def newprefix(k): return prefix
39     if type(x) == dict:
40         return struct(((k, dicts2struct(v, newprefix(k)))
41                        for k,v in x.iteritems()),
42                       prefix)
43     elif type(x) == list:
44         return [dicts2struct(v, newprefix(i)) for i, v in enumerate(x)]
45     elif x is None:
46         return struct({}, prefix)
47     else:
48         return x
49
50 @clib.contextmanager
51 def lock_file(path, exclusive = True):
52     with clib.closing(file(path, 'w')) as f:
53         if exclusive:
54             locktype = LOCK_EX
55         else:
56             locktype = LOCK_SH
57         flock(f, locktype)
58         try:
59             yield
60         finally:
61             flock(f, LOCK_UN)
62
63 def captureOutput(popen_args, stdin_str=None, *args, **kwargs):
64     """Capture stdout from a command.
65
66     This method will proxy the arguments to subprocess.Popen. It
67     returns the output from the command if the call succeeded and
68     raises an exception if the process returns a non-0 value.
69
70     This is intended to be a variant on the subprocess.check_call
71     function that also allows you access to the output from the
72     command.
73     """
74     if 'stdin' not in kwargs:
75         kwargs['stdin'] = subprocess.PIPE
76     if 'stdout' not in kwargs:
77         kwargs['stdout'] = subprocess.PIPE
78     if 'stderr' not in kwargs:
79         kwargs['stderr'] = subprocess.STDOUT
80     p = subprocess.Popen(popen_args, *args, **kwargs)
81     out, _ = p.communicate(stdin_str)
82     if p.returncode:
83         raise subprocess.CalledProcessError(p.returncode, popen_args, out)
84     return out
85
86 #
87 # Exceptions.
88 #
89
90 class InvalidInput(Exception):
91     """Exception for user-provided input is invalid but maybe in good faith.
92
93     This would include setting memory to negative (which might be a
94     typo) but not setting an invalid boot CD (which requires bypassing
95     the select box).
96     """
97     def __init__(self, err_field, err_value, expl=None):
98         Exception.__init__(self, expl)
99         self.err_field = err_field
100         self.err_value = err_value
101
102 class CodeError(Exception):
103     """Exception for internal errors or bad faith input."""
104     pass
105
106 #
107 # Tests.
108 #
109
110 class common_tests(unittest.TestCase):
111     def test_dicts2structs(self):
112         dicts = {
113                 'atom': 0,
114                 'dict': { 'atom': 'atom', 'list': [1,2,3] },
115                 'list': [ 'atom', {'key': 'value'} ]
116                 }
117         structs = dicts2struct(dicts, '')
118         self.assertEqual(structs.atom,        dicts['atom'])
119         self.assertEqual(structs.dict.atom,   dicts['dict']['atom'])
120         self.assertEqual(structs.dict.list,   dicts['dict']['list'])
121         self.assertEqual(structs.list[0],     dicts['list'][0])
122         self.assertEqual(structs.list[1].key, dicts['list'][1]['key'])
123         self.assertEqual(set(structs), set(['atom', 'dict', 'list']))
124
125 if __name__ == '__main__':
126     unittest.main()