X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5a5c578e2e358a121cdb9234a6cb11c4ecfbf323..ab6c1548ac2b1907bca92c8ce43e919c1a649a6f:/rsm.h diff --git a/rsm.h b/rsm.h index f2eb5bd..dfbb25c 100644 --- a/rsm.h +++ b/rsm.h @@ -12,77 +12,72 @@ class rsm_state_transfer { public: virtual string marshal_state() = 0; - virtual void unmarshal_state(string) = 0; - virtual ~rsm_state_transfer() {} + virtual void unmarshal_state(const string &) = 0; + virtual ~rsm_state_transfer(); }; class rsm : public config_view_change { - private: - void reg1(int proc, handler *); protected: - map procs; - config *cfg; - class rsm_state_transfer *stf; + std::map procs; + unique_ptr cfg; + rsm_state_transfer *stf = nullptr; rpcs *rsmrpc; // On slave: expected viewstamp of next invoke request // On primary: viewstamp for the next request from rsm_client - viewstamp myvs; - viewstamp last_myvs; // Viewstamp of the last executed request + viewstamp last_myvs{0, 0}; // Viewstamp of the last executed request + viewstamp myvs{0, 1}; string primary; - bool insync; - bool inviewchange; - unsigned vid_commit; // Latest view id that is known to rsm layer + bool insync = false; + bool inviewchange = true; + unsigned vid_commit = 0; // Latest view id that is known to rsm layer unsigned vid_insync; // The view id that this node is synchronizing for - vector backups; // A list of unsynchronized backups + std::vector backups; // A list of unsynchronized backups // For testing purposes - rpcs *testsvr; - bool partitioned; - bool dopartition; - bool break1; - bool break2; + unique_ptr testsvr; + bool partitioned = false; + bool dopartition = false; + bool breakpoints[2] = {}; - rsm_client_protocol::status client_members(vector &r, int i); - rsm_protocol::status invoke(int &, int proc, viewstamp vs, string mreq); - rsm_protocol::status transferreq(rsm_protocol::transferres &r, string src, + rsm_client_protocol::status client_members(std::vector & r, int i); + rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq); + rsm_protocol::status transferreq(rsm_protocol::transferres & r, const string & src, viewstamp last, unsigned vid); - rsm_protocol::status transferdonereq(int &, string m, unsigned vid); - rsm_protocol::status joinreq(string & log, string src, - viewstamp last); - rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal); - rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b); + rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid); + rsm_protocol::status joinreq(string & log, const string & src, viewstamp last); + rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status & r, int heal); + rsm_test_protocol::status breakpointreq(rsm_test_protocol::status & r, int b); - mutex rsm_mutex, invoke_mutex; + std::mutex rsm_mutex, invoke_mutex; cond recovery_cond, sync_cond; - void execute(int procno, string req, string &r); - rsm_client_protocol::status client_invoke(string &r, int procno, string req); - bool statetransfer(string m, lock & rsm_mutex_lock); - bool statetransferdone(string m, lock & rsm_mutex_lock); - bool join(string m, lock & rsm_mutex_lock); + void execute(rpc_protocol::proc_id_t procno, const string & req, string & r); + rsm_client_protocol::status client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req); + bool statetransfer(const string & m, lock & rsm_mutex_lock); + bool statetransferdone(const string & m, lock & rsm_mutex_lock); + bool join(const string & m, lock & rsm_mutex_lock); void set_primary(unsigned vid); - string find_highest(viewstamp &vs, string &m, unsigned &vid); bool sync_with_backups(lock & rsm_mutex_lock); bool sync_with_primary(lock & rsm_mutex_lock); void net_repair(bool heal, lock & rsm_mutex_lock); - void breakpoint1(); - void breakpoint2(); + void breakpoint(int b); void partition1(lock & rsm_mutex_lock); void commit_change(unsigned vid, lock & rsm_mutex_lock); + void recovery NORETURN (); public: - rsm (string _first, string _me); - ~rsm() {} + rsm (const string & _first, const string & _me); bool amiprimary(); void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; } - void recovery(); void commit_change(unsigned vid); - template void reg(int proc, F f, C *c=nullptr); -}; + template void reg(rpc_protocol::proc_t

proc, F f, C *c=nullptr) { + static_assert(is_valid_registration::value, "RSM handler registered with incorrect argument types"); + lock ml(rsm_mutex); + procs[proc.id] = marshalled_func::wrap(f, c); + } -template void rsm::reg(int proc, F f, C *c) { - reg1(proc, marshalled_func::wrap(f, c)); -} + void start(); +}; -#endif /* rsm_h */ +#endif