efb23f5a9adf8f1ca9377ae8d805e93d72bdd771
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "lock_server.h"
4 #include <unistd.h>
5 #include <arpa/inet.h>
6
7 lock_state::lock_state():
8     held(false)
9 {
10 }
11
12 lock_state::lock_state(const lock_state & other) {
13     *this = other;
14 }
15
16 lock_state & lock_state::operator=(const lock_state & o) {
17     held = o.held;
18     held_by = o.held_by;
19     wanted_by = o.wanted_by;
20     old_requests = o.old_requests;
21     return *this;
22 }
23
24 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
25     lock sl(lock_table_lock);
26     // this will create the lock if it doesn't already exist
27     return lock_table[lid];
28 }
29
30 lock_server::lock_server(rsm & r) : rsm_ (&r) {
31     thread(&lock_server::revoker, this).detach();
32     thread(&lock_server::retryer, this).detach();
33     r.set_state_transfer(this);
34
35     r.reg(lock_protocol::acquire, &lock_server::acquire, this);
36     r.reg(lock_protocol::release, &lock_server::release, this);
37     r.reg(lock_protocol::stat, &lock_server::stat, this);
38 }
39
40 void lock_server::revoker () {
41     while (1) {
42         lock_protocol::lockid_t lid = revoke_fifo.deq();
43         LOG << "Revoking " << lid;
44         if (rsm_ && !rsm_->amiprimary())
45             continue;
46
47         lock_state & st = get_lock_state(lid);
48         lock sl(st.m);
49         holder_t held_by = st.held_by;
50         sl.unlock();
51
52         if (auto cl = rpcc::bind_cached(held_by.first)) {
53             int r;
54             auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
55             LOG << "Revoke returned " << ret;
56         }
57     }
58 }
59
60 void lock_server::retryer() {
61     while (1) {
62         lock_protocol::lockid_t lid = retry_fifo.deq();
63         if (rsm_ && !rsm_->amiprimary())
64             continue;
65
66         LOG << "Sending retry for " << lid;
67         lock_state & st = get_lock_state(lid);
68         holder_t front;
69         {
70             lock sl(st.m);
71             if (st.wanted_by.empty())
72                 continue;
73             front = st.wanted_by.front();
74         }
75
76         if (auto cl = rpcc::bind_cached(front.first)) {
77             int r;
78             auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
79             LOG << "Retry returned " << ret;
80         }
81     }
82 }
83
84 lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
85     LOG << "lid=" << lid << " client=" << id << "," << xid;
86     holder_t h = holder_t(id, xid);
87     lock_state & st = get_lock_state(lid);
88     lock sl(st.m);
89
90     // deal with duplicated requests
91     if (st.old_requests.count(id)) {
92         lock_protocol::xid_t old_xid = st.old_requests[id];
93         if (old_xid > xid)
94             return lock_protocol::RPCERR;
95         else if (old_xid == xid) {
96             if (st.held && st.held_by == h) {
97                 LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
98                 return lock_protocol::OK;
99             }
100         }
101     }
102
103     // grant the lock if it's available and I'm next in line
104     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
105         if (!st.wanted_by.empty())
106             st.wanted_by.pop_front();
107         st.old_requests[id] = xid;
108
109         st.held = true;
110         st.held_by = h;
111         LOG << "Lock " << lid << " held by " << h.first;
112         if (st.wanted_by.size())
113             revoke_fifo.enq(lid);
114         return lock_protocol::OK;
115     }
116
117     // get in line
118     bool found = false;
119     for (auto p : st.wanted_by) {
120         if (p.first == id) {
121             // make sure client is obeying serialization
122             if (p.second != xid) {
123                 LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
124                 return lock_protocol::RPCERR;
125             }
126             found = true;
127             break;
128         }
129     }
130     if (!found)
131         st.wanted_by.push_back(h);
132
133     LOG << "wanted_by=" << st.wanted_by;
134
135     // send revoke if we're first in line
136     if (st.wanted_by.front() == h)
137         revoke_fifo.enq(lid);
138
139     return lock_protocol::RETRY;
140 }
141
142 lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
143     LOG << "lid=" << lid << " client=" << id << "," << xid;
144     lock_state & st = get_lock_state(lid);
145     lock sl(st.m);
146     if (st.held && st.held_by == holder_t(id, xid)) {
147         st.held = false;
148         LOG << "Lock " << lid << " not held";
149     }
150     if (st.wanted_by.size())
151         retry_fifo.enq(lid);
152     return lock_protocol::OK;
153 }
154
155 string lock_server::marshal_state() {
156     lock sl(lock_table_lock);
157     return marshall(nacquire, lock_table).content();
158 }
159
160 void lock_server::unmarshal_state(const string & state) {
161     lock sl(lock_table_lock);
162     unmarshall(state, false, nacquire, lock_table);
163 }
164
165 lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) {
166     LOG << "stat request for " << lid;
167     VERIFY(0);
168     r = nacquire;
169     return lock_protocol::OK;
170 }
171