So many changes. Broken.
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "include/lock_server.h"
4 #include <unistd.h>
5 #include <arpa/inet.h>
6
7 lock_server::lock_state::lock_state(const lock_state & other) {
8     held = other.held;
9     held_by = other.held_by;
10     wanted_by = other.wanted_by;
11     old_requests = other.old_requests;
12 }
13
14 lock_server::lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
15     lock sl(lock_table_lock);
16     // this will create the lock if it doesn't already exist
17     return lock_table[lid];
18 }
19
20 lock_server::lock_server(rsm & r) : rsm_ (&r) {
21     thread(&lock_server::revoker, this).detach();
22     thread(&lock_server::retryer, this).detach();
23     r.set_state_transfer(this);
24
25     r.reg(lock_protocol::acquire, &lock_server::acquire, this);
26     r.reg(lock_protocol::release, &lock_server::release, this);
27 }
28
29 void lock_server::revoker () {
30     while (1) {
31         lock_protocol::lockid_t lid = revoke_fifo.deq();
32         LOG << "Revoking " << lid;
33         if (rsm_ && !rsm_->amiprimary())
34             continue;
35
36         lock_state & st = get_lock_state(lid);
37         lock sl(st.m);
38         holder_t held_by = st.held_by;
39         sl.unlock();
40
41         if (auto cl = rpcc::bind_cached(held_by.first)) {
42             int r;
43             auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
44             LOG << "Revoke returned " << ret;
45         }
46     }
47 }
48
49 void lock_server::retryer() {
50     while (1) {
51         lock_protocol::lockid_t lid = retry_fifo.deq();
52         if (rsm_ && !rsm_->amiprimary())
53             continue;
54
55         LOG << "Sending retry for " << lid;
56         lock_state & st = get_lock_state(lid);
57         holder_t front;
58         {
59             lock sl(st.m);
60             if (st.wanted_by.empty())
61                 continue;
62             front = st.wanted_by.front();
63         }
64
65         if (auto cl = rpcc::bind_cached(front.first)) {
66             int r;
67             auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
68             LOG << "Retry returned " << ret;
69         }
70     }
71 }
72
73 lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
74     LOG << "lid=" << lid << " client=" << id << "," << xid;
75     holder_t h = holder_t(id, xid);
76     lock_state & st = get_lock_state(lid);
77     lock sl(st.m);
78
79     // deal with duplicated requests
80     if (st.old_requests.count(id)) {
81         lock_protocol::xid_t old_xid = st.old_requests[id];
82         if (old_xid > xid)
83             return lock_protocol::RPCERR;
84         else if (old_xid == xid) {
85             if (st.held && st.held_by == h) {
86                 LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
87                 return lock_protocol::OK;
88             }
89         }
90     }
91
92     // grant the lock if it's available and I'm next in line
93     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
94         if (!st.wanted_by.empty())
95             st.wanted_by.pop_front();
96         st.old_requests[id] = xid;
97
98         st.held = true;
99         st.held_by = h;
100         LOG << "Lock " << lid << " held by " << h.first;
101         if (st.wanted_by.size())
102             revoke_fifo.enq(lid);
103         return lock_protocol::OK;
104     }
105
106     // get in line
107     bool found = false;
108     for (auto p : st.wanted_by) {
109         if (p.first == id) {
110             // make sure client is obeying serialization
111             if (p.second != xid) {
112                 LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
113                 return lock_protocol::RPCERR;
114             }
115             found = true;
116             break;
117         }
118     }
119     if (!found)
120         st.wanted_by.push_back(h);
121
122     LOG << "wanted_by=" << st.wanted_by;
123
124     // send revoke if we're first in line
125     if (st.wanted_by.front() == h)
126         revoke_fifo.enq(lid);
127
128     return lock_protocol::RETRY;
129 }
130
131 lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
132     LOG << "lid=" << lid << " client=" << id << "," << xid;
133     lock_state & st = get_lock_state(lid);
134     lock sl(st.m);
135     if (st.held && st.held_by == holder_t(id, xid)) {
136         st.held = false;
137         LOG << "Lock " << lid << " not held";
138     }
139     if (st.wanted_by.size())
140         retry_fifo.enq(lid);
141     return lock_protocol::OK;
142 }
143
144 string lock_server::marshall_state() {
145     lock sl(lock_table_lock);
146     return marshall(nacquire, lock_table);
147 }
148
149 void lock_server::unmarshall_state(const string & state) {
150     lock sl(lock_table_lock);
151     unmarshall(state, nacquire, lock_table);
152 }