All sleep calls via std::this_thread
[invirt/third/libt4.git] / config.cc
1 #include "config.h"
2
3 using std::vector;
4
5 // The config module maintains views. As a node joins or leaves a
6 // view, the next view will be the same as previous view, except with
7 // the new node added or removed. The first view contains only node
8 // 1. If node 2 joins after the first node (it will download the views
9 // from node 1), it will learn about view 1 with the first node as the
10 // only member.  It will then invoke Paxos to create the next view.
11 // It will tell Paxos to ask the nodes in view 1 to agree on the value
12 // {1, 2}.  If Paxos returns success, then it moves to view 2 with
13 // {1,2} as the members. When node 3 joins, the config module runs
14 // Paxos with the nodes in view 2 and the proposed value to be
15 // {1,2,3}. And so on.  When a node discovers that some node of the
16 // current view is not responding, it kicks off Paxos to propose a new
17 // value (the current view minus the node that isn't responding). The
18 // config module uses Paxos to create a total order of views, and it
19 // is ensured that the majority of the previous view agrees to the
20 // next view.  The Paxos log contains all the values (i.e., views)
21 // agreed on.
22 //
23 // The RSM module informs config to add nodes. The config module
24 // runs a heartbeater thread that checks in with nodes.  If a node
25 // doesn't respond, the config module will invoke Paxos's proposer to
26 // remove the node.  Higher layers will learn about this change when a
27 // Paxos acceptor accepts the new proposed value through
28 // paxos_commit().
29 //
30 // To be able to bring other nodes up to date to the latest formed
31 // view, each node will have a complete history of all view numbers
32 // and their values that it knows about. At any time a node can reboot
33 // and when it re-joins, it may be many views behind; by remembering
34 // all views, the other nodes can bring this re-joined node up to
35 // date.
36
37 config_view_change::~config_view_change() {}
38
39 config::config(const string & _first, const string & _me, config_view_change *_vc)
40     : my_view_id(0), first(_first), me(_me), vc(_vc),
41       paxos(this, me == _first, me, me)
42 {
43     get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
44     lock cfg_mutex_lock(cfg_mutex);
45     reconstruct(cfg_mutex_lock);
46     thread(&config::heartbeater, this).detach();
47 }
48
49 void config::restore(const string & s) {
50     lock cfg_mutex_lock(cfg_mutex);
51     paxos.restore(s);
52     reconstruct(cfg_mutex_lock);
53 }
54
55 void config::get_view(unsigned instance, vector<string> & m) {
56     lock cfg_mutex_lock(cfg_mutex);
57     get_view(instance, m, cfg_mutex_lock);
58 }
59
60 void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
61     VERIFY(cfg_mutex_lock);
62     string value = paxos.value(instance);
63     LOG << "get_view(" << instance << "): returns " << value;
64     m = explode(value);
65 }
66
67 void config::reconstruct(lock & cfg_mutex_lock) {
68     VERIFY(cfg_mutex_lock);
69     my_view_id = paxos.instance();
70     if (my_view_id > 0) {
71         get_view(my_view_id, mems, cfg_mutex_lock);
72         LOG << "view " << my_view_id << " " << mems;
73     }
74 }
75
76 // Called by Paxos's acceptor.
77 void config::paxos_commit(unsigned instance, const string & value) {
78     lock cfg_mutex_lock(cfg_mutex);
79
80     vector<string> newmem = explode(value);
81     LOG << "instance " << instance << ": " << newmem;
82
83     for (auto mem : mems) {
84         LOG << "is " << mem << " still a member?";
85         if (!isamember(mem, newmem) && me != mem) {
86             LOG << "delete " << mem;
87             rpcc::unbind_cached(mem);
88         }
89     }
90
91     mems = newmem;
92     my_view_id = instance;
93     if (vc) {
94         cfg_mutex_lock.unlock();
95         vc->commit_change(instance);
96         cfg_mutex_lock.lock();
97     }
98 }
99
100 bool config::ismember(const string & m, unsigned vid) {
101     lock cfg_mutex_lock(cfg_mutex);
102     vector<string> v;
103     get_view(vid, v, cfg_mutex_lock);
104     return isamember(m, v);
105 }
106
107 bool config::add(const string & new_m, unsigned vid) {
108     lock cfg_mutex_lock(cfg_mutex);
109     LOG << "adding " << new_m << " to " << vid;
110     if (vid != my_view_id) {
111         LOG << "that's not my view id, " << my_view_id << "!";
112         return false;
113     }
114     LOG << "calling down to paxos layer";
115     vector<string> m(mems), cmems(mems);
116     m.push_back(new_m);
117     LOG << "old mems " << cmems << " " << implode(cmems);
118     LOG << "new mems " << m << " " << implode(m);
119     unsigned nextvid = my_view_id + 1;
120     cfg_mutex_lock.unlock();
121     bool r = paxos.run(nextvid, cmems, implode(m));
122     cfg_mutex_lock.lock();
123     LOG << "paxos proposer returned " << (r ? "success" : "failure");
124     return r;
125 }
126
127 // caller should hold cfg_mutex
128 bool config::remove(const string & m, lock & cfg_mutex_lock) {
129     VERIFY(cfg_mutex_lock);
130     LOG << "my_view_id " << my_view_id << " remove? " << m;
131     vector<string> n;
132     for (auto mem : mems) {
133         if (mem != m)
134             n.push_back(mem);
135     }
136     vector<string> cmems = mems;
137     unsigned nextvid = my_view_id + 1;
138     cfg_mutex_lock.unlock();
139     bool r = paxos.run(nextvid, cmems, implode(n));
140     cfg_mutex_lock.lock();
141     LOG << "proposer returned " << (r ? "success" : "failure");
142     return r;
143 }
144
145 void config::heartbeater() {
146     lock cfg_mutex_lock(cfg_mutex);
147
148     while (1) {
149         auto next_timeout = steady_clock::now() + milliseconds(300);
150         LOG << "go to sleep";
151         config_cond.wait_until(cfg_mutex_lock, next_timeout);
152
153         unsigned vid = my_view_id;
154         vector<string> cmems;
155         get_view(vid, cmems, cfg_mutex_lock);
156         LOG << "current membership " << cmems;
157
158         if (!isamember(me, cmems)) {
159             LOG << "not member yet; skip hearbeat";
160             continue;
161         }
162
163         // who has the smallest ID?
164         string m = std::min(me, *std::min_element(cmems.begin(), cmems.end()));
165
166         if (m == me) {
167             // ping the other nodes
168             for (string mem : cmems) {
169                 if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
170                     continue;
171                 if (vid == my_view_id)
172                     remove(mem, cfg_mutex_lock);
173                 break;
174             }
175         } else {
176             // ping the node with the smallest ID
177             if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
178                 remove(m, cfg_mutex_lock);
179         }
180     }
181 }
182
183 paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
184     lock cfg_mutex_lock(cfg_mutex);
185     r = (int) my_view_id;
186     LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
187     if (vid == my_view_id)
188         return paxos_protocol::OK;
189     else if (paxos.isrunning()) {
190         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
191         return paxos_protocol::OK;
192     }
193     return paxos_protocol::ERR;
194 }
195
196 config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
197     VERIFY(cfg_mutex_lock);
198     unsigned vid = my_view_id;
199     LOG << "heartbeat to " << m << " (" << vid << ")";
200
201     cfg_mutex_lock.unlock();
202     int r = 0, ret = rpc_protocol::bind_failure;
203     if (auto cl = rpcc::bind_cached(m))
204         ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
205     cfg_mutex_lock.lock();
206
207     heartbeat_t res = OK;
208     switch (ret) {
209         case paxos_protocol::OK:
210             break;
211         case rpc_protocol::atmostonce_failure:
212         case rpc_protocol::oldsrv_failure:
213             rpcc::unbind_cached(m);
214             break;
215         default:
216             LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r;
217             res = (ret < 0) ? FAILURE : VIEWERR;
218     }
219     LOG << "done " << res;
220     return res;
221 }