141a5988e45302bbd4bd1d3b5ad736fc9d87a906
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "lock_server.h"
4 #include <unistd.h>
5 #include <arpa/inet.h>
6
7 lock_state::lock_state():
8     held(false)
9 {
10 }
11
12 lock_state::lock_state(const lock_state & other) {
13     *this = other;
14 }
15
16 lock_state & lock_state::operator=(const lock_state & o) {
17     held = o.held;
18     held_by = o.held_by;
19     wanted_by = o.wanted_by;
20     old_requests = o.old_requests;
21     return *this;
22 }
23
24 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
25     lock sl(lock_table_lock);
26     // this will create the lock if it doesn't already exist
27     return lock_table[lid];
28 }
29
30 lock_server::lock_server(rsm & r) : rsm_ (&r) {
31     thread(&lock_server::revoker, this).detach();
32     thread(&lock_server::retryer, this).detach();
33     r.set_state_transfer(this);
34
35     r.reg(lock_protocol::acquire, &lock_server::acquire, this);
36     r.reg(lock_protocol::release, &lock_server::release, this);
37     r.reg(lock_protocol::stat, &lock_server::stat, this);
38 }
39
40 void lock_server::revoker () {
41     while (1) {
42         lock_protocol::lockid_t lid;
43         revoke_fifo.deq(&lid);
44         LOG << "Revoking " << lid;
45         if (rsm_ && !rsm_->amiprimary())
46             continue;
47
48         lock_state & st = get_lock_state(lid);
49         lock sl(st.m);
50         holder_t held_by = st.held_by;
51         sl.unlock();
52
53         if (auto cl = rpcc::bind_cached(held_by.first)) {
54             int r;
55             auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
56             LOG << "Revoke returned " << ret;
57         }
58     }
59 }
60
61 void lock_server::retryer() {
62     while (1) {
63         lock_protocol::lockid_t lid;
64         retry_fifo.deq(&lid);
65         if (rsm_ && !rsm_->amiprimary())
66             continue;
67
68         LOG << "Sending retry for " << lid;
69         lock_state & st = get_lock_state(lid);
70         holder_t front;
71         {
72             lock sl(st.m);
73             if (st.wanted_by.empty())
74                 continue;
75             front = st.wanted_by.front();
76         }
77
78         if (auto cl = rpcc::bind_cached(front.first)) {
79             int r;
80             auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
81             LOG << "Retry returned " << ret;
82         }
83     }
84 }
85
86 lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
87     LOG << "lid=" << lid << " client=" << id << "," << xid;
88     holder_t h = holder_t(id, xid);
89     lock_state & st = get_lock_state(lid);
90     lock sl(st.m);
91
92     // deal with duplicated requests
93     if (st.old_requests.count(id)) {
94         lock_protocol::xid_t old_xid = st.old_requests[id];
95         if (old_xid > xid)
96             return lock_protocol::RPCERR;
97         else if (old_xid == xid) {
98             if (st.held && st.held_by == h) {
99                 LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
100                 return lock_protocol::OK;
101             }
102         }
103     }
104
105     // grant the lock if it's available and I'm next in line
106     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
107         if (!st.wanted_by.empty())
108             st.wanted_by.pop_front();
109         st.old_requests[id] = xid;
110
111         st.held = true;
112         st.held_by = h;
113         LOG << "Lock " << lid << " held by " << h.first;
114         if (st.wanted_by.size())
115             revoke_fifo.enq(lid);
116         return lock_protocol::OK;
117     }
118
119     // get in line
120     bool found = false;
121     for (auto p : st.wanted_by) {
122         if (p.first == id) {
123             // make sure client is obeying serialization
124             if (p.second != xid) {
125                 LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
126                 return lock_protocol::RPCERR;
127             }
128             found = true;
129             break;
130         }
131     }
132     if (!found)
133         st.wanted_by.push_back(h);
134
135     LOG << "wanted_by=" << st.wanted_by;
136
137     // send revoke if we're first in line
138     if (st.wanted_by.front() == h)
139         revoke_fifo.enq(lid);
140
141     return lock_protocol::RETRY;
142 }
143
144 lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
145     LOG << "lid=" << lid << " client=" << id << "," << xid;
146     lock_state & st = get_lock_state(lid);
147     lock sl(st.m);
148     if (st.held && st.held_by == holder_t(id, xid)) {
149         st.held = false;
150         LOG << "Lock " << lid << " not held";
151     }
152     if (st.wanted_by.size())
153         retry_fifo.enq(lid);
154     return lock_protocol::OK;
155 }
156
157 string lock_server::marshal_state() {
158     lock sl(lock_table_lock);
159     return marshall(nacquire, lock_table).content();
160 }
161
162 void lock_server::unmarshal_state(const string & state) {
163     lock sl(lock_table_lock);
164     unmarshall(state, false, nacquire, lock_table);
165 }
166
167 lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) {
168     LOG << "stat request for " << lid;
169     VERIFY(0);
170     r = nacquire;
171     return lock_protocol::OK;
172 }
173