Imported from 6.824 labs
[invirt/third/libt4.git] / lock_client_cache_rsm.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks
2 // see lock_client.cache.h for protocol details.
3
4 #include "lock_client_cache_rsm.h"
5 #include "rpc.h"
6 #include <sstream>
7 #include <iostream>
8 #include <stdio.h>
9 #include "tprintf.h"
10
11 #include "rsm_client.h"
12
13 lock_state::lock_state():
14     state(none)
15 {
16 }
17
18 void lock_state::wait() {
19     pthread_t self = pthread_self();
20     c[self].wait(m);
21     c.erase(self);
22 }
23
24 void lock_state::signal() {
25     // signal anyone
26     if (c.begin() != c.end())
27         c.begin()->second.signal();
28 }
29
30 void lock_state::signal(pthread_t who) {
31     if (c.count(who))
32         c[who].signal();
33 }
34
35 static void * releasethread(void *x) {
36     lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x;
37     cc->releaser();
38     return 0;
39 }
40
41 int lock_client_cache_rsm::last_port = 0;
42
43 lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
44     ScopedLock sl(lock_table_lock);
45     // by the semantics of std::map, this will create
46     // the lock if it doesn't already exist
47     return lock_table[lid];
48 }
49
50 lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) {
51     srand(time(NULL)^last_port);
52     rlock_port = ((rand()%32000) | (0x1 << 10));
53     const char *hname;
54     // VERIFY(gethostname(hname, 100) == 0);
55     hname = "127.0.0.1";
56     ostringstream host;
57     host << hname << ":" << rlock_port;
58     id = host.str();
59     last_port = rlock_port;
60     rpcs *rlsrpc = new rpcs(rlock_port);
61     rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
62     rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
63     {
64         ScopedLock sl(xid_mutex);
65         xid = 0;
66     }
67     rsmc = new rsm_client(xdst);
68     int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this);
69     VERIFY (r == 0);
70 }
71
72 void lock_client_cache_rsm::releaser() {
73     while (1) {
74         lock_protocol::lockid_t lid;
75         release_fifo.deq(&lid);
76         LOG("Releaser: " << lid);
77
78         lock_state &st = get_lock_state(lid);
79         ScopedLock sl(st.m);
80         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread);
81         st.state = lock_state::releasing;
82         {
83             ScopedUnlock su(st.m);
84             int r;
85             rsmc->call(lock_protocol::release, lid, id, st.xid, r);
86         }
87         st.state = lock_state::none;
88         LOG("Lock " << lid << ": none");
89         st.signal();
90     }
91 }
92
93 lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
94     lock_state &st = get_lock_state(lid);
95     ScopedLock sl(st.m);
96     pthread_t self = pthread_self();
97
98     // check for reentrancy
99     VERIFY(st.state != lock_state::locked || st.held_by != self);
100     VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
101
102     st.wanted_by.push_back(self);
103
104     while (1) {
105         if (st.state != lock_state::free)
106             LOG("Lock " << lid << ": not free");
107
108         if (st.state == lock_state::none || st.state == lock_state::retrying) {
109             if (st.state == lock_state::none) {
110                 ScopedLock sl(xid_mutex);
111                 st.xid = xid++;
112             }
113             st.state = lock_state::acquiring;
114             LOG("Lock " << lid << ": acquiring");
115             lock_protocol::status result;
116             {
117                 ScopedUnlock su(st.m);
118                 int r;
119                 result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r);
120             }
121             LOG("acquire returned " << result);
122             if (result == lock_protocol::OK) {
123                 st.state = lock_state::free;
124                 LOG("Lock " << lid << ": free");
125             }
126         }
127
128         VERIFY(st.wanted_by.size() != 0);
129         if (st.state == lock_state::free) {
130             // is it for me?
131             pthread_t front = st.wanted_by.front();
132             if (front == releaser_thread) {
133                 st.wanted_by.pop_front();
134                 st.state = lock_state::locked;
135                 st.held_by = releaser_thread;
136                 LOG("Queuing " << lid << " for release");
137                 release_fifo.enq(lid);
138             } else if (front == self) {
139                 st.wanted_by.pop_front();
140                 st.state = lock_state::locked;
141                 st.held_by = self;
142                 break;
143             } else {
144                 st.signal(front);
145             }
146         }
147
148         LOG("waiting...");
149         st.wait();
150         LOG("wait ended");
151     }
152
153     LOG("Lock " << lid << ": locked");
154     return lock_protocol::OK;
155 }
156
157 lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
158     lock_state &st = get_lock_state(lid);
159     ScopedLock sl(st.m);
160     pthread_t self = pthread_self();
161     VERIFY(st.state == lock_state::locked && st.held_by == self);
162     st.state = lock_state::free;
163     LOG("Lock " << lid << ": free");
164     if (st.wanted_by.size()) {
165         pthread_t front = st.wanted_by.front();
166         if (front == releaser_thread) {
167             st.state = lock_state::locked;
168             st.held_by = releaser_thread;
169             st.wanted_by.pop_front();
170             LOG("Queuing " << lid << " for release");
171             release_fifo.enq(lid);
172         } else
173             st.signal(front);
174     }
175     LOG("Finished signaling.");
176     return lock_protocol::OK;
177 }
178
179 rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
180     LOG("Revoke handler " << lid << " " << xid);
181     lock_state &st = get_lock_state(lid);
182     ScopedLock sl(st.m);
183
184     if (st.state == lock_state::releasing || st.state == lock_state::none)
185         return rlock_protocol::OK;
186
187     if (st.state == lock_state::free &&
188         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) {
189         // gimme
190         st.state = lock_state::locked;
191         st.held_by = releaser_thread;
192         if (st.wanted_by.size())
193             st.wanted_by.pop_front();
194         release_fifo.enq(lid);
195     } else {
196         // get in line
197         st.wanted_by.push_back(releaser_thread);
198     }
199     return rlock_protocol::OK;
200 }
201
202 rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
203     lock_state &st = get_lock_state(lid);
204     ScopedLock sl(st.m);
205     VERIFY(st.state == lock_state::acquiring);
206     st.state = lock_state::retrying;
207     LOG("Lock " << lid << ": none");
208     st.signal(); // only one thread needs to wake up
209     return rlock_protocol::OK;
210 }