// the caching lock server implementation
+#include "types.h"
#include "lock_server.h"
-#include <sstream>
-#include <stdio.h>
#include <unistd.h>
#include <arpa/inet.h>
-#include "lang/verify.h"
#include "handle.h"
-#include "tprintf.h"
-#include "rpc/marshall.h"
-#include "lock.h"
-
-using std::ostringstream;
-using std::istringstream;
-using std::vector;
lock_state::lock_state():
held(false)
return *this;
}
// Removed by this change (every line below carries the `-` marker): a
// templated stream-insertion operator that writes a pair to an ostringstream
// as "<first,second>" and returns the stream.
-template <class A, class B>
-ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
- o << "<" << d.first << "," << d.second << ">";
- return o;
-}
-
// Removed by this change: writes a lock_state into a marshall as the fields
// held, held_by and wanted_by, in that order.
-marshall & operator<<(marshall &m, const lock_state &d) {
- return m << d.held << d.held_by << d.wanted_by;
-}
-
// Removed by this change: reads a lock_state from an unmarshall, filling
// held, held_by and wanted_by in the same order the marshall operator above
// writes them.
-unmarshall & operator>>(unmarshall &u, lock_state &d) {
- return u >> d.held >> d.held_by >> d.wanted_by;
-}
-
// Returns a reference to the lock_table entry for `lid`, indexing the table
// while `sl` (constructed from lock_table_lock) is alive.
// NOTE(review): the reference is handed back to the caller after `sl` has gone
// out of scope; presumably `lock` is a scoped guard and the per-lock state is
// protected separately by the caller — confirm.
lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
lock sl(lock_table_lock);
- // by the semantics of map, this will create
- // the lock if it doesn't already exist
+ // this will create the lock if it doesn't already exist
return lock_table[lid];
}
// Constructor. Stores the rsm pointer, starts revoker() and retryer() on
// detached threads, and then registers this object with the rsm through
// set_state_transfer(this).
// The `+` version renames the parameter to `r`, the member to `rsm_`, and
// spells `thread` without the std:: qualifier.
// NOTE(review): the rsm pointer is dereferenced unconditionally here, whereas
// revoker() and retryer() test it for null before use — confirm whether a null
// rsm is a supported configuration.
-lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
- std::thread(&lock_server::revoker, this).detach();
- std::thread(&lock_server::retryer, this).detach();
- rsm->set_state_transfer(this);
+lock_server::lock_server(rsm *r) : rsm_ (r) {
+ thread(&lock_server::revoker, this).detach();
+ thread(&lock_server::retryer, this).detach();
+ rsm_->set_state_transfer(this);
}
// Background loop (started on a detached thread by the constructor) that
// services revoke requests: it dequeues a lock id from revoke_fifo, and for
// each id sends an rlock_protocol::revoke call to the recorded holder.
// NOTE(review): this view is missing lines inside the body (`proxy` has no
// visible declaration and the brace opened before `lock sl(st.m)` has no
// visible close), so the comments below cover only the statements shown.
// NOTE(review): in the `+` signature `[[noreturn]]` follows the parameter
// list; in that position an attribute applies to the function type rather than
// the function, and compilers commonly reject or ignore it — confirm it builds
// as intended.
-void lock_server::revoker() {
+void lock_server::revoker() [[noreturn]] {
while (1) {
lock_protocol::lockid_t lid;
revoke_fifo.deq(&lid);
LOG("Revoking " << lid);
// Skip the request when an rsm is configured and this node is not primary.
- if (rsm && !rsm->amiprimary())
+ if (rsm_ && !rsm_->amiprimary())
continue;
lock_state &st = get_lock_state(lid);
- holder held_by;
+ holder_t held_by;
{
// Copy the current holder while st.m is held.
lock sl(st.m);
held_by = st.held_by;
// held_by.first is used to obtain an RPC proxy; held_by.second is passed
// along as the last argument of the revoke call.
proxy = handle(held_by.first).safebind();
if (proxy) {
int r;
- rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
+ auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
// The call result is only logged; no other handling is visible here.
LOG("Revoke returned " << ret);
}
}
}
// Background loop (started on a detached thread by the constructor) that
// services retry notifications: it dequeues a lock id from retry_fifo and
// sends an rlock_protocol::retry call to the entry at the front of that
// lock's wanted_by queue.
// NOTE(review): the `[[noreturn]]` in the `+` signature follows the parameter
// list, where it applies to the function type rather than the function;
// compilers commonly reject or ignore it there — confirm.
-void lock_server::retryer() {
+void lock_server::retryer() [[noreturn]] {
while (1) {
lock_protocol::lockid_t lid;
retry_fifo.deq(&lid);
// Skip the request when an rsm is configured and this node is not primary.
- if (rsm && !rsm->amiprimary())
+ if (rsm_ && !rsm_->amiprimary())
continue;
LOG("Sending retry for " << lid);
lock_state &st = get_lock_state(lid);
- holder front;
+ holder_t front;
{
// Read the head of the wait queue while st.m is held.
lock sl(st.m);
// NOTE(review): as shown, front() is reached only when wanted_by is empty.
// A line (likely an early `continue`) appears to be missing from this view
// between the next two statements — confirm against the full file.
if (st.wanted_by.empty())
front = st.wanted_by.front();
}
- rlock_protocol::status ret = -1;
-
rpcc *proxy = NULL;
// try a few times?
//int t=5;
// front.first is used to obtain an RPC proxy; front.second is passed along
// as the last argument of the retry call.
proxy = handle(front.first).safebind();
if (proxy) {
int r;
- ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
+ auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
// The call result is only logged; no other handling is visible here.
LOG("Retry returned " << ret);
}
}
}
// Handles an acquire request for lock `lid` from the requester identified by
// the pair (id, xid).
// NOTE(review): this view is missing lines inside the body (several braces
// opened below have no visible close, and the granting path is not shown), so
// the comments cover only the statements that are visible.
// Visible behaviour: under st.m, the wanted_by queue is scanned for an entry
// with the same id; a matching id with a different xid returns
// lock_protocol::RPCERR; a requester not found in the queue is appended; one
// visible exit returns lock_protocol::RETRY.
// The `+` version leaves the first (int &) parameter unnamed and takes `id`
// by const reference.
-int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
- LOG_FUNC_ENTER_SERVER;
- holder h = holder(id, xid);
+int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+ LOG("lid=" << lid << " client=" << id << "," << xid);
+ holder_t h = holder_t(id, xid);
lock_state &st = get_lock_state(lid);
lock sl(st.m);
// get in line
bool found = false;
// NOTE(review): the `+` loop binds `p` by value, copying each queue entry;
// `const auto &` would avoid the copy without changing behaviour.
- for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
- if (i->first == id) {
+ for (auto p : st.wanted_by) {
+ if (p.first == id) {
// make sure client is obeying serialization
- if (i->second != xid) {
- LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
+ if (p.second != xid) {
+ LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
return lock_protocol::RPCERR;
}
found = true;
if (!found)
st.wanted_by.push_back(h);
- LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
+ LOG("wanted_by=" << st.wanted_by);
// send revoke if we're first in line
// NOTE(review): the statements between this check and the return below
// appear to be elided in this view — confirm against the full file.
if (st.wanted_by.front() == h)
return lock_protocol::RETRY;
}
// Handles a release request for lock `lid` from the requester identified by
// the pair (id, xid).
// NOTE(review): this view is missing lines inside the body (the `if` block has
// no visible close and no return statement is shown), so the comments cover
// only the statements that are visible.
// The `+` version leaves the first (int &) parameter unnamed, takes `id` by
// const reference, and replaces the entry macro with an explicit LOG line.
-int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
- LOG_FUNC_ENTER_SERVER;
+int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+ LOG("lid=" << lid << " client=" << id << "," << xid);
lock_state &st = get_lock_state(lid);
lock sl(st.m);
// The held flag is cleared only when the lock is held by exactly this
// (id, xid) pair.
- if (st.held && st.held_by == holder(id, xid)) {
+ if (st.held && st.held_by == holder_t(id, xid)) {
st.held = false;
// NOTE(review): presumably this message belongs to a branch whose opening
// line is elided in this view — confirm.
LOG("Lock " << lid << " not held");
}
// Serializes the server state into a string: nacquire first, then lock_table,
// both written into a marshall while `sl` (constructed from lock_table_lock)
// is alive. unmarshal_state() reads the two values back in the same order.
// The `+` version chains the two insertions and returns rep.content() in
// place of rep.str().
string lock_server::marshal_state() {
lock sl(lock_table_lock);
marshall rep;
- rep << nacquire;
- rep << lock_table;
- return rep.str();
+ rep << nacquire << lock_table;
+ return rep.content();
}
// Restores the server state from `state`: reads nacquire and then lock_table
// out of an unmarshall while `sl` (constructed from lock_table_lock) is alive,
// overwriting the current values. The read order matches the write order in
// marshal_state().
// The `+` version takes `state` by const reference and passes `false` as a
// second constructor argument to unmarshall.
// NOTE(review): the meaning of that `false` flag is not visible in this file —
// confirm against the unmarshall declaration.
-void lock_server::unmarshal_state(string state) {
+void lock_server::unmarshal_state(const string & state) {
lock sl(lock_table_lock);
- unmarshall rep(state);
- rep >> nacquire;
- rep >> lock_table;
+ unmarshall rep(state, false);
+ rep >> nacquire >> lock_table;
}
lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
- printf("stat request\n");
+ LOG("stat request for " << lid);
VERIFY(0);
r = nacquire;
return lock_protocol::OK;