So many changes. Broken.
[invirt/third/libt4.git] / config.cc
1 #include "include/config.h"
2
3 using std::vector;
4 using namespace std::chrono;
5
6 // The config module maintains views. As a node joins or leaves a view, the
7 // next view will be the same as previous view, except with the new node added
8 // or removed. The first view contains only node 1. If node 2 joins after the
9 // first node (it will download the views from node 1), it will learn about
10 // view 1 with the first node as the only member.  It will then invoke Paxos to
11 // create the next view.  It will tell Paxos to ask the nodes in view 1 to
12 // agree on the value {1, 2}.  If Paxos returns success, then it moves to view
13 // 2 with {1,2} as the members. When node 3 joins, the config module runs Paxos
14 // with the nodes in view 2 and the proposed value to be {1,2,3}. And so on.
15 // When a node discovers that some node of the current view is not responding,
16 // it kicks off Paxos to propose a new value (the current view minus the node
17 // that isn't responding). The config module uses Paxos to create a total order
18 // of views, and it is ensured that the majority of the previous view agrees to
19 // the next view.  The Paxos log contains all the values (i.e., views) agreed
20 // on.
21 //
22 // The RSM module informs config to add nodes. The config module runs a
23 // heartbeater thread that checks in with nodes.  If a node doesn't respond,
24 // the config module will invoke Paxos's proposer to remove the node.  Higher
25 // layers will learn about this change when a Paxos acceptor accepts the new
26 // proposed value through paxos_commit().
27 //
28 // To be able to bring other nodes up to date to the latest formed view, each
29 // node will have a complete history of all view numbers and their values that
30 // it knows about. At any time a node can reboot and when it re-joins, it may
31 // be many views behind; by remembering all views, the other nodes can bring
32 // this re-joined node up to date.
33
34 config_view_change::~config_view_change() {}
35
36 config::config(const string & _first, const string & _me, config_view_change *_vc)
37     : first(_first), me(_me), vc(_vc),
38       paxos(this, me == _first, me, me)
39 {
40     get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
41     lock cfg_mutex_lock(cfg_mutex);
42     reconstruct(cfg_mutex_lock);
43     thread(&config::heartbeater, this).detach();
44 }
45
46 void config::restore(const string & s) {
47     lock cfg_mutex_lock(cfg_mutex);
48     paxos.restore(s);
49     reconstruct(cfg_mutex_lock);
50 }
51
52 vector<string> config::get_view(unsigned instance) {
53     lock cfg_mutex_lock(cfg_mutex);
54     return get_view(instance, cfg_mutex_lock);
55 }
56
57 vector<string> config::get_view(unsigned instance, lock & cfg_mutex_lock) {
58     VERIFY(cfg_mutex_lock);
59     string value = paxos.value(instance);
60     LOG << "get_view(" << instance << "): returns " << value;
61     return explode(value);
62 }
63
64 void config::reconstruct(lock & cfg_mutex_lock) {
65     VERIFY(cfg_mutex_lock);
66     my_view_id = paxos.instance();
67     if (my_view_id > 0) {
68         mems = get_view(my_view_id, cfg_mutex_lock);
69         LOG << "view " << my_view_id << " " << mems;
70     }
71 }
72
73 // Called by Paxos's acceptor.
74 void config::paxos_commit(unsigned instance, const string & value) {
75     lock cfg_mutex_lock(cfg_mutex);
76
77     vector<string> newmem = explode(value);
78     LOG << "instance " << instance << ": " << newmem;
79
80     for (auto mem : mems) {
81         LOG << "is " << mem << " still a member?";
82         if (!isamember(mem, newmem) && me != mem) {
83             LOG << "delete " << mem;
84             rpcc::unbind_cached(mem);
85         }
86     }
87
88     mems = newmem;
89     my_view_id = instance;
90     if (vc) {
91         cfg_mutex_lock.unlock();
92         vc->commit_change(instance);
93         cfg_mutex_lock.lock();
94     }
95 }
96
97 bool config::ismember(const string & m, unsigned vid) {
98     lock cfg_mutex_lock(cfg_mutex);
99     return isamember(m, get_view(vid, cfg_mutex_lock));
100 }
101
102 bool config::add(const string & new_m, unsigned vid) {
103     lock cfg_mutex_lock(cfg_mutex);
104     LOG << "adding " << new_m << " to " << vid;
105     if (vid != my_view_id) {
106         LOG << "that's not my view id, " << my_view_id << "!";
107         return false;
108     }
109     LOG << "calling down to paxos layer";
110     vector<string> m(mems), cmems(mems);
111     m.push_back(new_m);
112     LOG << "old mems " << cmems << " " << implode(cmems);
113     LOG << "new mems " << m << " " << implode(m);
114     unsigned nextvid = my_view_id + 1;
115     cfg_mutex_lock.unlock();
116     bool r = paxos.run(nextvid, cmems, implode(m));
117     cfg_mutex_lock.lock();
118     LOG << "paxos proposer returned " << (r ? "success" : "failure");
119     return r;
120 }
121
122 // caller should hold cfg_mutex
123 bool config::remove(const string & m, lock & cfg_mutex_lock) {
124     VERIFY(cfg_mutex_lock);
125     LOG << "my_view_id " << my_view_id << " remove? " << m;
126     vector<string> n;
127     for (auto mem : mems) {
128         if (mem != m)
129             n.push_back(mem);
130     }
131     vector<string> cmems = mems;
132     unsigned nextvid = my_view_id + 1;
133     cfg_mutex_lock.unlock();
134     bool r = paxos.run(nextvid, cmems, implode(n));
135     cfg_mutex_lock.lock();
136     LOG << "proposer returned " << (r ? "success" : "failure");
137     return r;
138 }
139
140 void config::heartbeater() {
141     lock cfg_mutex_lock(cfg_mutex);
142
143     while (1) {
144         auto next_timeout = steady_clock::now() + 300ms;
145         LOG << "go to sleep";
146         config_cond.wait_until(cfg_mutex_lock, next_timeout);
147
148         unsigned vid = my_view_id;
149         vector<string> cmems = get_view(vid, cfg_mutex_lock);
150         LOG << "current membership " << cmems;
151
152         if (!isamember(me, cmems)) {
153             LOG << "not member yet; skip hearbeat";
154             continue;
155         }
156
157         // who has the smallest ID?
158         string m = std::min(me, *std::min_element(cmems.begin(), cmems.end()));
159
160         if (m == me) {
161             // ping the other nodes
162             for (string mem : cmems) {
163                 if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
164                     continue;
165                 if (vid == my_view_id)
166                     remove(mem, cfg_mutex_lock);
167                 break;
168             }
169         } else {
170             // ping the node with the smallest ID
171             if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
172                 remove(m, cfg_mutex_lock);
173         }
174     }
175 }
176
177 paxos_protocol::status config::heartbeat(paxos_protocol::view_t & r, string m, paxos_protocol::view_t vid) {
178     lock cfg_mutex_lock(cfg_mutex);
179     r = my_view_id;
180     LOG << "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
181     if (vid == my_view_id)
182         return paxos_protocol::OK;
183     else if (paxos.isrunning()) {
184         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
185         return paxos_protocol::OK;
186     }
187     return paxos_protocol::ERR;
188 }
189
190 config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
191     VERIFY(cfg_mutex_lock);
192     paxos_protocol::view_t vid = my_view_id, r;
193     LOG << "heartbeat to " << m << " (" << vid << ")";
194
195     cfg_mutex_lock.unlock();
196     int ret = rpc_protocol::bind_failure;
197     if (auto cl = rpcc::bind_cached(m))
198         ret = cl->call_timeout(paxos_protocol::heartbeat, 100ms, r, me, vid);
199     cfg_mutex_lock.lock();
200
201     heartbeat_t res = OK;
202     switch (ret) {
203         case paxos_protocol::OK:
204             break;
205         case rpc_protocol::atmostonce_failure:
206         case rpc_protocol::oldsrv_failure:
207             rpcc::unbind_cached(m);
208             break;
209         default:
210             LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r;
211             res = (ret < 0) ? FAILURE : VIEWERR;
212     }
213     LOG << "done " << res;
214     return res;
215 }