lock_demo=lock_demo.o lock_client.o tprintf.o rsm_client.o handle.o
lock_demo : $(lock_demo) rpc/librpc.a
-lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o
+lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o
lock_tester : $(lock_tester) rpc/librpc.a
-lock_server=lock_server.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
+lock_server=lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server.o
lock_server : $(lock_server) rpc/librpc.a
rsm_tester=rsm_tester.o rsmtest_client.o tprintf.o
-// RPC stubs for clients to talk to lock_server
+// RPC stubs for clients to talk to lock_server, and cache the locks.
#include "lock_client.h"
#include "rpc/rpc.h"
-#include <arpa/inet.h>
-
#include <sstream>
#include <iostream>
+#include <algorithm>
#include <stdio.h>
+#include "tprintf.h"
+#include <arpa/inet.h>
-lock_client::lock_client(std::string dst)
+#include "rsm_client.h"
+#include "lock.h"
+
+using std::ostringstream;
+
+lock_state::lock_state():
+ state(none)
{
+}
+
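+// Each waiting thread parks on its own condition_variable, keyed by its
+// thread id, so signal(who) can wake one specific waiter; the map entry
+// is erased once the wait returns.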
+void lock_state::wait() {
+ auto self = std::this_thread::get_id();
+ {
+ adopt_lock ml(m);
+ c[self].wait(ml);
+ }
+ c.erase(self);
+}
+
+void lock_state::signal() {
+ // signal anyone
+ if (c.begin() != c.end())
+ c.begin()->second.notify_one();
+}
+
+void lock_state::signal(std::thread::id who) {
+ if (c.count(who))
+ c[who].notify_one();
+}
+
+int lock_client::last_port = 0;
+
+lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
+ lock sl(lock_table_lock);
+ // by the semantics of std::map, this will create
+ // the lock if it doesn't already exist
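+ // (references into a std::map stay valid across later insertions, so
+ // the returned lock_state& is safe to hold after lock_table_lock drops)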
+ return lock_table[lid];
+}
+
+lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) {
sockaddr_in dstsock;
- make_sockaddr(dst.c_str(), &dstsock);
+ make_sockaddr(xdst.c_str(), &dstsock);
cl = new rpcc(dstsock);
if (cl->bind() < 0) {
printf("lock_client: call bind\n");
}
+
+ srand(time(NULL)^last_port);
+ rlock_port = ((rand()%32000) | (0x1 << 10));
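+ // OR-ing in (0x1 << 10) keeps the port >= 1024, clear of the privileged range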
+ const char *hname;
+ // VERIFY(gethostname(hname, 100) == 0);
+ hname = "127.0.0.1";
+ ostringstream host;
+ host << hname << ":" << rlock_port;
+ id = host.str();
+ last_port = rlock_port;
+ rpcs *rlsrpc = new rpcs(rlock_port);
+ rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
+ rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
+ {
+ lock sl(xid_mutex);
+ xid = 0;
+ }
+ rsmc = new rsm_client(xdst);
+ releaser_thread = std::thread(&lock_client::releaser, this);
}
-int
-lock_client::stat(lock_protocol::lockid_t lid)
-{
+void lock_client::releaser() {
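+ // Background thread: dequeues lock ids queued for release, returns each
+ // lock to the server via the RSM, then wakes any local waiters.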
+ while (1) {
+ lock_protocol::lockid_t lid;
+ release_fifo.deq(&lid);
+ LOG("Releaser: " << lid);
+
+ lock_state &st = get_lock_state(lid);
+ lock sl(st.m);
+ VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
+ st.state = lock_state::releasing;
+ {
+ sl.unlock();
+ int r;
+ rsmc->call(lock_protocol::release, r, lid, id, st.xid);
+ sl.lock();
+ }
+ st.state = lock_state::none;
+ LOG("Lock " << lid << ": none");
+ st.signal();
+ }
+}
+
+int lock_client::stat(lock_protocol::lockid_t lid) {
+ VERIFY(0);
int r;
lock_protocol::status ret = cl->call(lock_protocol::stat, r, cl->id(), lid);
VERIFY (ret == lock_protocol::OK);
return r;
}
-lock_protocol::status
-lock_client::acquire(lock_protocol::lockid_t lid)
-{
- int r;
- return cl->call(lock_protocol::acquire, r, cl->id(), lid);
+lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
+ lock_state &st = get_lock_state(lid);
+ lock sl(st.m);
+ auto self = std::this_thread::get_id();
+
+ // check for reentrancy
+ VERIFY(st.state != lock_state::locked || st.held_by != self);
+ VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
+
+ st.wanted_by.push_back(self);
+
+ while (1) {
+ if (st.state != lock_state::free)
+ LOG("Lock " << lid << ": not free");
+
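+ // A fresh xid is drawn only for a brand-new acquire (state none); a
+ // retry reuses the same xid so the server can recognize the repeat.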
+ if (st.state == lock_state::none || st.state == lock_state::retrying) {
+ if (st.state == lock_state::none) {
+ lock sl(xid_mutex);
+ st.xid = xid++;
+ }
+ st.state = lock_state::acquiring;
+ LOG("Lock " << lid << ": acquiring");
+ lock_protocol::status result;
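+ // Drop the per-lock mutex across the blocking RSM call; the acquiring
+ // state keeps other local threads from racing on this lock.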
+ {
+ sl.unlock();
+ int r;
+ result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
+ sl.lock();
+ }
+ LOG("acquire returned " << result);
+ if (result == lock_protocol::OK) {
+ st.state = lock_state::free;
+ LOG("Lock " << lid << ": free");
+ }
+ }
+
+ VERIFY(st.wanted_by.size() != 0);
+ if (st.state == lock_state::free) {
+ // is it for me?
+ auto front = st.wanted_by.front();
+ if (front == releaser_thread.get_id()) {
+ st.wanted_by.pop_front();
+ st.state = lock_state::locked;
+ st.held_by = releaser_thread.get_id();
+ LOG("Queuing " << lid << " for release");
+ release_fifo.enq(lid);
+ } else if (front == self) {
+ st.wanted_by.pop_front();
+ st.state = lock_state::locked;
+ st.held_by = self;
+ break;
+ } else {
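+ // the queue head is some other local thread: wake it to take the lock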
+ st.signal(front);
+ }
+ }
+
+ LOG("waiting...");
+ st.wait();
+ LOG("wait ended");
+ }
+
+ LOG("Lock " << lid << ": locked");
+ return lock_protocol::OK;
}
-lock_protocol::status
-lock_client::release(lock_protocol::lockid_t lid)
-{
- int r;
- return cl->call(lock_protocol::release, r, cl->id(), lid);
+lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
+ lock_state &st = get_lock_state(lid);
+ lock sl(st.m);
+ auto self = std::this_thread::get_id();
+ VERIFY(st.state == lock_state::locked && st.held_by == self);
+ st.state = lock_state::free;
+ LOG("Lock " << lid << ": free");
+ if (st.wanted_by.size()) {
+ auto front = st.wanted_by.front();
+ if (front == releaser_thread.get_id()) {
+ st.state = lock_state::locked;
+ st.held_by = releaser_thread.get_id();
+ st.wanted_by.pop_front();
+ LOG("Queuing " << lid << " for release");
+ release_fifo.enq(lid);
+ } else
+ st.signal(front);
+ }
+ LOG("Finished signaling.");
+ return lock_protocol::OK;
+}
+
+rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
+ LOG("Revoke handler " << lid << " " << xid);
+ lock_state &st = get_lock_state(lid);
+ lock sl(st.m);
+
+ if (st.state == lock_state::releasing || st.state == lock_state::none)
+ return rlock_protocol::OK;
+
+ if (st.state == lock_state::free &&
+ (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
+ // gimme
+ st.state = lock_state::locked;
+ st.held_by = releaser_thread.get_id();
+ if (st.wanted_by.size())
+ st.wanted_by.pop_front();
+ release_fifo.enq(lid);
+ } else {
+ // get in line
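+ // the releaser thread waits its turn like any other thread, so the
+ // lock goes back to the server only after earlier local waiters run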
+ st.wanted_by.push_back(releaser_thread.get_id());
+ }
+ return rlock_protocol::OK;
+}
+
+rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
+ lock_state &st = get_lock_state(lid);
+ lock sl(st.m);
+ VERIFY(st.state == lock_state::acquiring);
+ st.state = lock_state::retrying;
+ LOG("Lock " << lid << ": none");
+ st.signal(); // only one thread needs to wake up
+ return rlock_protocol::OK;
}
t4_lock_client *t4_lock_client_new(const char *dst) {
t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
return ((lock_client *)client)->stat(lid);
}
-
// lock client interface.
#ifndef lock_client_h
+
#define lock_client_h
#ifdef __cplusplus
#include <string>
#include "lock_protocol.h"
#include "rpc/rpc.h"
-#include <vector>
+#include "lang/verify.h"
+#include "rpc/fifo.h"
+#include "rsm_client.h"
+
+class lock_release_user {
+ public:
+ virtual void dorelease(lock_protocol::lockid_t) = 0;
+ virtual ~lock_release_user() {};
+};
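+// The lock_client user implements dorelease() to flush state guarded by a
+// lock (e.g. cached data) before the lock is returned to the server.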
+
+using std::string;
+using std::thread;
+using std::list;
+using std::map;
+
+typedef string callback;
-// Client interface to the lock server
+class lock_state {
+public:
+ lock_state();
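+ // client-side state machine for one lock:
+ // none -> acquiring -> free -> locked -> free ... -> releasing -> none,
+ // with acquiring -> retrying -> acquiring when the server answers RETRY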
+ enum {
+ none = 0,
+ retrying,
+ free,
+ locked,
+ acquiring,
+ releasing
+ } state;
+ std::thread::id held_by;
+ list<std::thread::id> wanted_by;
+ mutex m;
+ map<std::thread::id, std::condition_variable> c;
+ lock_protocol::xid_t xid;
+ void wait();
+ void signal();
+ void signal(std::thread::id who);
+};
+
+typedef map<lock_protocol::lockid_t, lock_state> lock_map;
+
+// A client that caches locks. The server can revoke a cached lock by
+// calling the client's revoke handler.
class lock_client {
- protected:
- rpcc *cl;
- public:
- lock_client(std::string d);
- virtual ~lock_client() {};
- virtual lock_protocol::status acquire(lock_protocol::lockid_t);
- virtual lock_protocol::status release(lock_protocol::lockid_t);
- virtual lock_protocol::status stat(lock_protocol::lockid_t);
+ private:
+ rpcc *cl;
+ std::thread releaser_thread;
+ rsm_client *rsmc;
+ class lock_release_user *lu;
+ int rlock_port;
+ string hostname;
+ string id;
+ mutex xid_mutex;
+ lock_protocol::xid_t xid;
+ fifo<lock_protocol::lockid_t> release_fifo;
+ mutex lock_table_lock;
+ lock_map lock_table;
+ lock_state &get_lock_state(lock_protocol::lockid_t lid);
+ public:
+ static int last_port;
+ lock_client(string xdst, class lock_release_user *l = 0);
+ ~lock_client() {};
+ lock_protocol::status acquire(lock_protocol::lockid_t);
+ lock_protocol::status release(lock_protocol::lockid_t);
+ int stat(lock_protocol::lockid_t);
+ void releaser();
+ rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
+ rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
};
-#endif
+#endif // C++
extern "C" {
+++ /dev/null
-// RPC stubs for clients to talk to lock_server, and cache the locks
-// see lock_client.cache.h for protocol details.
-
-#include "lock_client_cache_rsm.h"
-#include "rpc/rpc.h"
-#include <sstream>
-#include <iostream>
-#include <algorithm>
-#include <stdio.h>
-#include "tprintf.h"
-
-#include "rsm_client.h"
-#include "lock.h"
-
-using std::ostringstream;
-
-lock_state::lock_state():
- state(none)
-{
-}
-
-void lock_state::wait() {
- auto self = std::this_thread::get_id();
- {
- adopt_lock ml(m);
- c[self].wait(ml);
- }
- c.erase(self);
-}
-
-void lock_state::signal() {
- // signal anyone
- if (c.begin() != c.end())
- c.begin()->second.notify_one();
-}
-
-void lock_state::signal(std::thread::id who) {
- if (c.count(who))
- c[who].notify_one();
-}
-
-int lock_client_cache_rsm::last_port = 0;
-
-lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
- lock sl(lock_table_lock);
- // by the semantics of std::map, this will create
- // the lock if it doesn't already exist
- return lock_table[lid];
-}
-
-lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) {
- srand(time(NULL)^last_port);
- rlock_port = ((rand()%32000) | (0x1 << 10));
- const char *hname;
- // VERIFY(gethostname(hname, 100) == 0);
- hname = "127.0.0.1";
- ostringstream host;
- host << hname << ":" << rlock_port;
- id = host.str();
- last_port = rlock_port;
- rpcs *rlsrpc = new rpcs(rlock_port);
- rlsrpc->reg(rlock_protocol::revoke, &lock_client_cache_rsm::revoke_handler, this);
- rlsrpc->reg(rlock_protocol::retry, &lock_client_cache_rsm::retry_handler, this);
- {
- lock sl(xid_mutex);
- xid = 0;
- }
- rsmc = new rsm_client(xdst);
- releaser_thread = std::thread(&lock_client_cache_rsm::releaser, this);
-}
-
-void lock_client_cache_rsm::releaser() {
- while (1) {
- lock_protocol::lockid_t lid;
- release_fifo.deq(&lid);
- LOG("Releaser: " << lid);
-
- lock_state &st = get_lock_state(lid);
- lock sl(st.m);
- VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
- st.state = lock_state::releasing;
- {
- sl.unlock();
- int r;
- rsmc->call(lock_protocol::release, r, lid, id, st.xid);
- sl.lock();
- }
- st.state = lock_state::none;
- LOG("Lock " << lid << ": none");
- st.signal();
- }
-}
-
-lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
- lock_state &st = get_lock_state(lid);
- lock sl(st.m);
- auto self = std::this_thread::get_id();
-
- // check for reentrancy
- VERIFY(st.state != lock_state::locked || st.held_by != self);
- VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
-
- st.wanted_by.push_back(self);
-
- while (1) {
- if (st.state != lock_state::free)
- LOG("Lock " << lid << ": not free");
-
- if (st.state == lock_state::none || st.state == lock_state::retrying) {
- if (st.state == lock_state::none) {
- lock sl(xid_mutex);
- st.xid = xid++;
- }
- st.state = lock_state::acquiring;
- LOG("Lock " << lid << ": acquiring");
- lock_protocol::status result;
- {
- sl.unlock();
- int r;
- result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
- sl.lock();
- }
- LOG("acquire returned " << result);
- if (result == lock_protocol::OK) {
- st.state = lock_state::free;
- LOG("Lock " << lid << ": free");
- }
- }
-
- VERIFY(st.wanted_by.size() != 0);
- if (st.state == lock_state::free) {
- // is it for me?
- auto front = st.wanted_by.front();
- if (front == releaser_thread.get_id()) {
- st.wanted_by.pop_front();
- st.state = lock_state::locked;
- st.held_by = releaser_thread.get_id();
- LOG("Queuing " << lid << " for release");
- release_fifo.enq(lid);
- } else if (front == self) {
- st.wanted_by.pop_front();
- st.state = lock_state::locked;
- st.held_by = self;
- break;
- } else {
- st.signal(front);
- }
- }
-
- LOG("waiting...");
- st.wait();
- LOG("wait ended");
- }
-
- LOG("Lock " << lid << ": locked");
- return lock_protocol::OK;
-}
-
-lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
- lock_state &st = get_lock_state(lid);
- lock sl(st.m);
- auto self = std::this_thread::get_id();
- VERIFY(st.state == lock_state::locked && st.held_by == self);
- st.state = lock_state::free;
- LOG("Lock " << lid << ": free");
- if (st.wanted_by.size()) {
- auto front = st.wanted_by.front();
- if (front == releaser_thread.get_id()) {
- st.state = lock_state::locked;
- st.held_by = releaser_thread.get_id();
- st.wanted_by.pop_front();
- LOG("Queuing " << lid << " for release");
- release_fifo.enq(lid);
- } else
- st.signal(front);
- }
- LOG("Finished signaling.");
- return lock_protocol::OK;
-}
-
-rlock_protocol::status lock_client_cache_rsm::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
- LOG("Revoke handler " << lid << " " << xid);
- lock_state &st = get_lock_state(lid);
- lock sl(st.m);
-
- if (st.state == lock_state::releasing || st.state == lock_state::none)
- return rlock_protocol::OK;
-
- if (st.state == lock_state::free &&
- (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
- // gimme
- st.state = lock_state::locked;
- st.held_by = releaser_thread.get_id();
- if (st.wanted_by.size())
- st.wanted_by.pop_front();
- release_fifo.enq(lid);
- } else {
- // get in line
- st.wanted_by.push_back(releaser_thread.get_id());
- }
- return rlock_protocol::OK;
-}
-
-rlock_protocol::status lock_client_cache_rsm::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
- lock_state &st = get_lock_state(lid);
- lock sl(st.m);
- VERIFY(st.state == lock_state::acquiring);
- st.state = lock_state::retrying;
- LOG("Lock " << lid << ": none");
- st.signal(); // only one thread needs to wake up
- return rlock_protocol::OK;
-}
+++ /dev/null
-// lock client interface.
-
-#ifndef lock_client_cache_rsm_h
-
-#define lock_client_cache_rsm_h
-
-#include <string>
-#include "lock_protocol.h"
-#include "rpc/rpc.h"
-#include "lock_client.h"
-#include "lang/verify.h"
-#include "rpc/fifo.h"
-#include "rsm_client.h"
-
-class lock_release_user {
- public:
- virtual void dorelease(lock_protocol::lockid_t) = 0;
- virtual ~lock_release_user() {};
-};
-
-using std::string;
-using std::thread;
-using std::list;
-using std::map;
-
-typedef string callback;
-
-class lock_state {
-public:
- lock_state();
- enum {
- none = 0,
- retrying,
- free,
- locked,
- acquiring,
- releasing
- } state;
- std::thread::id held_by;
- list<std::thread::id> wanted_by;
- mutex m;
- map<std::thread::id, std::condition_variable> c;
- lock_protocol::xid_t xid;
- void wait();
- void signal();
- void signal(std::thread::id who);
-};
-
-typedef map<lock_protocol::lockid_t, lock_state> lock_map;
-
-class lock_client_cache_rsm;
-
-// Clients that caches locks. The server can revoke locks using
-// lock_revoke_server.
-class lock_client_cache_rsm : public lock_client {
- private:
- std::thread releaser_thread;
- rsm_client *rsmc;
- class lock_release_user *lu;
- int rlock_port;
- string hostname;
- string id;
- mutex xid_mutex;
- lock_protocol::xid_t xid;
- fifo<lock_protocol::lockid_t> release_fifo;
- mutex lock_table_lock;
- lock_map lock_table;
- lock_state &get_lock_state(lock_protocol::lockid_t lid);
- public:
- static int last_port;
- lock_client_cache_rsm(string xdst, class lock_release_user *l = 0);
- virtual ~lock_client_cache_rsm() {};
- lock_protocol::status acquire(lock_protocol::lockid_t);
- virtual lock_protocol::status release(lock_protocol::lockid_t);
- void releaser();
- rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
- rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
-};
-
-
-#endif
-#include "rpc/rpc.h"
-#include <arpa/inet.h>
-#include <stdlib.h>
+// the caching lock server implementation
+
+#include "lock_server.h"
+#include <sstream>
#include <stdio.h>
#include <unistd.h>
-#include "lock_server_cache_rsm.h"
-#include "paxos.h"
-#include "rsm.h"
-
-// Main loop of lock_server
+#include <arpa/inet.h>
+#include "lang/verify.h"
+#include "handle.h"
+#include "tprintf.h"
+#include "rpc/marshall.h"
+#include "lock.h"
-char tprintf_thread_prefix = 's';
+using std::ostringstream;
+using std::istringstream;
+using std::vector;
+using std::pair;
-int
-main(int argc, char *argv[])
+lock_state::lock_state():
+ held(false)
{
- setvbuf(stdout, NULL, _IONBF, 0);
- setvbuf(stderr, NULL, _IONBF, 0);
+}
+
+lock_state::lock_state(const lock_state &other) {
+ *this = other;
+}
+
+lock_state& lock_state::operator=(const lock_state& o) {
+ held = o.held;
+ held_by = o.held_by;
+ wanted_by = o.wanted_by;
+ old_requests = o.old_requests;
+ return *this;
+}
- srandom(getpid());
+template <class A, class B>
+ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
+ o << "<" << d.first << "," << d.second << ">";
+ return o;
+}
+
+marshall & operator<<(marshall &m, const lock_state &d) {
+ return m << d.held << d.held_by << d.wanted_by;
+}
- if(argc != 3){
- fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
- exit(1);
+unmarshall & operator>>(unmarshall &u, lock_state &d) {
+ return u >> d.held >> d.held_by >> d.wanted_by;
+}
+
+lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
+ lock sl(lock_table_lock);
+ // by the semantics of map, this will create
+ // the lock if it doesn't already exist
+ return lock_table[lid];
+}
+
+lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
+ std::thread(&lock_server::revoker, this).detach();
+ std::thread(&lock_server::retryer, this).detach();
+ rsm->set_state_transfer(this);
+}
+
+void lock_server::revoker() {
+ while (1) {
+ lock_protocol::lockid_t lid;
+ revoke_fifo.deq(&lid);
+ LOG("Revoking " << lid);
+ if (rsm && !rsm->amiprimary())
+ continue;
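+ // only the primary issues revokes; backups still drain the fifo
+ // so queued ids don't accumulate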
+
+ lock_state &st = get_lock_state(lid);
+ holder held_by;
+ {
+ lock sl(st.m);
+ held_by = st.held_by;
+ }
+
+ rpcc *proxy = NULL;
+ // try a few times?
+ //int t=5;
+ //while (t-- && !proxy)
+ proxy = handle(held_by.first).safebind();
+ if (proxy) {
+ int r;
+ rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
+ LOG("Revoke returned " << ret);
+ }
}
+}
- rsm rsm(argv[1], argv[2]);
- lock_server_cache_rsm ls(&rsm);
- rsm.set_state_transfer(&ls);
+void lock_server::retryer() {
+ while (1) {
+ lock_protocol::lockid_t lid;
+ retry_fifo.deq(&lid);
+ if (rsm && !rsm->amiprimary())
+ continue;
- rsm.reg(lock_protocol::acquire, &lock_server_cache_rsm::acquire, &ls);
- rsm.reg(lock_protocol::release, &lock_server_cache_rsm::release, &ls);
- rsm.reg(lock_protocol::stat, &lock_server_cache_rsm::stat, &ls);
+ LOG("Sending retry for " << lid);
+ lock_state &st = get_lock_state(lid);
+ holder front;
+ {
+ lock sl(st.m);
+ if (st.wanted_by.empty())
+ continue;
+ front = st.wanted_by.front();
+ }
- while(1)
- sleep(1000);
+ rlock_protocol::status ret = -1;
+
+ rpcc *proxy = NULL;
+ // try a few times?
+ //int t=5;
+ //while (t-- && !proxy)
+ proxy = handle(front.first).safebind();
+ if (proxy) {
+ int r;
+ ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
+ LOG("Retry returned " << ret);
+ }
+ }
}
+
+int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
+ LOG_FUNC_ENTER_SERVER;
+ holder h = holder(id, xid);
+ lock_state &st = get_lock_state(lid);
+ lock sl(st.m);
+
+ // deal with duplicated requests
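+ // (xids from one client are assumed monotonic: a smaller xid is stale,
+ // an equal xid is a retransmission of a request we may have granted)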
+ if (st.old_requests.count(id)) {
+ lock_protocol::xid_t old_xid = st.old_requests[id];
+ if (old_xid > xid)
+ return lock_protocol::RPCERR;
+ else if (old_xid == xid) {
+ if (st.held && st.held_by == h) {
+ LOG("Client " << id << " sent duplicate acquire xid=" << xid);
+ return lock_protocol::OK;
+ }
+ }
+ }
+
+ // grant the lock if it's available and I'm next in line
+ if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
+ if (!st.wanted_by.empty())
+ st.wanted_by.pop_front();
+ st.old_requests[id] = xid;
+
+ st.held = true;
+ st.held_by = h;
+ LOG("Lock " << lid << " held by " << h.first);
+ if (st.wanted_by.size())
+ revoke_fifo.enq(lid);
+ return lock_protocol::OK;
+ }
+
+ // get in line
+ bool found = false;
+ for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
+ if (i->first == id) {
+ // make sure client is obeying serialization
+ if (i->second != xid) {
+ LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
+ return lock_protocol::RPCERR;
+ }
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ st.wanted_by.push_back(h);
+
+ LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
+
+ // send revoke if we're first in line
+ if (st.wanted_by.front() == h)
+ revoke_fifo.enq(lid);
+
+ return lock_protocol::RETRY;
+}
+
+int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
+ LOG_FUNC_ENTER_SERVER;
+ lock_state &st = get_lock_state(lid);
+ lock sl(st.m);
+ if (st.held && st.held_by == holder(id, xid)) {
+ st.held = false;
+ LOG("Lock " << lid << " not held");
+ }
+ if (st.wanted_by.size())
+ retry_fifo.enq(lid);
+ return lock_protocol::OK;
+}
+
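+// rsm_state_transfer hooks: serialize the lock table so the RSM can bring
+// a lagging replica up to date with the primary's state.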
+string lock_server::marshal_state() {
+ lock sl(lock_table_lock);
+ marshall rep;
+ rep << nacquire;
+ rep << lock_table;
+ return rep.str();
+}
+
+void lock_server::unmarshal_state(string state) {
+ lock sl(lock_table_lock);
+ unmarshall rep(state);
+ rep >> nacquire;
+ rep >> lock_table;
+}
+
+lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
+ printf("stat request\n");
+ VERIFY(0);
+ r = nacquire;
+ return lock_protocol::OK;
+}
+
-#ifndef lock_server_cache_rsm_h
-#define lock_server_cache_rsm_h
+#ifndef lock_server_h
+#define lock_server_h
#include <string>
typedef map<lock_protocol::lockid_t, lock_state> lock_map;
-class lock_server_cache_rsm : public rsm_state_transfer {
+class lock_server : public rsm_state_transfer {
private:
int nacquire;
mutex lock_table_lock;
fifo<lock_protocol::lockid_t> revoke_fifo;
class rsm *rsm;
public:
- lock_server_cache_rsm(class rsm *rsm = 0);
+ lock_server(class rsm *rsm = 0);
lock_protocol::status stat(int &, lock_protocol::lockid_t);
void revoker();
void retryer();
+++ /dev/null
-// the caching lock server implementation
-
-#include "lock_server_cache_rsm.h"
-#include <sstream>
-#include <stdio.h>
-#include <unistd.h>
-#include <arpa/inet.h>
-#include "lang/verify.h"
-#include "handle.h"
-#include "tprintf.h"
-#include "rpc/marshall.h"
-#include "lock.h"
-
-using std::ostringstream;
-using std::istringstream;
-using std::vector;
-
-lock_state::lock_state():
- held(false)
-{
-}
-
-lock_state::lock_state(const lock_state &other) {
- *this = other;
-}
-
-lock_state& lock_state::operator=(const lock_state& o) {
- held = o.held;
- held_by = o.held_by;
- wanted_by = o.wanted_by;
- old_requests = o.old_requests;
- return *this;
-}
-
-template <class A, class B>
-ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
- o << "<" << d.first << "," << d.second << ">";
- return o;
-}
-
-marshall & operator<<(marshall &m, const lock_state &d) {
- return m << d.held << d.held_by << d.wanted_by;
-}
-
-unmarshall & operator>>(unmarshall &u, lock_state &d) {
- return u >> d.held >> d.held_by >> d.wanted_by;
-}
-
-lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
- lock sl(lock_table_lock);
- // by the semantics of map, this will create
- // the lock if it doesn't already exist
- return lock_table[lid];
-}
-
-lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) {
- std::thread(&lock_server_cache_rsm::revoker, this).detach();
- std::thread(&lock_server_cache_rsm::retryer, this).detach();
- rsm->set_state_transfer(this);
-}
-
-void lock_server_cache_rsm::revoker() {
- while (1) {
- lock_protocol::lockid_t lid;
- revoke_fifo.deq(&lid);
- LOG("Revoking " << lid);
- if (rsm && !rsm->amiprimary())
- continue;
-
- lock_state &st = get_lock_state(lid);
- holder held_by;
- {
- lock sl(st.m);
- held_by = st.held_by;
- }
-
- rpcc *proxy = NULL;
- // try a few times?
- //int t=5;
- //while (t-- && !proxy)
- proxy = handle(held_by.first).safebind();
- if (proxy) {
- int r;
- rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
- LOG("Revoke returned " << ret);
- }
- }
-}
-
-void lock_server_cache_rsm::retryer() {
- while (1) {
- lock_protocol::lockid_t lid;
- retry_fifo.deq(&lid);
- if (rsm && !rsm->amiprimary())
- continue;
-
- LOG("Sending retry for " << lid);
- lock_state &st = get_lock_state(lid);
- holder front;
- {
- lock sl(st.m);
- if (st.wanted_by.empty())
- continue;
- front = st.wanted_by.front();
- }
-
- rlock_protocol::status ret = -1;
-
- rpcc *proxy = NULL;
- // try a few times?
- //int t=5;
- //while (t-- && !proxy)
- proxy = handle(front.first).safebind();
- if (proxy) {
- int r;
- ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
- LOG("Retry returned " << ret);
- }
- }
-}
-
-int lock_server_cache_rsm::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
- LOG_FUNC_ENTER_SERVER;
- holder h = holder(id, xid);
- lock_state &st = get_lock_state(lid);
- lock sl(st.m);
-
- // deal with duplicated requests
- if (st.old_requests.count(id)) {
- lock_protocol::xid_t old_xid = st.old_requests[id];
- if (old_xid > xid)
- return lock_protocol::RPCERR;
- else if (old_xid == xid) {
- if (st.held && st.held_by == h) {
- LOG("Client " << id << " sent duplicate acquire xid=" << xid);
- return lock_protocol::OK;
- }
- }
- }
-
- // grant the lock if it's available and I'm next in line
- if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
- if (!st.wanted_by.empty())
- st.wanted_by.pop_front();
- st.old_requests[id] = xid;
-
- st.held = true;
- st.held_by = h;
- LOG("Lock " << lid << " held by " << h.first);
- if (st.wanted_by.size())
- revoke_fifo.enq(lid);
- return lock_protocol::OK;
- }
-
- // get in line
- bool found = false;
- for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
- if (i->first == id) {
- // make sure client is obeying serialization
- if (i->second != xid) {
- LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
- return lock_protocol::RPCERR;
- }
- found = true;
- break;
- }
- }
- if (!found)
- st.wanted_by.push_back(h);
-
- LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
-
- // send revoke if we're first in line
- if (st.wanted_by.front() == h)
- revoke_fifo.enq(lid);
-
- return lock_protocol::RETRY;
-}
-
-int lock_server_cache_rsm::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
- LOG_FUNC_ENTER_SERVER;
- lock_state &st = get_lock_state(lid);
- lock sl(st.m);
- if (st.held && st.held_by == holder(id, xid)) {
- st.held = false;
- LOG("Lock " << lid << " not held");
- }
- if (st.wanted_by.size())
- retry_fifo.enq(lid);
- return lock_protocol::OK;
-}
-
-string lock_server_cache_rsm::marshal_state() {
- lock sl(lock_table_lock);
- marshall rep;
- rep << nacquire;
- rep << lock_table;
- return rep.str();
-}
-
-void lock_server_cache_rsm::unmarshal_state(string state) {
- lock sl(lock_table_lock);
- unmarshall rep(state);
- rep >> nacquire;
- rep >> lock_table;
-}
-
-lock_protocol::status lock_server_cache_rsm::stat(int &r, lock_protocol::lockid_t lid) {
- printf("stat request\n");
- r = nacquire;
- return lock_protocol::OK;
-}
-
--- /dev/null
+#include "rpc/rpc.h"
+#include <arpa/inet.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include "lock_server.h"
+#include "paxos.h"
+#include "rsm.h"
+
+// Main loop of lock_server
+
+char tprintf_thread_prefix = 's';
+
+int
+main(int argc, char *argv[])
+{
+ setvbuf(stdout, NULL, _IONBF, 0);
+ setvbuf(stderr, NULL, _IONBF, 0);
+
+ srandom(getpid());
+
+ if(argc != 3){
+ fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
+ exit(1);
+ }
+
+ rsm rsm(argv[1], argv[2]);
+ lock_server ls(&rsm);
+ rsm.set_state_transfer(&ls);
+
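+ // handlers are registered with the RSM rather than a raw rpcs server,
+ // so operations are ordered and replicated before they execute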
+ rsm.reg(lock_protocol::acquire, &lock_server::acquire, &ls);
+ rsm.reg(lock_protocol::release, &lock_server::release, &ls);
+ rsm.reg(lock_protocol::stat, &lock_server::stat, &ls);
+
+ while(1)
+ sleep(1000);
+}
#include <stdlib.h>
#include <stdio.h>
#include "lang/verify.h"
-#include "lock_client_cache_rsm.h"
#include "tprintf.h"
#include <sys/types.h>
#include <unistd.h>
// must be >= 2
const int nt = 6; //XXX: lab1's RPC handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test with more than 10 blocking RPCs.
std::string dst;
-lock_client_cache_rsm **lc = new lock_client_cache_rsm * [nt];
+lock_client **lc = new lock_client * [nt];
lock_protocol::lockid_t a = "1";
lock_protocol::lockid_t b = "2";
lock_protocol::lockid_t c = "3";
}
tprintf("cache lock client\n");
- for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache_rsm(dst);
+ for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst);
if(!test || test == 1){
test1();
adopt_lock ml(rsm_mutex);
ml.unlock();
{
- // Make sure that the state of lock_server_cache_rsm is stable during
+ // Make sure that the state of lock_server is stable during
// synchronization; otherwise, the primary's state may be more recent
// than replicas after the synchronization.
lock ml(invoke_mutex);
// By acquiring and releasing the invoke_mutex once, we make sure that
- // the state of lock_server_cache_rsm will not be changed until all
+ // the state of lock_server will not be changed until all
// replicas are synchronized. The reason is that a client_invoke that
// arrives after this point will see inviewchange == true and return
// BUSY.