X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5987a5357ccfd818d540a7aa58eb498a1be07aa1..26ade07ab0e62b98b452fbbd18edba0450035e35:/lock_client.cc diff --git a/lock_client.cc b/lock_client.cc index d996b40..7a44940 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -1,30 +1,11 @@ // RPC stubs for clients to talk to lock_server, and cache the locks. #include "lock_client.h" -#include "rpc/rpc.h" -#include -#include -#include -#include -#include "tprintf.h" #include -#include "rsm_client.h" -#include "lock.h" - -using std::ostringstream; - -lock_state::lock_state(): - state(none) -{ -} - -void lock_state::wait() { - auto self = std::this_thread::get_id(); - { - adopt_lock ml(m); - c[self].wait(ml); - } +void lock_state::wait(lock & mutex_lock) { + auto self = this_thread::get_id(); + c[self].wait(mutex_lock); c.erase(self); } @@ -34,55 +15,44 @@ void lock_state::signal() { c.begin()->second.notify_one(); } -void lock_state::signal(std::thread::id who) { +void lock_state::signal(thread::id who) { if (c.count(who)) c[who].notify_one(); } -int lock_client::last_port = 0; +typedef map lock_map; + +in_port_t lock_client::last_port = 0; lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); - // by the semantics of std::map, this will create - // the lock if it doesn't already exist - return lock_table[lid]; + return lock_table[lid]; // creates the lock if it doesn't already exist } -lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) { - sockaddr_in dstsock; - make_sockaddr(xdst.c_str(), &dstsock); - cl = new rpcc(dstsock); - if (cl->bind() < 0) { - printf("lock_client: call bind\n"); - } +lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) { + cl = unique_ptr(new rpcc(xdst)); + if (cl->bind() < 0) + LOG << "lock_client: call bind"; - srand(time(NULL)^last_port); - rlock_port = ((rand()%32000) | (0x1 << 10)); - const char *hname; - // VERIFY(gethostname(hname, 100) == 0); - hname = "127.0.0.1"; - ostringstream host; - host << hname << ":" << rlock_port; - id = host.str(); + srandom((uint32_t)time(NULL)^last_port); + rlock_port = ((random()%32000) | (0x1 << 10)); + id = "127.0.0.1:" + to_string(rlock_port); last_port = rlock_port; - rpcs *rlsrpc = new rpcs(rlock_port); + rlsrpc = unique_ptr(new rpcs(rlock_port)); rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this); rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this); - { - lock sl(xid_mutex); - xid = 0; - } - rsmc = new rsm_client(xdst); - releaser_thread = std::thread(&lock_client::releaser, this); + rsmc = unique_ptr(new rsm_client(xdst)); + releaser_thread = thread(&lock_client::releaser, this); + rlsrpc->start(); } void lock_client::releaser() { while (1) { lock_protocol::lockid_t lid; release_fifo.deq(&lid); - LOG("Releaser: " << lid); + LOG << "Releaser: " << lid; - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id()); st.state = lock_state::releasing; @@ -90,10 +60,12 @@ void lock_client::releaser() { sl.unlock(); int r; rsmc->call(lock_protocol::release, r, lid, id, st.xid); + if (lu) + lu->dorelease(lid); sl.lock(); } st.state = lock_state::none; - LOG("Lock " << lid << ": none"); + LOG << "Lock " << lid << ": none"; st.signal(); } } @@ -101,44 +73,45 @@ void lock_client::releaser() { int lock_client::stat(lock_protocol::lockid_t lid) { VERIFY(0); int r; - lock_protocol::status ret = cl->call(lock_protocol::stat, r, cl->id(), lid); + auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id); VERIFY (ret == lock_protocol::OK); return r; } lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); - auto self = std::this_thread::get_id(); + auto self = this_thread::get_id(); // check for reentrancy VERIFY(st.state != lock_state::locked || st.held_by != self); - VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end()); + VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self) + == st.wanted_by.end()); st.wanted_by.push_back(self); while (1) { if (st.state != lock_state::free) - LOG("Lock " << lid << ": not free"); + LOG << "Lock " << lid << ": not free"; if (st.state == lock_state::none || st.state == lock_state::retrying) { if (st.state == lock_state::none) { - lock sl(xid_mutex); - st.xid = xid++; + lock l(xid_mutex); + st.xid = next_xid++; } st.state = lock_state::acquiring; - LOG("Lock " << lid << ": acquiring"); + LOG << "Lock " << lid << ": acquiring"; lock_protocol::status result; { sl.unlock(); int r; - result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid); + result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid); sl.lock(); } - LOG("acquire returned " << result); + LOG << "acquire returned " << result; if (result == lock_protocol::OK) { st.state = lock_state::free; - LOG("Lock " << lid << ": free"); + LOG << "Lock " << lid << ": free"; } } @@ -150,7 +123,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { st.wanted_by.pop_front(); st.state = lock_state::locked; st.held_by = releaser_thread.get_id(); - LOG("Queuing " << lid << " for release"); + LOG << "Queuing " << lid << " for release"; release_fifo.enq(lid); } else if (front == self) { st.wanted_by.pop_front(); @@ -162,40 +135,40 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { } } - LOG("waiting..."); - st.wait(); - LOG("wait ended"); + LOG << "waiting..."; + st.wait(sl); + LOG << "wait ended"; } - LOG("Lock " << lid << ": locked"); + LOG << "Lock " << lid << ": locked"; return lock_protocol::OK; } lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) { - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); - auto self = std::this_thread::get_id(); + auto self = this_thread::get_id(); VERIFY(st.state == lock_state::locked && st.held_by == self); st.state = lock_state::free; - LOG("Lock " << lid << ": free"); + LOG << "Lock " << lid << ": free"; if (st.wanted_by.size()) { auto front = st.wanted_by.front(); if (front == releaser_thread.get_id()) { st.state = lock_state::locked; st.held_by = releaser_thread.get_id(); st.wanted_by.pop_front(); - LOG("Queuing " << lid << " for release"); + LOG << "Queuing " << lid << " for release"; release_fifo.enq(lid); } else st.signal(front); } - LOG("Finished signaling."); + LOG << "Finished signaling."; return lock_protocol::OK; } rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { - LOG("Revoke handler " << lid << " " << xid); - lock_state &st = get_lock_state(lid); + LOG << "Revoke handler " << lid << " " << xid; + lock_state & st = get_lock_state(lid); lock sl(st.m); if (st.state == lock_state::releasing || st.state == lock_state::none) @@ -216,12 +189,12 @@ rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_ return rlock_protocol::OK; } -rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { - lock_state &st = get_lock_state(lid); +rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) { + lock_state & st = get_lock_state(lid); lock sl(st.m); VERIFY(st.state == lock_state::acquiring); st.state = lock_state::retrying; - LOG("Lock " << lid << ": none"); + LOG << "Lock " << lid << ": none"; st.signal(); // only one thread needs to wake up return rlock_protocol::OK; } @@ -239,7 +212,7 @@ t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) { } t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) { - return ((lock_client *)client)->acquire(lid); + return ((lock_client *)client)->release(lid); } t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {