c76614599dd58b7c23b22acd7a6265cbf41d3791
[invirt/third/libt4.git] / rsm.cc
1 //
2 // Replicated state machine implementation with a primary and several
3 // backups. The primary receives requests, assigns each a view stamp (a
4 // vid, and a sequence number) in the order of reception, and forwards
5 // them to all backups. A backup executes requests in the order that
6 // the primary stamps them and replies with an OK to the primary. The
7 // primary executes the request after it receives OKs from all backups,
8 // and sends the reply back to the client.
9 //
10 // The config module will tell the RSM about a new view. If the
11 // primary in the previous view is a member of the new view, then it
12 // will stay the primary.  Otherwise, the smallest numbered node of
13 // the previous view will be the new primary.  In either case, the new
14 // primary will be a node from the previous view.  The configuration
15 // module constructs the sequence of views for the RSM and the RSM
16 // ensures there will be always one primary, who was a member of the
17 // last view.
18 //
19 // When a new node starts, the recovery thread is in charge of joining
20 // the RSM.  It will collect the internal RSM state from the primary;
21 // the primary asks the config module to add the new node and returns
22 // to the joining the internal RSM state (e.g., paxos log). Since
23 // there is only one primary, all joins happen in well-defined total
24 // order.
25 //
26 // The recovery thread also runs during a view change (e.g, when a node
27 // has failed).  After a failure some of the backups could have
28 // processed a request that the primary has not, but those results are
29 // not visible to clients (since the primary responds).  If the
30 // primary of the previous view is in the current view, then it will
31 // be the primary and its state is authoritive: the backups download
32 // from the primary the current state.  A primary waits until all
33 // backups have downloaded the state.  Once the RSM is in sync, the
34 // primary accepts requests again from clients.  If one of the backups
35 // is the new primary, then its state is authoritative.  In either
36 // scenario, the next view uses a node as primary that has the state
37 // resulting from processing all acknowledged client requests.
38 // Therefore, if the nodes sync up before processing the next request,
39 // the next view will have the correct state.
40 //
41 // While the RSM in a view change (i.e., a node has failed, a new view
42 // has been formed, but the sync hasn't completed), another failure
43 // could happen, which complicates a view change.  During syncing the
44 // primary or backups can timeout, and initiate another Paxos round.
45 // There are 2 variables that RSM uses to keep track in what state it
46 // is:
47 //    - inviewchange: a node has failed and the RSM is performing a view change
48 //    - insync: this node is syncing its state
49 //
50 // If inviewchange is false and a node is the primary, then it can
51 // process client requests. If it is true, clients are told to retry
52 // later again.  While inviewchange is true, the RSM may go through several
53 // member list changes, one by one.   After a member list
54 // change completes, the nodes tries to sync. If the sync complets,
55 // the view change completes (and inviewchange is set to false).  If
56 // the sync fails, the node may start another member list change
57 // (inviewchange = true and insync = false).
58 //
59 // The implementation should be used only with servers that run all
60 // requests run to completion; in particular, a request shouldn't
61 // block.  If a request blocks, the backup won't respond to the
62 // primary, and the primary won't execute the request.  A request may
63 // send an RPC to another host, but the RPC should be a one-way
64 // message to that host; the backup shouldn't do anything based on the
65 // response or execute after the response, because it is not
66 // guaranteed that all backup will receive the same response and
67 // execute in the same order.
68 //
69 // The implementation can be viewed as a layered system:
70 //       RSM module     ---- in charge of replication
71 //       config module  ---- in charge of view management
72 //       Paxos module   ---- in charge of running Paxos to agree on a value
73 //
74 // Each module has threads and internal locks. Furthermore, a thread
75 // may call down through the layers (e.g., to run Paxos's proposer).
76 // When Paxos's acceptor accepts a new value for an instance, a thread
77 // will invoke an upcall to inform higher layers of the new value.
78 // The rule is that a module releases its internal locks before it
79 // upcalls, but can keep its locks when calling down.
80
81 #include "rsm.h"
82 #include "handle.h"
83 #include "rsm_client.h"
84 #include <unistd.h>
85
86 rsm::rsm(const string & _first, const string & _me) : primary(_first)
87 {
88     cfg = unique_ptr<config>(new config(_first, _me, this));
89
90     if (_first == _me) {
91         // Commit the first view here. We can not have acceptor::acceptor
92         // do the commit, since at that time this->cfg is not initialized
93         commit_change(1);
94     }
95     rsmrpc = cfg->get_rpcs();
96     rsmrpc->reg(rsm_client_protocol::invoke, &rsm::client_invoke, this);
97     rsmrpc->reg(rsm_client_protocol::members, &rsm::client_members, this);
98     rsmrpc->reg(rsm_protocol::invoke, &rsm::invoke, this);
99     rsmrpc->reg(rsm_protocol::transferreq, &rsm::transferreq, this);
100     rsmrpc->reg(rsm_protocol::transferdonereq, &rsm::transferdonereq, this);
101     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
102
103     // tester must be on different port, otherwise it may partition itself
104     testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1));
105     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
106     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
107 }
108
109 void rsm::start() {
110     lock ml(rsm_mutex);
111     rsmrpc->start();
112     testsvr->start();
113     thread(&rsm::recovery, this).detach();
114 }
115
116 // The recovery thread runs this function
117 void rsm::recovery() {
118     bool r = true;
119     lock ml(rsm_mutex);
120
121     while (1) {
122         while (!cfg->ismember(cfg->myaddr(), vid_commit)) {
123             // XXX iannucci 2013/09/15 -- I don't understand whether accessing
124             // cfg->view_id in this manner involves a race.  I suspect not.
125             if (join(primary, ml)) {
126                 LOG("joined");
127                 commit_change(cfg->view_id(), ml);
128             } else {
129                 ml.unlock();
130                 this_thread::sleep_for(seconds(3)); // XXX make another node in cfg primary?
131                 ml.lock();
132             }
133         }
134         vid_insync = vid_commit;
135         LOG("sync vid_insync " << vid_insync);
136         if (primary == cfg->myaddr()) {
137             r = sync_with_backups(ml);
138         } else {
139             r = sync_with_primary(ml);
140         }
141         LOG("sync done");
142
143         // If there was a commited viewchange during the synchronization, restart
144         // the recovery
145         if (vid_insync != vid_commit)
146             continue;
147
148         if (r) {
149             myvs.vid = vid_commit;
150             myvs.seqno = 1;
151             inviewchange = false;
152         }
153         LOG("go to sleep " << insync << " " << inviewchange);
154         recovery_cond.wait(ml);
155     }
156 }
157
158 bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
159     rsm_mutex_lock.unlock();
160     {
161         // Make sure that the state of lock_server is stable during
162         // synchronization; otherwise, the primary's state may be more recent
163         // than replicas after the synchronization.
164         lock invoke_mutex_lock(invoke_mutex);
165         // By acquiring and releasing the invoke_mutex once, we make sure that
166         // the state of lock_server will not be changed until all
167         // replicas are synchronized. The reason is that client_invoke arrives
168         // after this point of time will see inviewchange == true, and returns
169         // BUSY.
170     }
171     rsm_mutex_lock.lock();
172     // Start accepting synchronization request (statetransferreq) now!
173     insync = true;
174     cfg->get_view(vid_insync, backups);
175     backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
176     LOG("backups " << backups);
177     sync_cond.wait(rsm_mutex_lock);
178     insync = false;
179     return true;
180 }
181
182
183 bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
184     // Remember the primary of vid_insync
185     string m = primary;
186     while (vid_insync == vid_commit) {
187         if (statetransfer(m, rsm_mutex_lock))
188             break;
189     }
190     return statetransferdone(m, rsm_mutex_lock);
191 }
192
193
194 //
195 // Call to transfer state from m to the local node.
196 // Assumes that rsm_mutex is already held.
197 //
198 bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
199 {
200     rsm_protocol::transferres r;
201     handle h(m);
202     int ret = 0;
203     LOG("contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
204     rpcc *cl;
205     {
206         rsm_mutex_lock.unlock();
207         cl = h.safebind();
208         if (cl) {
209             ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
210                     r, cfg->myaddr(), last_myvs, vid_insync);
211         }
212         rsm_mutex_lock.lock();
213     }
214     if (cl == 0 || ret != rsm_protocol::OK) {
215         LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
216         return false;
217     }
218     if (stf && last_myvs != r.last) {
219         stf->unmarshal_state(r.state);
220     }
221     last_myvs = r.last;
222     LOG("transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
223     return true;
224 }
225
226 bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) {
227     rsm_mutex_lock.unlock();
228     handle h(m);
229     rpcc *cl = h.safebind();
230     bool done = false;
231     if (cl) {
232         int r;
233         auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
234         done = (ret == rsm_protocol::OK);
235     }
236     rsm_mutex_lock.lock();
237     return done;
238 }
239
240
241 bool rsm::join(const string & m, lock & rsm_mutex_lock) {
242     handle h(m);
243     int ret = 0;
244     string log;
245
246     LOG("contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
247     rpcc *cl;
248     {
249         rsm_mutex_lock.unlock();
250         cl = h.safebind();
251         if (cl != 0) {
252             ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log,
253                     cfg->myaddr(), last_myvs);
254         }
255         rsm_mutex_lock.lock();
256     }
257
258     if (cl == 0 || ret != rsm_protocol::OK) {
259         LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
260         return false;
261     }
262     LOG("succeeded " << log);
263     cfg->restore(log);
264     return true;
265 }
266
267 //
268 // Config informs rsm whenever it has successfully
269 // completed a view change
270 //
271 void rsm::commit_change(unsigned vid) {
272     lock ml(rsm_mutex);
273     commit_change(vid, ml);
274     if (cfg->ismember(cfg->myaddr(), vid_commit))
275         breakpoint(2);
276 }
277
278 void rsm::commit_change(unsigned vid, lock &) {
279     if (vid <= vid_commit)
280         return;
281     LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
282             last_myvs.seqno << ") " << primary << " insync " << insync);
283     vid_commit = vid;
284     inviewchange = true;
285     set_primary(vid);
286     recovery_cond.notify_one();
287     sync_cond.notify_one();
288     if (cfg->ismember(cfg->myaddr(), vid_commit))
289         breakpoint(2);
290 }
291
292
293 void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) {
294     LOG("execute");
295     handler *h = procs[procno];
296     VERIFY(h);
297     marshall rep;
298     auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
299     r = marshall(ret, rep.content()).content();
300 }
301
302 //
303 // Clients call client_invoke to invoke a procedure on the replicated state
304 // machine: the primary receives the request, assigns it a sequence
305 // number, and invokes it on all members of the replicated state
306 // machine.
307 //
308 rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
309     LOG("invoke procno 0x" << hex << procno);
310     lock ml(invoke_mutex);
311     vector<string> m;
312     string myaddr;
313     viewstamp vs;
314     {
315         lock ml2(rsm_mutex);
316         LOG("Checking for inviewchange");
317         if (inviewchange)
318             return rsm_client_protocol::BUSY;
319         LOG("Checking for primacy");
320         myaddr = cfg->myaddr();
321         if (primary != myaddr)
322             return rsm_client_protocol::NOTPRIMARY;
323         LOG("Assigning a viewstamp");
324         cfg->get_view(vid_commit, m);
325         // assign the RPC the next viewstamp number
326         vs = myvs;
327         myvs++;
328     }
329
330     // send an invoke RPC to all slaves in the current view with a timeout of 1 second
331     LOG("Invoking slaves");
332     for (unsigned i  = 0; i < m.size(); i++) {
333         if (m[i] != myaddr) {
334             // if invoke on slave fails, return rsm_client_protocol::BUSY
335             handle h(m[i]);
336             LOG("Sending invoke to " << m[i]);
337             rpcc *cl = h.safebind();
338             if (!cl)
339                 return rsm_client_protocol::BUSY;
340             int ignored_rval;
341             auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
342             LOG("Invoke returned " << ret);
343             if (ret != rsm_protocol::OK)
344                 return rsm_client_protocol::BUSY;
345             breakpoint(1);
346             lock rsm_mutex_lock(rsm_mutex);
347             partition1(rsm_mutex_lock);
348         }
349     }
350     execute(procno, req, r);
351     for (size_t i=0; i<r.size(); i++) {
352         LOG(hex << setfill('0') << setw(2) << (unsigned int)(unsigned char)r[i]);
353     }
354     last_myvs = vs;
355     return rsm_client_protocol::OK;
356 }
357
358 //
359 // The primary calls the internal invoke at each member of the
360 // replicated state machine
361 //
362 // the replica must execute requests in order (with no gaps)
363 // according to requests' seqno
364
365 rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
366     LOG("invoke procno 0x" << hex << proc);
367     lock ml(invoke_mutex);
368     vector<string> m;
369     string myaddr;
370     {
371         lock ml2(rsm_mutex);
372         // check if !inviewchange
373         LOG("Checking for view change");
374         if (inviewchange)
375             return rsm_protocol::ERR;
376         // check if slave
377         LOG("Checking for slave status");
378         myaddr = cfg->myaddr();
379         if (primary == myaddr)
380             return rsm_protocol::ERR;
381         cfg->get_view(vid_commit, m);
382         if (find(m.begin(), m.end(), myaddr) == m.end())
383             return rsm_protocol::ERR;
384         // check sequence number
385         LOG("Checking sequence number");
386         if (vs != myvs)
387             return rsm_protocol::ERR;
388         myvs++;
389     }
390     string r;
391     execute(proc, req, r);
392     last_myvs = vs;
393     breakpoint(1);
394     return rsm_protocol::OK;
395 }
396
397 //
398 // RPC handler: Send back the local node's state to the caller
399 //
400 rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src,
401         viewstamp last, unsigned vid) {
402     lock ml(rsm_mutex);
403     LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
404             last_myvs.vid << "," << last_myvs.seqno << ")");
405     if (!insync || vid != vid_insync)
406         return rsm_protocol::BUSY;
407     if (stf && last != last_myvs)
408         r.state = stf->marshal_state();
409     r.last = last_myvs;
410     return rsm_protocol::OK;
411 }
412
413 //
414 // RPC handler: Inform the local node (the primary) that node m has synchronized
415 // for view vid
416 //
417 rsm_protocol::status rsm::transferdonereq(int &, const string & m, unsigned vid) {
418     lock ml(rsm_mutex);
419     if (!insync || vid != vid_insync)
420         return rsm_protocol::BUSY;
421     backups.erase(find(backups.begin(), backups.end(), m));
422     if (backups.empty())
423         sync_cond.notify_one();
424     return rsm_protocol::OK;
425 }
426
427 // a node that wants to join an RSM as a server sends a
428 // joinreq to the RSM's current primary; this is the
429 // handler for that RPC.
430 rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last) {
431     auto ret = rsm_protocol::OK;
432
433     lock ml(rsm_mutex);
434     LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" <<
435             last_myvs.vid << "," << last_myvs.seqno << ")");
436     if (cfg->ismember(m, vid_commit)) {
437         LOG(m << " is still a member -- nothing to do");
438         log = cfg->dump();
439     } else if (cfg->myaddr() != primary) {
440         LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!");
441         ret = rsm_protocol::BUSY;
442     } else {
443         // We cache vid_commit to avoid adding m to a view which already contains
444         // m due to race condition
445         LOG("calling down to config layer");
446         unsigned vid_cache = vid_commit;
447         bool succ;
448         {
449             ml.unlock();
450             succ = cfg->add(m, vid_cache);
451             ml.lock();
452         }
453         if (cfg->ismember(m, cfg->view_id())) {
454             log = cfg->dump();
455             LOG("ret " << ret << " log " << log);
456         } else {
457             LOG("failed; proposer couldn't add " << succ);
458             ret = rsm_protocol::BUSY;
459         }
460     }
461     return ret;
462 }
463
464 //
465 // RPC handler: Responds with the list of known nodes for fall-back on a
466 // primary failure
467 //
468 rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
469     vector<string> m;
470     lock ml(rsm_mutex);
471     cfg->get_view(vid_commit, m);
472     m.push_back(primary);
473     r = m;
474     LOG("return " << m << " m " << primary);
475     return rsm_client_protocol::OK;
476 }
477
478 // if primary is member of new view, that node is primary
479 // otherwise, the lowest number node of the previous view.
480 // caller should hold rsm_mutex
481 void rsm::set_primary(unsigned vid) {
482     vector<string> c, p;
483     cfg->get_view(vid, c);
484     cfg->get_view(vid - 1, p);
485     VERIFY (c.size() > 0);
486
487     if (isamember(primary,c)) {
488         LOG("primary stays " << primary);
489         return;
490     }
491
492     VERIFY(p.size() > 0);
493     for (unsigned i = 0; i < p.size(); i++) {
494         if (isamember(p[i], c)) {
495             primary = p[i];
496             LOG("primary is " << primary);
497             return;
498         }
499     }
500     VERIFY(0);
501 }
502
503 bool rsm::amiprimary() {
504     lock ml(rsm_mutex);
505     return primary == cfg->myaddr() && !inviewchange;
506 }
507
508
509 // Test RPCs -- simulate partitions and failures
510
511 void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
512     VERIFY(rsm_mutex_lock);
513     vector<string> m;
514     cfg->get_view(vid_commit, m);
515     for (unsigned i  = 0; i < m.size(); i++) {
516         if (m[i] != cfg->myaddr()) {
517             handle h(m[i]);
518             LOG("member " << m[i] << " " << heal);
519             if (h.safebind()) h.safebind()->set_reachable(heal);
520         }
521     }
522     rsmrpc->set_reachable(heal);
523 }
524
525 rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) {
526     lock ml(rsm_mutex);
527     LOG("heal " << heal << " (dopartition " <<
528             dopartition << ", partitioned " << partitioned << ")");
529     if (heal)
530         net_repair(heal, ml);
531     else
532         dopartition = true;
533     partitioned = false;
534     return r = rsm_test_protocol::OK;
535 }
536
537 // simulate failure at breakpoint 1 and 2
538
539 void rsm::breakpoint(int b) {
540     if (breakpoints[b-1]) {
541         LOG("Dying at breakpoint " << b << " in rsm!");
542         exit(1);
543     }
544 }
545
546 void rsm::partition1(lock & rsm_mutex_lock) {
547     if (dopartition) {
548         net_repair(false, rsm_mutex_lock);
549         dopartition = false;
550         partitioned = true;
551     }
552 }
553
554 rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) {
555     r = rsm_test_protocol::OK;
556     lock ml(rsm_mutex);
557     LOG("breakpoint " << b);
558     if (b == 1) breakpoints[1-1] = true;
559     else if (b == 2) breakpoints[2-1] = true;
560     else if (b == 3 || b == 4) cfg->breakpoint(b);
561     else r = rsm_test_protocol::ERR;
562     return r;
563 }