// see lock_client.cache.h for protocol details.
#include "lock_client_cache_rsm.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
#include <sstream>
#include <iostream>
#include <algorithm>
#include "tprintf.h"
#include "rsm_client.h"
+#include "lock.h"
+
+using std::ostringstream;
lock_state::lock_state():
state(none)
}
void lock_state::wait() {
- pthread_t self = pthread_self();
- c[self].wait(m);
+ auto self = std::this_thread::get_id();
+ {
+ adopt_lock ml(m);
+ c[self].wait(ml);
+ }
c.erase(self);
}
void lock_state::signal() {
// signal anyone
if (c.begin() != c.end())
- c.begin()->second.signal();
+ c.begin()->second.notify_one();
}
-void lock_state::signal(pthread_t who) {
+void lock_state::signal(std::thread::id who) {
if (c.count(who))
- c[who].signal();
-}
-
-static void * releasethread(void *x) {
- lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x;
- cc->releaser();
- return 0;
+ c[who].notify_one();
}
int lock_client_cache_rsm::last_port = 0;
lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
- ScopedLock sl(lock_table_lock);
+ lock sl(lock_table_lock);
// by the semantics of std::map, this will create
// the lock if it doesn't already exist
return lock_table[lid];
rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
{
- ScopedLock sl(xid_mutex);
+ lock sl(xid_mutex);
xid = 0;
}
rsmc = new rsm_client(xdst);
- int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this);
- VERIFY (r == 0);
+ releaser_thread = std::thread(&lock_client_cache_rsm::releaser, this);
}
void lock_client_cache_rsm::releaser() {
LOG("Releaser: " << lid);
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
- VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread);
+ lock sl(st.m);
+ VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
st.state = lock_state::releasing;
{
- ScopedUnlock su(st.m);
+ sl.unlock();
int r;
rsmc->call(lock_protocol::release, lid, id, st.xid, r);
+ sl.lock();
}
st.state = lock_state::none;
LOG("Lock " << lid << ": none");
lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
- pthread_t self = pthread_self();
+ lock sl(st.m);
+ auto self = std::this_thread::get_id();
// check for reentrancy
VERIFY(st.state != lock_state::locked || st.held_by != self);
if (st.state == lock_state::none || st.state == lock_state::retrying) {
if (st.state == lock_state::none) {
- ScopedLock sl(xid_mutex);
+ lock sl(xid_mutex);
st.xid = xid++;
}
st.state = lock_state::acquiring;
LOG("Lock " << lid << ": acquiring");
lock_protocol::status result;
{
- ScopedUnlock su(st.m);
+ sl.unlock();
int r;
result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r);
+ sl.lock();
}
LOG("acquire returned " << result);
if (result == lock_protocol::OK) {
VERIFY(st.wanted_by.size() != 0);
if (st.state == lock_state::free) {
// is it for me?
- pthread_t front = st.wanted_by.front();
- if (front == releaser_thread) {
+ auto front = st.wanted_by.front();
+ if (front == releaser_thread.get_id()) {
st.wanted_by.pop_front();
st.state = lock_state::locked;
- st.held_by = releaser_thread;
+ st.held_by = releaser_thread.get_id();
LOG("Queuing " << lid << " for release");
release_fifo.enq(lid);
} else if (front == self) {
lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
- pthread_t self = pthread_self();
+ lock sl(st.m);
+ auto self = std::this_thread::get_id();
VERIFY(st.state == lock_state::locked && st.held_by == self);
st.state = lock_state::free;
LOG("Lock " << lid << ": free");
if (st.wanted_by.size()) {
- pthread_t front = st.wanted_by.front();
- if (front == releaser_thread) {
+ auto front = st.wanted_by.front();
+ if (front == releaser_thread.get_id()) {
st.state = lock_state::locked;
- st.held_by = releaser_thread;
+ st.held_by = releaser_thread.get_id();
st.wanted_by.pop_front();
LOG("Queuing " << lid << " for release");
release_fifo.enq(lid);
rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
LOG("Revoke handler " << lid << " " << xid);
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
+ lock sl(st.m);
if (st.state == lock_state::releasing || st.state == lock_state::none)
return rlock_protocol::OK;
if (st.state == lock_state::free &&
- (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) {
+ (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
// gimme
st.state = lock_state::locked;
- st.held_by = releaser_thread;
+ st.held_by = releaser_thread.get_id();
if (st.wanted_by.size())
st.wanted_by.pop_front();
release_fifo.enq(lid);
} else {
// get in line
- st.wanted_by.push_back(releaser_thread);
+ st.wanted_by.push_back(releaser_thread.get_id());
}
return rlock_protocol::OK;
}
rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
+ lock sl(st.m);
VERIFY(st.state == lock_state::acquiring);
st.state = lock_state::retrying;
LOG("Lock " << lid << ": none");