86e5ad20df5c52e9144b9308bf88c5390ed4cd78
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "lock_server.h"
4 #include <unistd.h>
5 #include <arpa/inet.h>
6
7 lock_state::lock_state():
8     held(false)
9 {
10 }
11
12 lock_state::lock_state(const lock_state & other) {
13     *this = other;
14 }
15
16 lock_state & lock_state::operator=(const lock_state & o) {
17     held = o.held;
18     held_by = o.held_by;
19     wanted_by = o.wanted_by;
20     old_requests = o.old_requests;
21     return *this;
22 }
23
24 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
25     lock sl(lock_table_lock);
26     // this will create the lock if it doesn't already exist
27     return lock_table[lid];
28 }
29
30 lock_server::lock_server(rsm & r) : rsm_ (&r) {
31     thread(&lock_server::revoker, this).detach();
32     thread(&lock_server::retryer, this).detach();
33     r.set_state_transfer(this);
34
35     r.reg(lock_protocol::acquire, &lock_server::acquire, this);
36     r.reg(lock_protocol::release, &lock_server::release, this);
37 }
38
39 void lock_server::revoker () {
40     while (1) {
41         lock_protocol::lockid_t lid = revoke_fifo.deq();
42         LOG << "Revoking " << lid;
43         if (rsm_ && !rsm_->amiprimary())
44             continue;
45
46         lock_state & st = get_lock_state(lid);
47         lock sl(st.m);
48         holder_t held_by = st.held_by;
49         sl.unlock();
50
51         if (auto cl = rpcc::bind_cached(held_by.first)) {
52             int r;
53             auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
54             LOG << "Revoke returned " << ret;
55         }
56     }
57 }
58
59 void lock_server::retryer() {
60     while (1) {
61         lock_protocol::lockid_t lid = retry_fifo.deq();
62         if (rsm_ && !rsm_->amiprimary())
63             continue;
64
65         LOG << "Sending retry for " << lid;
66         lock_state & st = get_lock_state(lid);
67         holder_t front;
68         {
69             lock sl(st.m);
70             if (st.wanted_by.empty())
71                 continue;
72             front = st.wanted_by.front();
73         }
74
75         if (auto cl = rpcc::bind_cached(front.first)) {
76             int r;
77             auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
78             LOG << "Retry returned " << ret;
79         }
80     }
81 }
82
83 lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
84     LOG << "lid=" << lid << " client=" << id << "," << xid;
85     holder_t h = holder_t(id, xid);
86     lock_state & st = get_lock_state(lid);
87     lock sl(st.m);
88
89     // deal with duplicated requests
90     if (st.old_requests.count(id)) {
91         lock_protocol::xid_t old_xid = st.old_requests[id];
92         if (old_xid > xid)
93             return lock_protocol::RPCERR;
94         else if (old_xid == xid) {
95             if (st.held && st.held_by == h) {
96                 LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
97                 return lock_protocol::OK;
98             }
99         }
100     }
101
102     // grant the lock if it's available and I'm next in line
103     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
104         if (!st.wanted_by.empty())
105             st.wanted_by.pop_front();
106         st.old_requests[id] = xid;
107
108         st.held = true;
109         st.held_by = h;
110         LOG << "Lock " << lid << " held by " << h.first;
111         if (st.wanted_by.size())
112             revoke_fifo.enq(lid);
113         return lock_protocol::OK;
114     }
115
116     // get in line
117     bool found = false;
118     for (auto p : st.wanted_by) {
119         if (p.first == id) {
120             // make sure client is obeying serialization
121             if (p.second != xid) {
122                 LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
123                 return lock_protocol::RPCERR;
124             }
125             found = true;
126             break;
127         }
128     }
129     if (!found)
130         st.wanted_by.push_back(h);
131
132     LOG << "wanted_by=" << st.wanted_by;
133
134     // send revoke if we're first in line
135     if (st.wanted_by.front() == h)
136         revoke_fifo.enq(lid);
137
138     return lock_protocol::RETRY;
139 }
140
141 lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
142     LOG << "lid=" << lid << " client=" << id << "," << xid;
143     lock_state & st = get_lock_state(lid);
144     lock sl(st.m);
145     if (st.held && st.held_by == holder_t(id, xid)) {
146         st.held = false;
147         LOG << "Lock " << lid << " not held";
148     }
149     if (st.wanted_by.size())
150         retry_fifo.enq(lid);
151     return lock_protocol::OK;
152 }
153
154 string lock_server::marshal_state() {
155     lock sl(lock_table_lock);
156     return marshall(nacquire, lock_table).content();
157 }
158
159 void lock_server::unmarshal_state(const string & state) {
160     lock sl(lock_table_lock);
161     unmarshall(state, false, nacquire, lock_table);
162 }