Variadic templates for RPCs
[invirt/third/libt4.git] / lock_client_cache_rsm.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks
2 // see lock_client.cache.h for protocol details.
3
4 #include "lock_client_cache_rsm.h"
5 #include "rpc/rpc.h"
6 #include <sstream>
7 #include <iostream>
8 #include <algorithm>
9 #include <stdio.h>
10 #include "tprintf.h"
11
12 #include "rsm_client.h"
13 #include "lock.h"
14
15 using std::ostringstream;
16
17 lock_state::lock_state():
18     state(none)
19 {
20 }
21
22 void lock_state::wait() {
23     auto self = std::this_thread::get_id();
24     {
25         adopt_lock ml(m);
26         c[self].wait(ml);
27     }
28     c.erase(self);
29 }
30
31 void lock_state::signal() {
32     // signal anyone
33     if (c.begin() != c.end())
34         c.begin()->second.notify_one();
35 }
36
37 void lock_state::signal(std::thread::id who) {
38     if (c.count(who))
39         c[who].notify_one();
40 }
41
42 int lock_client_cache_rsm::last_port = 0;
43
44 lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
45     lock sl(lock_table_lock);
46     // by the semantics of std::map, this will create
47     // the lock if it doesn't already exist
48     return lock_table[lid];
49 }
50
51 lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) {
52     srand(time(NULL)^last_port);
53     rlock_port = ((rand()%32000) | (0x1 << 10));
54     const char *hname;
55     // VERIFY(gethostname(hname, 100) == 0);
56     hname = "127.0.0.1";
57     ostringstream host;
58     host << hname << ":" << rlock_port;
59     id = host.str();
60     last_port = rlock_port;
61     rpcs *rlsrpc = new rpcs(rlock_port);
62     rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
63     rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
64     {
65         lock sl(xid_mutex);
66         xid = 0;
67     }
68     rsmc = new rsm_client(xdst);
69     releaser_thread = std::thread(&lock_client_cache_rsm::releaser, this);
70 }
71
72 void lock_client_cache_rsm::releaser() {
73     while (1) {
74         lock_protocol::lockid_t lid;
75         release_fifo.deq(&lid);
76         LOG("Releaser: " << lid);
77
78         lock_state &st = get_lock_state(lid);
79         lock sl(st.m);
80         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
81         st.state = lock_state::releasing;
82         {
83             sl.unlock();
84             int r;
85             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
86             sl.lock();
87         }
88         st.state = lock_state::none;
89         LOG("Lock " << lid << ": none");
90         st.signal();
91     }
92 }
93
94 lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
95     lock_state &st = get_lock_state(lid);
96     lock sl(st.m);
97     auto self = std::this_thread::get_id();
98
99     // check for reentrancy
100     VERIFY(st.state != lock_state::locked || st.held_by != self);
101     VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
102
103     st.wanted_by.push_back(self);
104
105     while (1) {
106         if (st.state != lock_state::free)
107             LOG("Lock " << lid << ": not free");
108
109         if (st.state == lock_state::none || st.state == lock_state::retrying) {
110             if (st.state == lock_state::none) {
111                 lock sl(xid_mutex);
112                 st.xid = xid++;
113             }
114             st.state = lock_state::acquiring;
115             LOG("Lock " << lid << ": acquiring");
116             lock_protocol::status result;
117             {
118                 sl.unlock();
119                 int r;
120                 result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
121                 sl.lock();
122             }
123             LOG("acquire returned " << result);
124             if (result == lock_protocol::OK) {
125                 st.state = lock_state::free;
126                 LOG("Lock " << lid << ": free");
127             }
128         }
129
130         VERIFY(st.wanted_by.size() != 0);
131         if (st.state == lock_state::free) {
132             // is it for me?
133             auto front = st.wanted_by.front();
134             if (front == releaser_thread.get_id()) {
135                 st.wanted_by.pop_front();
136                 st.state = lock_state::locked;
137                 st.held_by = releaser_thread.get_id();
138                 LOG("Queuing " << lid << " for release");
139                 release_fifo.enq(lid);
140             } else if (front == self) {
141                 st.wanted_by.pop_front();
142                 st.state = lock_state::locked;
143                 st.held_by = self;
144                 break;
145             } else {
146                 st.signal(front);
147             }
148         }
149
150         LOG("waiting...");
151         st.wait();
152         LOG("wait ended");
153     }
154
155     LOG("Lock " << lid << ": locked");
156     return lock_protocol::OK;
157 }
158
159 lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
160     lock_state &st = get_lock_state(lid);
161     lock sl(st.m);
162     auto self = std::this_thread::get_id();
163     VERIFY(st.state == lock_state::locked && st.held_by == self);
164     st.state = lock_state::free;
165     LOG("Lock " << lid << ": free");
166     if (st.wanted_by.size()) {
167         auto front = st.wanted_by.front();
168         if (front == releaser_thread.get_id()) {
169             st.state = lock_state::locked;
170             st.held_by = releaser_thread.get_id();
171             st.wanted_by.pop_front();
172             LOG("Queuing " << lid << " for release");
173             release_fifo.enq(lid);
174         } else
175             st.signal(front);
176     }
177     LOG("Finished signaling.");
178     return lock_protocol::OK;
179 }
180
181 rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
182     LOG("Revoke handler " << lid << " " << xid);
183     lock_state &st = get_lock_state(lid);
184     lock sl(st.m);
185
186     if (st.state == lock_state::releasing || st.state == lock_state::none)
187         return rlock_protocol::OK;
188
189     if (st.state == lock_state::free &&
190         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
191         // gimme
192         st.state = lock_state::locked;
193         st.held_by = releaser_thread.get_id();
194         if (st.wanted_by.size())
195             st.wanted_by.pop_front();
196         release_fifo.enq(lid);
197     } else {
198         // get in line
199         st.wanted_by.push_back(releaser_thread.get_id());
200     }
201     return rlock_protocol::OK;
202 }
203
204 rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
205     lock_state &st = get_lock_state(lid);
206     lock sl(st.m);
207     VERIFY(st.state == lock_state::acquiring);
208     st.state = lock_state::retrying;
209     LOG("Lock " << lid << ": none");
210     st.signal(); // only one thread needs to wake up
211     return rlock_protocol::OK;
212 }