Build on wheezy, and presumably precise
[invirt/third/libt4.git] / lock_client_cache_rsm.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks
2 // see lock_client.cache.h for protocol details.
3
4 #include "lock_client_cache_rsm.h"
5 #include "rpc.h"
6 #include <sstream>
7 #include <iostream>
8 #include <algorithm>
9 #include <stdio.h>
10 #include "tprintf.h"
11
12 #include "rsm_client.h"
13
14 lock_state::lock_state():
15     state(none)
16 {
17 }
18
19 void lock_state::wait() {
20     pthread_t self = pthread_self();
21     c[self].wait(m);
22     c.erase(self);
23 }
24
25 void lock_state::signal() {
26     // signal anyone
27     if (c.begin() != c.end())
28         c.begin()->second.signal();
29 }
30
31 void lock_state::signal(pthread_t who) {
32     if (c.count(who))
33         c[who].signal();
34 }
35
36 static void * releasethread(void *x) {
37     lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x;
38     cc->releaser();
39     return 0;
40 }
41
42 int lock_client_cache_rsm::last_port = 0;
43
44 lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
45     ScopedLock sl(lock_table_lock);
46     // by the semantics of std::map, this will create
47     // the lock if it doesn't already exist
48     return lock_table[lid];
49 }
50
51 lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) {
52     srand(time(NULL)^last_port);
53     rlock_port = ((rand()%32000) | (0x1 << 10));
54     const char *hname;
55     // VERIFY(gethostname(hname, 100) == 0);
56     hname = "127.0.0.1";
57     ostringstream host;
58     host << hname << ":" << rlock_port;
59     id = host.str();
60     last_port = rlock_port;
61     rpcs *rlsrpc = new rpcs(rlock_port);
62     rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
63     rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
64     {
65         ScopedLock sl(xid_mutex);
66         xid = 0;
67     }
68     rsmc = new rsm_client(xdst);
69     int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this);
70     VERIFY (r == 0);
71 }
72
73 void lock_client_cache_rsm::releaser() {
74     while (1) {
75         lock_protocol::lockid_t lid;
76         release_fifo.deq(&lid);
77         LOG("Releaser: " << lid);
78
79         lock_state &st = get_lock_state(lid);
80         ScopedLock sl(st.m);
81         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread);
82         st.state = lock_state::releasing;
83         {
84             ScopedUnlock su(st.m);
85             int r;
86             rsmc->call(lock_protocol::release, lid, id, st.xid, r);
87         }
88         st.state = lock_state::none;
89         LOG("Lock " << lid << ": none");
90         st.signal();
91     }
92 }
93
94 lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
95     lock_state &st = get_lock_state(lid);
96     ScopedLock sl(st.m);
97     pthread_t self = pthread_self();
98
99     // check for reentrancy
100     VERIFY(st.state != lock_state::locked || st.held_by != self);
101     VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
102
103     st.wanted_by.push_back(self);
104
105     while (1) {
106         if (st.state != lock_state::free)
107             LOG("Lock " << lid << ": not free");
108
109         if (st.state == lock_state::none || st.state == lock_state::retrying) {
110             if (st.state == lock_state::none) {
111                 ScopedLock sl(xid_mutex);
112                 st.xid = xid++;
113             }
114             st.state = lock_state::acquiring;
115             LOG("Lock " << lid << ": acquiring");
116             lock_protocol::status result;
117             {
118                 ScopedUnlock su(st.m);
119                 int r;
120                 result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r);
121             }
122             LOG("acquire returned " << result);
123             if (result == lock_protocol::OK) {
124                 st.state = lock_state::free;
125                 LOG("Lock " << lid << ": free");
126             }
127         }
128
129         VERIFY(st.wanted_by.size() != 0);
130         if (st.state == lock_state::free) {
131             // is it for me?
132             pthread_t front = st.wanted_by.front();
133             if (front == releaser_thread) {
134                 st.wanted_by.pop_front();
135                 st.state = lock_state::locked;
136                 st.held_by = releaser_thread;
137                 LOG("Queuing " << lid << " for release");
138                 release_fifo.enq(lid);
139             } else if (front == self) {
140                 st.wanted_by.pop_front();
141                 st.state = lock_state::locked;
142                 st.held_by = self;
143                 break;
144             } else {
145                 st.signal(front);
146             }
147         }
148
149         LOG("waiting...");
150         st.wait();
151         LOG("wait ended");
152     }
153
154     LOG("Lock " << lid << ": locked");
155     return lock_protocol::OK;
156 }
157
158 lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
159     lock_state &st = get_lock_state(lid);
160     ScopedLock sl(st.m);
161     pthread_t self = pthread_self();
162     VERIFY(st.state == lock_state::locked && st.held_by == self);
163     st.state = lock_state::free;
164     LOG("Lock " << lid << ": free");
165     if (st.wanted_by.size()) {
166         pthread_t front = st.wanted_by.front();
167         if (front == releaser_thread) {
168             st.state = lock_state::locked;
169             st.held_by = releaser_thread;
170             st.wanted_by.pop_front();
171             LOG("Queuing " << lid << " for release");
172             release_fifo.enq(lid);
173         } else
174             st.signal(front);
175     }
176     LOG("Finished signaling.");
177     return lock_protocol::OK;
178 }
179
180 rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
181     LOG("Revoke handler " << lid << " " << xid);
182     lock_state &st = get_lock_state(lid);
183     ScopedLock sl(st.m);
184
185     if (st.state == lock_state::releasing || st.state == lock_state::none)
186         return rlock_protocol::OK;
187
188     if (st.state == lock_state::free &&
189         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) {
190         // gimme
191         st.state = lock_state::locked;
192         st.held_by = releaser_thread;
193         if (st.wanted_by.size())
194             st.wanted_by.pop_front();
195         release_fifo.enq(lid);
196     } else {
197         // get in line
198         st.wanted_by.push_back(releaser_thread);
199     }
200     return rlock_protocol::OK;
201 }
202
203 rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
204     lock_state &st = get_lock_state(lid);
205     ScopedLock sl(st.m);
206     VERIFY(st.state == lock_state::acquiring);
207     st.state = lock_state::retrying;
208     LOG("Lock " << lid << ": none");
209     st.signal(); // only one thread needs to wake up
210     return rlock_protocol::OK;
211 }