More dependency check-ups
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "types.h"
4 #include "lock_server.h"
5 #include <unistd.h>
6 #include <arpa/inet.h>
7 #include "handle.h"
8
9 lock_state::lock_state():
10     held(false)
11 {
12 }
13
14 lock_state::lock_state(const lock_state &other) {
15     *this = other;
16 }
17
18 lock_state& lock_state::operator=(const lock_state& o) {
19     held = o.held;
20     held_by = o.held_by;
21     wanted_by = o.wanted_by;
22     old_requests = o.old_requests;
23     return *this;
24 }
25
26 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
27     lock sl(lock_table_lock);
28     // this will create the lock if it doesn't already exist
29     return lock_table[lid];
30 }
31
32 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
33     thread(&lock_server::revoker, this).detach();
34     thread(&lock_server::retryer, this).detach();
35     rsm->set_state_transfer(this);
36 }
37
38 void lock_server::revoker() [[noreturn]] {
39     while (1) {
40         lock_protocol::lockid_t lid;
41         revoke_fifo.deq(&lid);
42         LOG("Revoking " << lid);
43         if (rsm && !rsm->amiprimary())
44             continue;
45
46         lock_state &st = get_lock_state(lid);
47         holder_t held_by;
48         {
49             lock sl(st.m);
50             held_by = st.held_by;
51         }
52
53         rpcc *proxy = NULL;
54         // try a few times?
55         //int t=5;
56         //while (t-- && !proxy)
57         proxy = handle(held_by.first).safebind();
58         if (proxy) {
59             int r;
60             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
61             LOG("Revoke returned " << ret);
62         }
63     }
64 }
65
66 void lock_server::retryer() [[noreturn]] {
67     while (1) {
68         lock_protocol::lockid_t lid;
69         retry_fifo.deq(&lid);
70         if (rsm && !rsm->amiprimary())
71             continue;
72
73         LOG("Sending retry for " << lid);
74         lock_state &st = get_lock_state(lid);
75         holder_t front;
76         {
77             lock sl(st.m);
78             if (st.wanted_by.empty())
79                 continue;
80             front = st.wanted_by.front();
81         }
82
83         rpcc *proxy = NULL;
84         // try a few times?
85         //int t=5;
86         //while (t-- && !proxy)
87         proxy = handle(front.first).safebind();
88         if (proxy) {
89             int r;
90             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
91             LOG("Retry returned " << ret);
92         }
93     }
94 }
95
96 int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
97     LOG("lid=" << lid << " client=" << id << "," << xid);
98     holder_t h = holder_t(id, xid);
99     lock_state &st = get_lock_state(lid);
100     lock sl(st.m);
101
102     // deal with duplicated requests
103     if (st.old_requests.count(id)) {
104         lock_protocol::xid_t old_xid = st.old_requests[id];
105         if (old_xid > xid)
106             return lock_protocol::RPCERR;
107         else if (old_xid == xid) {
108             if (st.held && st.held_by == h) {
109                 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
110                 return lock_protocol::OK;
111             }
112         }
113     }
114
115     // grant the lock if it's available and I'm next in line
116     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
117         if (!st.wanted_by.empty())
118             st.wanted_by.pop_front();
119         st.old_requests[id] = xid;
120
121         st.held = true;
122         st.held_by = h;
123         LOG("Lock " << lid << " held by " << h.first);
124         if (st.wanted_by.size())
125             revoke_fifo.enq(lid);
126         return lock_protocol::OK;
127     }
128
129     // get in line
130     bool found = false;
131     for (auto p : st.wanted_by) {
132         if (p.first == id) {
133             // make sure client is obeying serialization
134             if (p.second != xid) {
135                 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
136                 return lock_protocol::RPCERR;
137             }
138             found = true;
139             break;
140         }
141     }
142     if (!found)
143         st.wanted_by.push_back(h);
144
145     LOG("wanted_by=" << st.wanted_by);
146
147     // send revoke if we're first in line
148     if (st.wanted_by.front() == h)
149         revoke_fifo.enq(lid);
150
151     return lock_protocol::RETRY;
152 }
153
154 int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) {
155     LOG("lid=" << lid << " client=" << id << "," << xid);
156     lock_state &st = get_lock_state(lid);
157     lock sl(st.m);
158     if (st.held && st.held_by == holder_t(id, xid)) {
159         st.held = false;
160         LOG("Lock " << lid << " not held");
161     }
162     if (st.wanted_by.size())
163         retry_fifo.enq(lid);
164     return lock_protocol::OK;
165 }
166
167 string lock_server::marshal_state() {
168     lock sl(lock_table_lock);
169     marshall rep;
170     rep << nacquire << lock_table;
171     return rep.content();
172 }
173
174 void lock_server::unmarshal_state(string state) {
175     lock sl(lock_table_lock);
176     unmarshall rep(state, false);
177     rep >> nacquire >> lock_table;
178 }
179
180 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
181     LOG("stat request for " << lid);
182     VERIFY(0);
183     r = nacquire;
184     return lock_protocol::OK;
185 }
186