invirt.common: handle yaml's None with empty struct for good error message
[invirt/packages/invirt-base.git] / python / invirt / common.py
1 from __future__ import with_statement
2
3 import unittest
4 from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
5 import contextlib as clib
6
class InvirtConfigError(AttributeError):
    'Raised when a configuration variable is looked up but is missing.'
9
class struct(object):
    """A simple namespace object.

    Entries of ``d`` and ``kwargs`` become instance attributes.  Looking up
    an attribute that was never set raises InvirtConfigError (which is an
    AttributeError) naming the prefixed key when a prefix was supplied, and
    a plain AttributeError for an anonymous (prefix-less) struct.
    """
    # The default for d is an immutable empty tuple rather than a shared
    # mutable dict; dict.update() accepts either.
    def __init__(self, d = (), __prefix = None, **kwargs):
        """Populate the namespace.

        d is the dictionary or the items-iterable to update my __dict__ with.
        __prefix is the string prepended to the key in the "missing
        configuration variable" error; None marks an anonymous struct.
        kwargs are further attributes, applied after d.
        """
        self.__dict__.update(d)
        self.__dict__.update(kwargs)
        self.__prefix = __prefix
    def __getattr__(self, key):
        """Report a missing attribute; only called when normal lookup fails."""
        # XX ideally these would point a frame higher on the stack.
        # Read the (name-mangled) prefix straight from __dict__.  Going
        # through self.__prefix would re-enter __getattr__ whenever the
        # prefix itself is absent -- e.g. on an instance built without
        # __init__ by copy or pickle -- and recurse without bound.
        prefix = self.__dict__.get('_struct__prefix')
        if prefix is not None:
            raise InvirtConfigError('missing configuration variable %s%s'
                                    % (prefix, key))
        else:
            raise AttributeError("anonymous struct has no member '%s'"
                                 % (key,))
26
def dicts2struct(x, prefix = None):
    """
    Given a tree of lists/dicts, perform a deep traversal to transform all the
    dicts to structs.

    x is the tree (or leaf value) to convert.
    prefix is the dotted path of x used in struct error messages; each child
    gets prefix + str(key_or_index) + '.'.  With prefix None every struct in
    the result is anonymous.

    Dicts (including dict subclasses) become structs, lists (including list
    subclasses) become plain lists of converted items, None becomes an empty
    struct so that attribute access on it raises the struct's own
    missing-member error, and any other value is returned unchanged.
    """
    def newprefix(k):
        # Children of an anonymous tree stay anonymous.
        if prefix is None:
            return None
        return prefix + str(k) + '.'

    if isinstance(x, dict):
        # .items() rather than .iteritems(): available on Python 2 and 3.
        members = ((k, dicts2struct(v, newprefix(k))) for k, v in x.items())
        return struct(members, prefix)
    if isinstance(x, list):
        return [dicts2struct(v, newprefix(i)) for i, v in enumerate(x)]
    if x is None:
        return struct({}, prefix)
    return x
46
@clib.contextmanager
def lock_file(path, exclusive = True):
    """Context manager holding an flock() lock on path for the with-block.

    path is opened in 'w' mode, so it is created if missing and truncated
    if it already exists.
    exclusive selects LOCK_EX when true (the default) and LOCK_SH otherwise.

    Nothing is yielded.  The lock is released and the file closed when the
    block exits, whether normally or through an exception.
    """
    locktype = LOCK_EX if exclusive else LOCK_SH
    # open() instead of the file() builtin, which was removed in Python 3;
    # the file object closes itself when the with-block ends.
    with open(path, 'w') as f:
        flock(f, locktype)
        try:
            yield
        finally:
            flock(f, LOCK_UN)
59
60 #
61 # Exceptions.
62 #
63
class InvalidInput(Exception):
    """Raised when user-provided input is invalid, though maybe in good faith.

    Setting memory to a negative number belongs here (it might be a
    typo); setting an invalid boot CD does not (doing so requires
    bypassing the select box).
    """
    def __init__(self, err_field, err_value, expl=None):
        # expl becomes the exception's sole argument (and so its str());
        # the offending field and value are kept as attributes.
        super(InvalidInput, self).__init__(expl)
        self.err_value = err_value
        self.err_field = err_field
75
class CodeError(Exception):
    """Raised for internal errors or for input given in bad faith."""
79
80 #
81 # Tests.
82 #
83
class common_tests(unittest.TestCase):
    def test_dicts2structs(self):
        # Cover an atom, a dict holding an atom and a list, and a list
        # holding an atom and a dict.
        nested_dict = {'atom': 'atom', 'list': [1, 2, 3]}
        nested_list = ['atom', {'key': 'value'}]
        raw = {'atom': 0, 'dict': nested_dict, 'list': nested_list}

        converted = dicts2struct(raw, '')

        self.assertEqual(converted.atom, raw['atom'])
        self.assertEqual(converted.dict.atom, raw['dict']['atom'])
        self.assertEqual(converted.dict.list, raw['dict']['list'])
        self.assertEqual(converted.list[0], raw['list'][0])
        self.assertEqual(converted.list[1].key, raw['list'][1]['key'])
97
# Run this module's unit tests when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()