Got rid of most using directives. Ported tests to python.
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "lock_server.h"
4 #include <unistd.h>
5 #include <arpa/inet.h>
6 #include "handle.h"
7
8 lock_state::lock_state():
9     held(false)
10 {
11 }
12
13 lock_state::lock_state(const lock_state & other) {
14     *this = other;
15 }
16
17 lock_state & lock_state::operator=(const lock_state & o) {
18     held = o.held;
19     held_by = o.held_by;
20     wanted_by = o.wanted_by;
21     old_requests = o.old_requests;
22     return *this;
23 }
24
25 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
26     lock sl(lock_table_lock);
27     // this will create the lock if it doesn't already exist
28     return lock_table[lid];
29 }
30
31 lock_server::lock_server(rsm & r) : rsm_ (&r) {
32     thread(&lock_server::revoker, this).detach();
33     thread(&lock_server::retryer, this).detach();
34     r.set_state_transfer(this);
35
36     r.reg(lock_protocol::acquire, &lock_server::acquire, this);
37     r.reg(lock_protocol::release, &lock_server::release, this);
38     r.reg(lock_protocol::stat, &lock_server::stat, this);
39 }
40
41 void lock_server::revoker () {
42     while (1) {
43         lock_protocol::lockid_t lid;
44         revoke_fifo.deq(&lid);
45         LOG << "Revoking " << lid;
46         if (rsm_ && !rsm_->amiprimary())
47             continue;
48
49         lock_state & st = get_lock_state(lid);
50         holder_t held_by;
51         {
52             lock sl(st.m);
53             held_by = st.held_by;
54         }
55
56         rpcc *proxy = NULL;
57         // try a few times?
58         //int t=5;
59         //while (t-- && !proxy)
60         proxy = handle(held_by.first).safebind();
61         if (proxy) {
62             int r;
63             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
64             LOG << "Revoke returned " << ret;
65         }
66     }
67 }
68
69 void lock_server::retryer() {
70     while (1) {
71         lock_protocol::lockid_t lid;
72         retry_fifo.deq(&lid);
73         if (rsm_ && !rsm_->amiprimary())
74             continue;
75
76         LOG << "Sending retry for " << lid;
77         lock_state & st = get_lock_state(lid);
78         holder_t front;
79         {
80             lock sl(st.m);
81             if (st.wanted_by.empty())
82                 continue;
83             front = st.wanted_by.front();
84         }
85
86         rpcc *proxy = NULL;
87         // try a few times?
88         //int t=5;
89         //while (t-- && !proxy)
90         proxy = handle(front.first).safebind();
91         if (proxy) {
92             int r;
93             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
94             LOG << "Retry returned " << ret;
95         }
96     }
97 }
98
99 lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
100     LOG << "lid=" << lid << " client=" << id << "," << xid;
101     holder_t h = holder_t(id, xid);
102     lock_state & st = get_lock_state(lid);
103     lock sl(st.m);
104
105     // deal with duplicated requests
106     if (st.old_requests.count(id)) {
107         lock_protocol::xid_t old_xid = st.old_requests[id];
108         if (old_xid > xid)
109             return lock_protocol::RPCERR;
110         else if (old_xid == xid) {
111             if (st.held && st.held_by == h) {
112                 LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
113                 return lock_protocol::OK;
114             }
115         }
116     }
117
118     // grant the lock if it's available and I'm next in line
119     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
120         if (!st.wanted_by.empty())
121             st.wanted_by.pop_front();
122         st.old_requests[id] = xid;
123
124         st.held = true;
125         st.held_by = h;
126         LOG << "Lock " << lid << " held by " << h.first;
127         if (st.wanted_by.size())
128             revoke_fifo.enq(lid);
129         return lock_protocol::OK;
130     }
131
132     // get in line
133     bool found = false;
134     for (auto p : st.wanted_by) {
135         if (p.first == id) {
136             // make sure client is obeying serialization
137             if (p.second != xid) {
138                 LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
139                 return lock_protocol::RPCERR;
140             }
141             found = true;
142             break;
143         }
144     }
145     if (!found)
146         st.wanted_by.push_back(h);
147
148     LOG << "wanted_by=" << st.wanted_by;
149
150     // send revoke if we're first in line
151     if (st.wanted_by.front() == h)
152         revoke_fifo.enq(lid);
153
154     return lock_protocol::RETRY;
155 }
156
157 lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
158     LOG << "lid=" << lid << " client=" << id << "," << xid;
159     lock_state & st = get_lock_state(lid);
160     lock sl(st.m);
161     if (st.held && st.held_by == holder_t(id, xid)) {
162         st.held = false;
163         LOG << "Lock " << lid << " not held";
164     }
165     if (st.wanted_by.size())
166         retry_fifo.enq(lid);
167     return lock_protocol::OK;
168 }
169
170 string lock_server::marshal_state() {
171     lock sl(lock_table_lock);
172     return marshall(nacquire, lock_table).content();
173 }
174
175 void lock_server::unmarshal_state(const string & state) {
176     lock sl(lock_table_lock);
177     unmarshall(state, false, nacquire, lock_table);
178 }
179
180 lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) {
181     LOG << "stat request for " << lid;
182     VERIFY(0);
183     r = nacquire;
184     return lock_protocol::OK;
185 }
186