More clean-ups and cool template stuff
[invirt/third/libt4.git] / config.cc
1 #include <thread>
2 #include <sstream>
3 #include "config.h"
4 #include "paxos.h"
5 #include "handle.h"
6 #include "threaded_log.h"
7 #include "lang/verify.h"
8
9 // The config module maintains views. As a node joins or leaves a
10 // view, the next view will be the same as previous view, except with
11 // the new node added or removed. The first view contains only node
12 // 1. If node 2 joins after the first node (it will download the views
13 // from node 1), it will learn about view 1 with the first node as the
14 // only member.  It will then invoke Paxos to create the next view.
15 // It will tell Paxos to ask the nodes in view 1 to agree on the value
16 // {1, 2}.  If Paxos returns success, then it moves to view 2 with
17 // {1,2} as the members. When node 3 joins, the config module runs
18 // Paxos with the nodes in view 2 and the proposed value to be
19 // {1,2,3}. And so on.  When a node discovers that some node of the
20 // current view is not responding, it kicks off Paxos to propose a new
21 // value (the current view minus the node that isn't responding). The
22 // config module uses Paxos to create a total order of views, and it
23 // is ensured that the majority of the previous view agrees to the
24 // next view.  The Paxos log contains all the values (i.e., views)
25 // agreed on.
26 //
27 // The RSM module informs config to add nodes. The config module
28 // runs a heartbeater thread that checks in with nodes.  If a node
29 // doesn't respond, the config module will invoke Paxos's proposer to
30 // remove the node.  Higher layers will learn about this change when a
31 // Paxos acceptor accepts the new proposed value through
32 // paxos_commit().
33 //
34 // To be able to bring other nodes up to date to the latest formed
35 // view, each node will have a complete history of all view numbers
36 // and their values that it knows about. At any time a node can reboot
37 // and when it re-joins, it may be many views behind; by remembering
38 // all views, the other nodes can bring this re-joined node up to
39 // date.
40
41 config::config(const string &_first, const string &_me, config_view_change *_vc)
42     : my_view_id(0), first(_first), me(_me), vc(_vc),
43       paxos_acceptor(this, me == _first, me, me),
44       paxos_proposer(this, &paxos_acceptor, me)
45 {
46     get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
47     lock cfg_mutex_lock(cfg_mutex);
48     reconstruct(cfg_mutex_lock);
49     thread(&config::heartbeater, this).detach();
50 }
51
52 void config::restore(const string &s) {
53     lock cfg_mutex_lock(cfg_mutex);
54     paxos_acceptor.restore(s);
55     reconstruct(cfg_mutex_lock);
56 }
57
58 void config::get_view(unsigned instance, vector<string> &m) {
59     lock cfg_mutex_lock(cfg_mutex);
60     get_view(instance, m, cfg_mutex_lock);
61 }
62
63 void config::get_view(unsigned instance, vector<string> &m, lock &) {
64     string value = paxos_acceptor.value(instance);
65     LOG("get_view(" << instance << "): returns " << value);
66     m = members(value);
67 }
68
69 vector<string> config::members(const string &value) const {
70     istringstream ist(value);
71     using it = istream_iterator<string>;
72     return {it(ist), it()};
73 }
74
75 string config::value(const vector<string> &m) const {
76     ostringstream ost;
77     copy(m.begin(), m.end(), ostream_iterator<string>(ost, " "));
78     return ost.str();
79 }
80
81 void config::reconstruct(lock &cfg_mutex_lock) {
82     VERIFY(cfg_mutex_lock);
83     if (paxos_acceptor.instance() > 0) {
84         my_view_id = paxos_acceptor.instance();
85         get_view(my_view_id, mems, cfg_mutex_lock);
86         LOG("config::reconstruct: " << my_view_id << " " << print_members(mems));
87     }
88 }
89
90 // Called by Paxos's acceptor.
91 void config::paxos_commit(unsigned instance, const string &value) {
92     lock cfg_mutex_lock(cfg_mutex);
93
94     vector<string> newmem = members(value);
95     LOG("config::paxos_commit: " << instance << ": " << print_members(newmem));
96
97     for (auto mem : mems) {
98         LOG("config::paxos_commit: is " << mem << " still a member?");
99         if (!isamember(mem, newmem) && me != mem) {
100             LOG("config::paxos_commit: delete " << mem);
101             invalidate_handle(mem);
102         }
103     }
104
105     mems = newmem;
106     my_view_id = instance;
107     if (vc) {
108         cfg_mutex_lock.unlock();
109         vc->commit_change(instance);
110         cfg_mutex_lock.lock();
111     }
112 }
113
114 bool config::ismember(const string &m, unsigned vid) {
115     lock cfg_mutex_lock(cfg_mutex);
116     vector<string> v;
117     get_view(vid, v, cfg_mutex_lock);
118     return isamember(m, v);
119 }
120
121 bool config::add(const string &new_m, unsigned vid) {
122     lock cfg_mutex_lock(cfg_mutex);
123     if (vid != my_view_id)
124         return false;
125     LOG("config::add " << new_m);
126     vector<string> m = mems;
127     m.push_back(new_m);
128     vector<string> cmems = mems;
129     unsigned nextvid = my_view_id + 1;
130     cfg_mutex_lock.unlock();
131     bool r = paxos_proposer.run(nextvid, cmems, value(m));
132     cfg_mutex_lock.lock();
133     LOG("config::add: proposer returned " << (r ? "success" : "failure"));
134     return r;
135 }
136
137 // caller should hold cfg_mutex
138 bool config::remove(const string &m, lock &cfg_mutex_lock) {
139     LOG("config::remove: my_view_id " << my_view_id << " remove? " << m);
140     vector<string> n;
141     for (auto mem : mems) {
142         if (mem != m)
143             n.push_back(mem);
144     }
145     vector<string> cmems = mems;
146     unsigned nextvid = my_view_id + 1;
147     cfg_mutex_lock.unlock();
148     bool r = paxos_proposer.run(nextvid, cmems, value(n));
149     cfg_mutex_lock.lock();
150     LOG("config::remove: proposer returned " << (r ? "success" : "failure"));
151     return r;
152 }
153
154 void config::heartbeater() [[noreturn]] {
155     lock cfg_mutex_lock(cfg_mutex);
156
157     while (1) {
158         auto next_timeout = steady_clock::now() + seconds(3);
159         LOG("heartbeater: go to sleep");
160         config_cond.wait_until(cfg_mutex_lock, next_timeout);
161
162         unsigned vid = my_view_id;
163         vector<string> cmems;
164         get_view(vid, cmems, cfg_mutex_lock);
165         LOG("heartbeater: current membership " << print_members(cmems));
166
167         if (!isamember(me, cmems)) {
168             LOG("heartbeater: not member yet; skip hearbeat");
169             continue;
170         }
171
172         // who has the smallest ID?
173         string m = min(me, *min_element(cmems.begin(), cmems.end()));
174
175         if (m == me) {
176             // ping the other nodes
177             for (string mem : cmems) {
178                 if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
179                     continue;
180                 if (vid == my_view_id)
181                     remove(mem, cfg_mutex_lock);
182                 break;
183             }
184         } else {
185             // ping the node with the smallest ID
186             if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
187                 remove(m, cfg_mutex_lock);
188         }
189     }
190 }
191
192 paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
193     lock cfg_mutex_lock(cfg_mutex);
194     r = (int) my_view_id;
195     LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
196     if (vid == my_view_id)
197         return paxos_protocol::OK;
198     else if (paxos_proposer.isrunning()) {
199         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
200         return paxos_protocol::OK;
201     }
202     return paxos_protocol::ERR;
203 }
204
205 config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
206     unsigned vid = my_view_id;
207     LOG("doheartbeater to " << m << " (" << vid << ")");
208     handle h(m);
209
210     cfg_mutex_lock.unlock();
211     int r = 0, ret = rpc_const::bind_failure;
212     if (rpcc *cl = h.safebind())
213         ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
214     cfg_mutex_lock.lock();
215
216     heartbeat_t res = OK;
217     switch (ret) {
218         case paxos_protocol::OK:
219             break;
220         case rpc_const::atmostonce_failure:
221         case rpc_const::oldsrv_failure:
222             invalidate_handle(m);
223             break;
224         default:
225             LOG("doheartbeat: problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
226             res = (ret < 0) ? FAILURE : VIEWERR;
227     }
228     LOG("doheartbeat done " << res);
229     return res;
230 }