Cosmetic improvements.
[invirt/third/libt4.git] / rsm.h
1 // replicated state machine interface.
2
3 #ifndef rsm_h
4 #define rsm_h
5
6 #include "types.h"
7 #include "rsm_protocol.h"
8 #include "rpc/rpc.h"
9 #include <arpa/inet.h>
10 #include "config.h"
11
12 class rsm_state_transfer {
13     public:
14         virtual string marshal_state() = 0;
15         virtual void unmarshal_state(const string &) = 0;
16         virtual ~rsm_state_transfer() {}
17 };
18
19 class rsm : public config_view_change {
20     protected:
21         map<rpc_protocol::proc_id_t, handler *> procs;
22         unique_ptr<config> cfg;
23         rsm_state_transfer *stf = nullptr;
24         rpcs *rsmrpc;
25         // On slave: expected viewstamp of next invoke request
26         // On primary: viewstamp for the next request from rsm_client
27         viewstamp last_myvs{0, 0};   // Viewstamp of the last executed request
28         viewstamp myvs{0, 1};
29         string primary;
30         bool insync = false;
31         bool inviewchange = true;
32         unsigned vid_commit = 0;  // Latest view id that is known to rsm layer
33         unsigned vid_insync;  // The view id that this node is synchronizing for
34         vector<string> backups;   // A list of unsynchronized backups
35
36         // For testing purposes
37         unique_ptr<rpcs> testsvr;
38         bool partitioned = false;
39         bool dopartition = false;
40         bool breakpoints[2] = {};
41
42         rsm_client_protocol::status client_members(vector<string> & r, int i);
43         rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq);
44         rsm_protocol::status transferreq(rsm_protocol::transferres & r, const string & src,
45                 viewstamp last, unsigned vid);
46         rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid);
47         rsm_protocol::status joinreq(string & log, const string & src, viewstamp last);
48         rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status & r, int heal);
49         rsm_test_protocol::status breakpointreq(rsm_test_protocol::status & r, int b);
50
51         mutex rsm_mutex, invoke_mutex;
52         cond recovery_cond, sync_cond;
53
54         void execute(rpc_protocol::proc_id_t procno, const string & req, string & r);
55         rsm_client_protocol::status client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req);
56         bool statetransfer(const string & m, lock & rsm_mutex_lock);
57         bool statetransferdone(const string & m, lock & rsm_mutex_lock);
58         bool join(const string & m, lock & rsm_mutex_lock);
59         void set_primary(unsigned vid);
60         bool sync_with_backups(lock & rsm_mutex_lock);
61         bool sync_with_primary(lock & rsm_mutex_lock);
62         void net_repair(bool heal, lock & rsm_mutex_lock);
63         void breakpoint(int b);
64         void partition1(lock & rsm_mutex_lock);
65         void commit_change(unsigned vid, lock & rsm_mutex_lock);
66         void recovery NORETURN ();
67     public:
68         rsm (const string & _first, const string & _me);
69
70         bool amiprimary();
71         void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; }
72         void commit_change(unsigned vid);
73
74         template<class P, class F, class C=void> void reg(rpc_protocol::proc_t<P> proc, F f, C *c=nullptr) {
75             static_assert(is_valid_registration<P, F>::value, "RSM handler registered with incorrect argument types");
76             lock ml(rsm_mutex);
77             procs[proc.id] = marshalled_func<F>::wrap(f, c);
78         }
79
80         void start();
81 };
82
83 #endif