More renaming
authorPeter Iannucci <iannucci@mit.edu>
Fri, 20 Sep 2013 16:52:40 +0000 (12:52 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Wed, 25 Sep 2013 01:46:13 +0000 (21:46 -0400)
Makefile
lock_client.cc
lock_client.h
lock_client_cache_rsm.cc [deleted file]
lock_client_cache_rsm.h [deleted file]
lock_server.cc
lock_server.h [moved from lock_server_cache_rsm.h with 88% similarity]
lock_server_cache_rsm.cc [deleted file]
lock_smain.cc [new file with mode: 0644]
lock_tester.cc
rsm.cc

index 9d3ae06..6bd396f 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -16,10 +16,10 @@ rpc/rpctest: rpc/rpctest.o tprintf.o rpc/librpc.a
 lock_demo=lock_demo.o lock_client.o tprintf.o rsm_client.o handle.o
 lock_demo : $(lock_demo) rpc/librpc.a
 
-lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o
+lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o
 lock_tester : $(lock_tester) rpc/librpc.a
 
-lock_server=lock_server.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
+lock_server=lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server.o
 lock_server : $(lock_server) rpc/librpc.a
 
 rsm_tester=rsm_tester.o rsmtest_client.o tprintf.o
index a71a206..d996b40 100644 (file)
-// RPC stubs for clients to talk to lock_server
+// RPC stubs for clients to talk to lock_server, and cache the locks.
 
 #include "lock_client.h"
 #include "rpc/rpc.h"
-#include <arpa/inet.h>
-
 #include <sstream>
 #include <iostream>
+#include <algorithm>
 #include <stdio.h>
+#include "tprintf.h"
+#include <arpa/inet.h>
 
-lock_client::lock_client(std::string dst)
+#include "rsm_client.h"
+#include "lock.h"
+
+using std::ostringstream;
+
+lock_state::lock_state(): // every lock starts in the `none` state, from which acquire() sends the acquire RPC
+    state(none)
 {
+}
+
+void lock_state::wait() { // block the calling thread on its own condition variable in `c`
+    auto self = std::this_thread::get_id();
+    {
+        adopt_lock ml(m); // NOTE(review): presumably wraps `m`, which the caller already holds — confirm against lock.h
+        c[self].wait(ml); // operator[] creates this thread's condition variable on first use
+    }
+    c.erase(self); // remove the entry so signal() and signal(who) no longer find this thread
+}
+
+void lock_state::signal() { // wake one waiting thread, if there is one
+    // signal anyone
+    if (c.begin() != c.end())
+        c.begin()->second.notify_one(); // first entry in the map's key (thread id) order, not the longest waiter
+}
+
+void lock_state::signal(std::thread::id who) { // wake thread `who`, but only if it is currently inside wait()
+    if (c.count(who)) // test first: c[who] on a missing key would insert a new entry
+        c[who].notify_one();
+}
+
+int lock_client::last_port = 0;
+
+lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { // return the state record for `lid`, creating it on first use
+    lock sl(lock_table_lock); // guards lock_table only; callers take the returned lock_state's own mutex `m` afterwards
+    // by the semantics of std::map, this will create
+    // the lock if it doesn't already exist
+    return lock_table[lid];
+}
+
+lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) { // bind to the server at xdst, start the revoke/retry RPC server, then the releaser thread
     sockaddr_in dstsock;
-    make_sockaddr(dst.c_str(), &dstsock);
+    make_sockaddr(xdst.c_str(), &dstsock);
     cl = new rpcc(dstsock);
     if (cl->bind() < 0) {
         printf("lock_client: call bind\n");
     }
+
+    srand(time(NULL)^last_port); // reseed from the clock mixed with the port picked by the previous client
+    rlock_port = ((rand()%32000) | (0x1 << 10)); // OR-ing in bit 10 keeps the port at 1024 or above
+    const char *hname;
+    // VERIFY(gethostname(hname, 100) == 0);
+    hname = "127.0.0.1"; // the callback address is hard-coded to loopback
+    ostringstream host;
+    host << hname << ":" << rlock_port;
+    id = host.str(); // "host:port" string passed as the client id in acquire/release RPCs
+    last_port = rlock_port;
+    rpcs *rlsrpc = new rpcs(rlock_port); // NOTE(review): rlsrpc is not stored or deleted anywhere in the visible code
+    rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
+    rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
+    {
+        lock sl(xid_mutex);
+        xid = 0;
+    }
+    rsmc = new rsm_client(xdst);
+    releaser_thread = std::thread(&lock_client::releaser, this); // releaser() loops forever draining release_fifo
 }
 
-int
-lock_client::stat(lock_protocol::lockid_t lid)
-{
+void lock_client::releaser() { // thread body: hands queued locks back to the server, one lock id at a time
+    while (1) {
+        lock_protocol::lockid_t lid;
+        release_fifo.deq(&lid); // NOTE(review): presumably blocks until a lid is enqueued — confirm in rpc/fifo.h
+        LOG("Releaser: " << lid);
+
+        lock_state &st = get_lock_state(lid);
+        lock sl(st.m);
+        VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id()); // every enqueuer marks the lock as locked by the releaser before calling enq
+        st.state = lock_state::releasing;
+        {
+            sl.unlock(); // st.m is not held across the RPC
+            int r;
+            rsmc->call(lock_protocol::release, r, lid, id, st.xid); // NOTE(review): the returned status is ignored
+            sl.lock();
+        }
+        st.state = lock_state::none;
+        LOG("Lock " << lid << ": none");
+        st.signal(); // wake one waiter; acquire() sends a new acquire RPC when it sees state none
+    }
+}
+
+int lock_client::stat(lock_protocol::lockid_t lid) { // stat RPC wrapper; effectively disabled by the VERIFY(0) on the next line
+    VERIFY(0); // NOTE(review): assuming VERIFY aborts on a false condition, nothing below this line runs
     int r;
     lock_protocol::status ret = cl->call(lock_protocol::stat, r, cl->id(), lid);
     VERIFY (ret == lock_protocol::OK);
     return r;
 }
 
-lock_protocol::status
-lock_client::acquire(lock_protocol::lockid_t lid)
-{
-    int r;
-    return cl->call(lock_protocol::acquire, r, cl->id(), lid);
+lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+    auto self = std::this_thread::get_id();
+
+    // check for reentrancy
+    VERIFY(st.state != lock_state::locked || st.held_by != self);
+    VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
+
+    st.wanted_by.push_back(self);
+
+    while (1) {
+        if (st.state != lock_state::free)
+            LOG("Lock " << lid << ": not free");
+
+        if (st.state == lock_state::none || st.state == lock_state::retrying) {
+            if (st.state == lock_state::none) {
+                lock sl(xid_mutex);
+                st.xid = xid++;
+            }
+            st.state = lock_state::acquiring;
+            LOG("Lock " << lid << ": acquiring");
+            lock_protocol::status result;
+            {
+                sl.unlock();
+                int r;
+                result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
+                sl.lock();
+            }
+            LOG("acquire returned " << result);
+            if (result == lock_protocol::OK) {
+                st.state = lock_state::free;
+                LOG("Lock " << lid << ": free");
+            }
+        }
+
+        VERIFY(st.wanted_by.size() != 0);
+        if (st.state == lock_state::free) {
+            // is it for me?
+            auto front = st.wanted_by.front();
+            if (front == releaser_thread.get_id()) {
+                st.wanted_by.pop_front();
+                st.state = lock_state::locked;
+                st.held_by = releaser_thread.get_id();
+                LOG("Queuing " << lid << " for release");
+                release_fifo.enq(lid);
+            } else if (front == self) {
+                st.wanted_by.pop_front();
+                st.state = lock_state::locked;
+                st.held_by = self;
+                break;
+            } else {
+                st.signal(front);
+            }
+        }
+
+        LOG("waiting...");
+        st.wait();
+        LOG("wait ended");
+    }
+
+    LOG("Lock " << lid << ": locked");
+    return lock_protocol::OK;
 }
 
-lock_protocol::status
-lock_client::release(lock_protocol::lockid_t lid)
-{
-    int r;
-    return cl->call(lock_protocol::release, r, cl->id(), lid);
+lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) { // local release by the holding thread; no RPC is sent from here
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+    auto self = std::this_thread::get_id();
+    VERIFY(st.state == lock_state::locked && st.held_by == self); // only the thread recorded as holder may release
+    st.state = lock_state::free;
+    LOG("Lock " << lid << ": free");
+    if (st.wanted_by.size()) {
+        auto front = st.wanted_by.front();
+        if (front == releaser_thread.get_id()) { // a revoke is first in line: hand the lock to the releaser thread
+            st.state = lock_state::locked;
+            st.held_by = releaser_thread.get_id();
+            st.wanted_by.pop_front();
+            LOG("Queuing " << lid << " for release");
+            release_fifo.enq(lid);
+        } else
+            st.signal(front); // wake the local thread that is first in line; it takes the lock in acquire()
+    }
+    LOG("Finished signaling.");
+    return lock_protocol::OK;
+}
+
+rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { // RPC handler for rlock_protocol::revoke; xid is only logged, never compared with st.xid
+    LOG("Revoke handler " << lid << " " << xid);
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+
+    if (st.state == lock_state::releasing || st.state == lock_state::none) // already being returned, or not held here: nothing to do
+        return rlock_protocol::OK;
+
+    if (st.state == lock_state::free &&
+        (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
+        // gimme
+        st.state = lock_state::locked; // mark the lock as held by the releaser thread, as releaser() VERIFYs
+        st.held_by = releaser_thread.get_id();
+        if (st.wanted_by.size())
+            st.wanted_by.pop_front();
+        release_fifo.enq(lid);
+    } else {
+        // get in line
+        st.wanted_by.push_back(releaser_thread.get_id()); // NOTE(review): no check for an existing releaser entry, so a repeated revoke would queue it twice — confirm whether that can happen
+    }
+    return rlock_protocol::OK;
+}
+
+// RPC handler for rlock_protocol::retry (registered in the constructor).
+// Moves a lock that is in the `acquiring` state to `retrying` and wakes one
+// waiter; acquire() re-sends the acquire RPC from `retrying` without taking a
+// new xid. The xid argument is not used. Always returns rlock_protocol::OK.
+rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+    VERIFY(st.state == lock_state::acquiring);
+    st.state = lock_state::retrying;
+    // log the state that was actually entered (retrying, not none)
+    LOG("Lock " << lid << ": retrying");
+    st.signal(); // only one thread needs to wake up
+    return rlock_protocol::OK;
 }
 
 t4_lock_client *t4_lock_client_new(const char *dst) {
@@ -60,4 +245,3 @@ t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) { // wrapper: casts the opaque handle to lock_client and forwards to stat()
     return ((lock_client *)client)->stat(lid);
 }
-
index b1176c4..7b5edf6 100644 (file)
@@ -1,6 +1,7 @@
 // lock client interface.
 
 #ifndef lock_client_h
+
 #define lock_client_h
 
 #ifdef __cplusplus
@@ -8,21 +9,76 @@
 #include <string>
 #include "lock_protocol.h"
 #include "rpc/rpc.h"
-#include <vector>
+#include "lang/verify.h"
+#include "rpc/fifo.h"
+#include "rsm_client.h"
+
+class lock_release_user { // callback interface; a pointer to one is accepted by lock_client's constructor and stored as `lu`
+    public:
+        virtual void dorelease(lock_protocol::lockid_t) = 0; // NOTE(review): no call to dorelease appears in the visible lock_client.cc hunk
+        virtual ~lock_release_user() {};
+};
+
+using std::string;
+using std::thread;
+using std::list;
+using std::map;
+
+typedef string callback;
 
-// Client interface to the lock server
+class lock_state { // client-side record for one lock id: protocol state, local holder, and local wait queue
+public:
+    lock_state();
+    enum {
+        none = 0, // initial state, and the state releaser() leaves behind; acquire() sends the acquire RPC from here
+        retrying, // set by retry_handler; acquire() re-sends the RPC without taking a new xid
+        free, // set when the acquire RPC returns OK or a local holder releases; no local holder
+        locked, // held by the thread recorded in held_by (possibly the releaser thread)
+        acquiring, // set just before the acquire RPC is sent
+        releasing // set by releaser() before it sends the release RPC
+    } state;
+    std::thread::id held_by; // meaningful while state == locked
+    list<std::thread::id> wanted_by; // FIFO of waiters; the releaser thread's id stands for a pending revoke
+    mutex m; // taken by lock_client before it reads or writes the other fields
+    map<std::thread::id, std::condition_variable> c; // one condition variable per thread currently inside wait()
+    lock_protocol::xid_t xid; // xid sent with the acquire and release RPCs for this lock
+    void wait();
+    void signal();
+    void signal(std::thread::id who);
+};
+
+typedef map<lock_protocol::lockid_t, lock_state> lock_map;
+
+// Client that caches locks.  The server can revoke locks using
+// lock_revoke_server.
 class lock_client {
- protected:
-  rpcc *cl;
- public:
-  lock_client(std::string d);
-  virtual ~lock_client() {};
-  virtual lock_protocol::status acquire(lock_protocol::lockid_t);
-  virtual lock_protocol::status release(lock_protocol::lockid_t);
-  virtual lock_protocol::status stat(lock_protocol::lockid_t);
+    private:
+        rpcc *cl; // direct RPC connection to the server; used only by stat() in the visible code
+        std::thread releaser_thread; // runs releaser(); its id also marks revoke requests in lock_state::wanted_by
+        rsm_client *rsmc; // carries the acquire and release RPCs
+        class lock_release_user *lu;
+        int rlock_port; // port of the revoke/retry RPC server created in the constructor
+        string hostname; // NOTE(review): not referenced in the visible lock_client.cc hunk
+        string id; // "host:port" of the revoke/retry server, sent as the client id
+        mutex xid_mutex; // guards xid
+        lock_protocol::xid_t xid; // next xid to hand out; incremented in acquire()
+        fifo<lock_protocol::lockid_t> release_fifo; // lock ids waiting for releaser() to return them
+        mutex lock_table_lock; // guards lock_table
+        lock_map lock_table;
+        lock_state &get_lock_state(lock_protocol::lockid_t lid);
+    public:
+        static int last_port; // port chosen by the most recently constructed client; mixed into the srand() seed
+        lock_client(string xdst, class lock_release_user *l = 0);
+        ~lock_client() {}; // NOTE(review): releaser_thread is never joined or detached in the visible code; destroying a joinable std::thread calls std::terminate
+        lock_protocol::status acquire(lock_protocol::lockid_t);
+        lock_protocol::status release(lock_protocol::lockid_t);
+        int stat(lock_protocol::lockid_t);
+        void releaser();
+        rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
+        rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
 };
 
-#endif
+#endif // C++
 
 extern "C" {
 
diff --git a/lock_client_cache_rsm.cc b/lock_client_cache_rsm.cc
deleted file mode 100644 (file)
index 2a6d6e3..0000000
+++ /dev/null
@@ -1,212 +0,0 @@
-// RPC stubs for clients to talk to lock_server, and cache the locks
-// see lock_client.cache.h for protocol details.
-
-#include "lock_client_cache_rsm.h"
-#include "rpc/rpc.h"
-#include <sstream>
-#include <iostream>
-#include <algorithm>
-#include <stdio.h>
-#include "tprintf.h"
-
-#include "rsm_client.h"
-#include "lock.h"
-
-using std::ostringstream;
-
-lock_state::lock_state():
-    state(none)
-{
-}
-
-void lock_state::wait() {
-    auto self = std::this_thread::get_id();
-    {
-        adopt_lock ml(m);
-        c[self].wait(ml);
-    }
-    c.erase(self);
-}
-
-void lock_state::signal() {
-    // signal anyone
-    if (c.begin() != c.end())
-        c.begin()->second.notify_one();
-}
-
-void lock_state::signal(std::thread::id who) {
-    if (c.count(who))
-        c[who].notify_one();
-}
-
-int lock_client_cache_rsm::last_port = 0;
-
-lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
-    lock sl(lock_table_lock);
-    // by the semantics of std::map, this will create
-    // the lock if it doesn't already exist
-    return lock_table[lid];
-}
-
-lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) {
-    srand(time(NULL)^last_port);
-    rlock_port = ((rand()%32000) | (0x1 << 10));
-    const char *hname;
-    // VERIFY(gethostname(hname, 100) == 0);
-    hname = "127.0.0.1";
-    ostringstream host;
-    host << hname << ":" << rlock_port;
-    id = host.str();
-    last_port = rlock_port;
-    rpcs *rlsrpc = new rpcs(rlock_port);
-    rlsrpc->reg(rlock_protocol::revoke, &lock_client_cache_rsm::revoke_handler, this);
-    rlsrpc->reg(rlock_protocol::retry, &lock_client_cache_rsm::retry_handler, this);
-    {
-        lock sl(xid_mutex);
-        xid = 0;
-    }
-    rsmc = new rsm_client(xdst);
-    releaser_thread = std::thread(&lock_client_cache_rsm::releaser, this);
-}
-
-void lock_client_cache_rsm::releaser() {
-    while (1) {
-        lock_protocol::lockid_t lid;
-        release_fifo.deq(&lid);
-        LOG("Releaser: " << lid);
-
-        lock_state &st = get_lock_state(lid);
-        lock sl(st.m);
-        VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
-        st.state = lock_state::releasing;
-        {
-            sl.unlock();
-            int r;
-            rsmc->call(lock_protocol::release, r, lid, id, st.xid);
-            sl.lock();
-        }
-        st.state = lock_state::none;
-        LOG("Lock " << lid << ": none");
-        st.signal();
-    }
-}
-
-lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
-    lock_state &st = get_lock_state(lid);
-    lock sl(st.m);
-    auto self = std::this_thread::get_id();
-
-    // check for reentrancy
-    VERIFY(st.state != lock_state::locked || st.held_by != self);
-    VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
-
-    st.wanted_by.push_back(self);
-
-    while (1) {
-        if (st.state != lock_state::free)
-            LOG("Lock " << lid << ": not free");
-
-        if (st.state == lock_state::none || st.state == lock_state::retrying) {
-            if (st.state == lock_state::none) {
-                lock sl(xid_mutex);
-                st.xid = xid++;
-            }
-            st.state = lock_state::acquiring;
-            LOG("Lock " << lid << ": acquiring");
-            lock_protocol::status result;
-            {
-                sl.unlock();
-                int r;
-                result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
-                sl.lock();
-            }
-            LOG("acquire returned " << result);
-            if (result == lock_protocol::OK) {
-                st.state = lock_state::free;
-                LOG("Lock " << lid << ": free");
-            }
-        }
-
-        VERIFY(st.wanted_by.size() != 0);
-        if (st.state == lock_state::free) {
-            // is it for me?
-            auto front = st.wanted_by.front();
-            if (front == releaser_thread.get_id()) {
-                st.wanted_by.pop_front();
-                st.state = lock_state::locked;
-                st.held_by = releaser_thread.get_id();
-                LOG("Queuing " << lid << " for release");
-                release_fifo.enq(lid);
-            } else if (front == self) {
-                st.wanted_by.pop_front();
-                st.state = lock_state::locked;
-                st.held_by = self;
-                break;
-            } else {
-                st.signal(front);
-            }
-        }
-
-        LOG("waiting...");
-        st.wait();
-        LOG("wait ended");
-    }
-
-    LOG("Lock " << lid << ": locked");
-    return lock_protocol::OK;
-}
-
-lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
-    lock_state &st = get_lock_state(lid);
-    lock sl(st.m);
-    auto self = std::this_thread::get_id();
-    VERIFY(st.state == lock_state::locked && st.held_by == self);
-    st.state = lock_state::free;
-    LOG("Lock " << lid << ": free");
-    if (st.wanted_by.size()) {
-        auto front = st.wanted_by.front();
-        if (front == releaser_thread.get_id()) {
-            st.state = lock_state::locked;
-            st.held_by = releaser_thread.get_id();
-            st.wanted_by.pop_front();
-            LOG("Queuing " << lid << " for release");
-            release_fifo.enq(lid);
-        } else
-            st.signal(front);
-    }
-    LOG("Finished signaling.");
-    return lock_protocol::OK;
-}
-
-rlock_protocol::status lock_client_cache_rsm::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
-    LOG("Revoke handler " << lid << " " << xid);
-    lock_state &st = get_lock_state(lid);
-    lock sl(st.m);
-
-    if (st.state == lock_state::releasing || st.state == lock_state::none)
-        return rlock_protocol::OK;
-
-    if (st.state == lock_state::free &&
-        (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
-        // gimme
-        st.state = lock_state::locked;
-        st.held_by = releaser_thread.get_id();
-        if (st.wanted_by.size())
-            st.wanted_by.pop_front();
-        release_fifo.enq(lid);
-    } else {
-        // get in line
-        st.wanted_by.push_back(releaser_thread.get_id());
-    }
-    return rlock_protocol::OK;
-}
-
-rlock_protocol::status lock_client_cache_rsm::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
-    lock_state &st = get_lock_state(lid);
-    lock sl(st.m);
-    VERIFY(st.state == lock_state::acquiring);
-    st.state = lock_state::retrying;
-    LOG("Lock " << lid << ": none");
-    st.signal(); // only one thread needs to wake up
-    return rlock_protocol::OK;
-}
diff --git a/lock_client_cache_rsm.h b/lock_client_cache_rsm.h
deleted file mode 100644 (file)
index 815224c..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-// lock client interface.
-
-#ifndef lock_client_cache_rsm_h
-
-#define lock_client_cache_rsm_h
-
-#include <string>
-#include "lock_protocol.h"
-#include "rpc/rpc.h"
-#include "lock_client.h"
-#include "lang/verify.h"
-#include "rpc/fifo.h"
-#include "rsm_client.h"
-
-class lock_release_user {
-    public:
-        virtual void dorelease(lock_protocol::lockid_t) = 0;
-        virtual ~lock_release_user() {};
-};
-
-using std::string;
-using std::thread;
-using std::list;
-using std::map;
-
-typedef string callback;
-
-class lock_state {
-public:
-    lock_state();
-    enum {
-        none = 0,
-        retrying,
-        free,
-        locked,
-        acquiring,
-        releasing
-    } state;
-    std::thread::id held_by;
-    list<std::thread::id> wanted_by;
-    mutex m;
-    map<std::thread::id, std::condition_variable> c;
-    lock_protocol::xid_t xid;
-    void wait();
-    void signal();
-    void signal(std::thread::id who);
-};
-
-typedef map<lock_protocol::lockid_t, lock_state> lock_map;
-
-class lock_client_cache_rsm;
-
-// Clients that caches locks.  The server can revoke locks using
-// lock_revoke_server.
-class lock_client_cache_rsm : public lock_client {
-    private:
-        std::thread releaser_thread;
-        rsm_client *rsmc;
-        class lock_release_user *lu;
-        int rlock_port;
-        string hostname;
-        string id;
-        mutex xid_mutex;
-        lock_protocol::xid_t xid;
-        fifo<lock_protocol::lockid_t> release_fifo;
-        mutex lock_table_lock;
-        lock_map lock_table;
-        lock_state &get_lock_state(lock_protocol::lockid_t lid);
-    public:
-        static int last_port;
-        lock_client_cache_rsm(string xdst, class lock_release_user *l = 0);
-        virtual ~lock_client_cache_rsm() {};
-        lock_protocol::status acquire(lock_protocol::lockid_t);
-        virtual lock_protocol::status release(lock_protocol::lockid_t);
-        void releaser();
-        rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
-        rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
-};
-
-
-#endif
index 0f82080..f5a1fc4 100644 (file)
-#include "rpc/rpc.h"
-#include <arpa/inet.h>
-#include <stdlib.h>
+// the caching lock server implementation
+
+#include "lock_server.h"
+#include <sstream>
 #include <stdio.h>
 #include <unistd.h>
-#include "lock_server_cache_rsm.h"
-#include "paxos.h"
-#include "rsm.h"
-
-// Main loop of lock_server
+#include <arpa/inet.h>
+#include "lang/verify.h"
+#include "handle.h"
+#include "tprintf.h"
+#include "rpc/marshall.h"
+#include "lock.h"
 
-char tprintf_thread_prefix = 's';
+using std::ostringstream;
+using std::istringstream;
+using std::vector;
 
-int
-main(int argc, char *argv[])
+lock_state::lock_state(): // server-side lock record; a new lock starts out not held
+    held(false)
 {
-    setvbuf(stdout, NULL, _IONBF, 0);
-    setvbuf(stderr, NULL, _IONBF, 0);
+}
+
+lock_state::lock_state(const lock_state &other) { // copy constructor, implemented through operator=
+    *this = other;
+}
+
+lock_state& lock_state::operator=(const lock_state& o) { // copies the four data fields below; the mutex `m` is not copied
+    held = o.held;
+    held_by = o.held_by;
+    wanted_by = o.wanted_by;
+    old_requests = o.old_requests;
+    return *this;
+}
 
-    srandom(getpid());
+template <class A, class B> // formats a pair as "<first,second>"
+ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) { // note: declared on ostringstream specifically, not on ostream
+    o << "<" << d.first << "," << d.second << ">";
+    return o;
+}
+
+marshall & operator<<(marshall &m, const lock_state &d) { // serialize one lock record for state transfer
+       return m << d.held << d.held_by << d.wanted_by; // NOTE(review): old_requests is not written, so duplicate-detection history is not transferred — confirm this is intended
+}
 
-    if(argc != 3){
-        fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
-        exit(1);
+unmarshall & operator>>(unmarshall &u, lock_state &d) { // inverse of the marshall operator<<; reads the same three fields in the same order
+       return u >> d.held >> d.held_by >> d.wanted_by;
+}
+
+lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { // return the record for `lid`, creating it on first use
+    lock sl(lock_table_lock); // guards lock_table only; callers lock the returned record's own mutex `m`
+    // by the semantics of map, this will create
+    // the lock if it doesn't already exist
+    return lock_table[lid];
+}
+
+// Builds a lock server, optionally attached to a replicated state machine.
+// Starts the detached revoker and retryer threads and, when an rsm is given,
+// registers this object as its state-transfer handler.
+// nacquire is zeroed here because marshal_state() and stat() read it.
+lock_server::lock_server(class rsm *_rsm) : nacquire(0), rsm (_rsm) {
+    std::thread(&lock_server::revoker, this).detach();
+    std::thread(&lock_server::retryer, this).detach();
+    // the declaration defaults rsm to 0, so guard the dereference the same
+    // way revoker() and retryer() do
+    if (rsm)
+        rsm->set_state_transfer(this);
+}
+
+void lock_server::revoker() { // thread body: sends a revoke RPC to the recorded holder of each lock id taken from revoke_fifo
+    while (1) {
+        lock_protocol::lockid_t lid;
+        revoke_fifo.deq(&lid);
+        LOG("Revoking " << lid);
+        if (rsm && !rsm->amiprimary()) // when attached to an rsm, only the primary contacts clients
+            continue;
+
+        lock_state &st = get_lock_state(lid);
+        holder held_by;
+        {
+            lock sl(st.m); // copy the holder under the lock, then make the RPC without it
+            held_by = st.held_by; // NOTE(review): st.held is not checked here, so this may name a past holder
+        }
+
+        rpcc *proxy = NULL;
+        // try a few times?
+        //int t=5;
+        //while (t-- && !proxy)
+        proxy = handle(held_by.first).safebind(); // held_by.first is the client id string, held_by.second its xid
+        if (proxy) {
+            int r;
+            rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
+            LOG("Revoke returned " << ret);
+        }
     }
+}
 
-    rsm rsm(argv[1], argv[2]);
-    lock_server_cache_rsm ls(&rsm);
-    rsm.set_state_transfer(&ls);
+void lock_server::retryer() { // thread body: tells the first waiter of each lock id taken from retry_fifo to retry its acquire
+    while (1) {
+        lock_protocol::lockid_t lid;
+        retry_fifo.deq(&lid);
+        if (rsm && !rsm->amiprimary()) // when attached to an rsm, only the primary contacts clients
+            continue;
 
-    rsm.reg(lock_protocol::acquire, &lock_server_cache_rsm::acquire, &ls);
-    rsm.reg(lock_protocol::release, &lock_server_cache_rsm::release, &ls);
-    rsm.reg(lock_protocol::stat, &lock_server_cache_rsm::stat, &ls);
+        LOG("Sending retry for " << lid);
+        lock_state &st = get_lock_state(lid);
+        holder front;
+        {
+            lock sl(st.m); // copy the front waiter under the lock, then make the RPC without it
+            if (st.wanted_by.empty())
+                continue;
+            front = st.wanted_by.front(); // the waiter stays queued; acquire() pops it when the lock is granted
+        }
 
-    while(1)
-        sleep(1000);
+        rlock_protocol::status ret = -1;
+
+        rpcc *proxy = NULL;
+        // try a few times?
+        //int t=5;
+        //while (t-- && !proxy)
+        proxy = handle(front.first).safebind();
+        if (proxy) {
+            int r;
+            ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
+            LOG("Retry returned " << ret);
+        }
+    }
 }
+
+int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { // acquire RPC: returns OK when granted, RETRY when queued, RPCERR for a stale or out-of-order xid; r is not written
+    LOG_FUNC_ENTER_SERVER;
+    holder h = holder(id, xid);
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+
+    // deal with duplicated requests
+    if (st.old_requests.count(id)) {
+        lock_protocol::xid_t old_xid = st.old_requests[id]; // xid of the last acquire granted to this client for this lock
+        if (old_xid > xid) // older than a request already granted: reject
+            return lock_protocol::RPCERR;
+        else if (old_xid == xid) {
+            if (st.held && st.held_by == h) { // same request again while the client still holds the lock
+                LOG("Client " << id << " sent duplicate acquire xid=" << xid);
+                return lock_protocol::OK;
+            }
+        }
+    }
+
+    // grant the lock if it's available and I'm next in line
+    if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
+        if (!st.wanted_by.empty())
+            st.wanted_by.pop_front();
+        st.old_requests[id] = xid;
+
+        st.held = true;
+        st.held_by = h;
+        LOG("Lock " << lid << " held by " << h.first);
+        if (st.wanted_by.size()) // others are still queued: ask the new holder to give the lock back
+            revoke_fifo.enq(lid);
+        return lock_protocol::OK;
+    }
+
+    // get in line
+    bool found = false;
+    for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
+        if (i->first == id) {
+            // make sure client is obeying serialization
+            if (i->second != xid) {
+                LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
+                return lock_protocol::RPCERR;
+            }
+            found = true;
+            break;
+        }
+    }
+    if (!found)
+        st.wanted_by.push_back(h); // a client appears in the queue at most once
+
+    LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
+
+    // send revoke if we're first in line
+    if (st.wanted_by.front() == h)
+        revoke_fifo.enq(lid);
+
+    return lock_protocol::RETRY;
+}
+
+int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) { // release RPC: always returns OK; r is not written
+    LOG_FUNC_ENTER_SERVER;
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+    if (st.held && st.held_by == holder(id, xid)) { // only the recorded holder, with the matching xid, clears the lock
+        st.held = false;
+        LOG("Lock " << lid << " not held");
+    }
+    if (st.wanted_by.size()) // queued whenever anyone is waiting, including when the check above did not match
+        retry_fifo.enq(lid);
+    return lock_protocol::OK;
+}
+
+string lock_server::marshal_state() { // serialize nacquire followed by the whole lock table
+    lock sl(lock_table_lock); // guards lock_table; the per-lock mutexes are not taken here
+    marshall rep;
+    rep << nacquire;
+    rep << lock_table;
+    return rep.str();
+}
+
+void lock_server::unmarshal_state(string state) { // inverse of marshal_state(): overwrites nacquire and lock_table from `state`
+    lock sl(lock_table_lock);
+    unmarshall rep(state);
+    rep >> nacquire; // same field order as marshal_state()
+    rep >> lock_table;
+}
+
+lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) { // stat RPC; effectively disabled by the VERIFY(0) below
+    printf("stat request\n");
+    VERIFY(0); // NOTE(review): assuming VERIFY aborts on a false condition, the two lines below never run
+    r = nacquire;
+    return lock_protocol::OK;
+}
+
similarity index 88%
rename from lock_server_cache_rsm.h
rename to lock_server.h
index c33b51e..2aa8445 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef lock_server_cache_rsm_h
-#define lock_server_cache_rsm_h
+#ifndef lock_server_h
+#define lock_server_h
 
 #include <string>
 
@@ -34,7 +34,7 @@ public:
 
 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
 
-class lock_server_cache_rsm : public rsm_state_transfer {
+class lock_server : public rsm_state_transfer {
     private:
         int nacquire;
         mutex lock_table_lock;
@@ -44,7 +44,7 @@ class lock_server_cache_rsm : public rsm_state_transfer {
         fifo<lock_protocol::lockid_t> revoke_fifo;
         class rsm *rsm;
     public:
-        lock_server_cache_rsm(class rsm *rsm = 0);
+        lock_server(class rsm *rsm = 0);
         lock_protocol::status stat(int &, lock_protocol::lockid_t);
         void revoker();
         void retryer();
diff --git a/lock_server_cache_rsm.cc b/lock_server_cache_rsm.cc
deleted file mode 100644 (file)
index 8f3cf2b..0000000
+++ /dev/null
@@ -1,213 +0,0 @@
-// the caching lock server implementation
-
-#include "lock_server_cache_rsm.h"
-#include <sstream>
-#include <stdio.h>
-#include <unistd.h>
-#include <arpa/inet.h>
-#include "lang/verify.h"
-#include "handle.h"
-#include "tprintf.h"
-#include "rpc/marshall.h"
-#include "lock.h"
-
-using std::ostringstream;
-using std::istringstream;
-using std::vector;
-
-lock_state::lock_state():
-    held(false)
-{
-}
-
-lock_state::lock_state(const lock_state &other) {
-    *this = other;
-}
-
-lock_state& lock_state::operator=(const lock_state& o) {
-    held = o.held;
-    held_by = o.held_by;
-    wanted_by = o.wanted_by;
-    old_requests = o.old_requests;
-    return *this;
-}
-
-template <class A, class B>
-ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
-    o << "<" << d.first << "," << d.second << ">";
-    return o;
-}
-
-marshall & operator<<(marshall &m, const lock_state &d) {
-       return m << d.held << d.held_by << d.wanted_by;
-}
-
-unmarshall & operator>>(unmarshall &u, lock_state &d) {
-       return u >> d.held >> d.held_by >> d.wanted_by;
-}
-
-lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
-    lock sl(lock_table_lock);
-    // by the semantics of map, this will create
-    // the lock if it doesn't already exist
-    return lock_table[lid];
-}
-
-lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) {
-    std::thread(&lock_server_cache_rsm::revoker, this).detach();
-    std::thread(&lock_server_cache_rsm::retryer, this).detach();
-    rsm->set_state_transfer(this);
-}
-
-void lock_server_cache_rsm::revoker() {
-    while (1) {
-        lock_protocol::lockid_t lid;
-        revoke_fifo.deq(&lid);
-        LOG("Revoking " << lid);
-        if (rsm && !rsm->amiprimary())
-            continue;
-
-        lock_state &st = get_lock_state(lid);
-        holder held_by;
-        {
-            lock sl(st.m);
-            held_by = st.held_by;
-        }
-
-        rpcc *proxy = NULL;
-        // try a few times?
-        //int t=5;
-        //while (t-- && !proxy)
-        proxy = handle(held_by.first).safebind();
-        if (proxy) {
-            int r;
-            rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
-            LOG("Revoke returned " << ret);
-        }
-    }
-}
-
-void lock_server_cache_rsm::retryer() {
-    while (1) {
-        lock_protocol::lockid_t lid;
-        retry_fifo.deq(&lid);
-        if (rsm && !rsm->amiprimary())
-            continue;
-
-        LOG("Sending retry for " << lid);
-        lock_state &st = get_lock_state(lid);
-        holder front;
-        {
-            lock sl(st.m);
-            if (st.wanted_by.empty())
-                continue;
-            front = st.wanted_by.front();
-        }
-
-        rlock_protocol::status ret = -1;
-
-        rpcc *proxy = NULL;
-        // try a few times?
-        //int t=5;
-        //while (t-- && !proxy)
-        proxy = handle(front.first).safebind();
-        if (proxy) {
-            int r;
-            ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
-            LOG("Retry returned " << ret);
-        }
-    }
-}
-
-int lock_server_cache_rsm::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
-    LOG_FUNC_ENTER_SERVER;
-    holder h = holder(id, xid);
-    lock_state &st = get_lock_state(lid);
-    lock sl(st.m);
-
-    // deal with duplicated requests
-    if (st.old_requests.count(id)) {
-        lock_protocol::xid_t old_xid = st.old_requests[id];
-        if (old_xid > xid)
-            return lock_protocol::RPCERR;
-        else if (old_xid == xid) {
-            if (st.held && st.held_by == h) {
-                LOG("Client " << id << " sent duplicate acquire xid=" << xid);
-                return lock_protocol::OK;
-            }
-        }
-    }
-
-    // grant the lock if it's available and I'm next in line
-    if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
-        if (!st.wanted_by.empty())
-            st.wanted_by.pop_front();
-        st.old_requests[id] = xid;
-
-        st.held = true;
-        st.held_by = h;
-        LOG("Lock " << lid << " held by " << h.first);
-        if (st.wanted_by.size())
-            revoke_fifo.enq(lid);
-        return lock_protocol::OK;
-    }
-
-    // get in line
-    bool found = false;
-    for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
-        if (i->first == id) {
-            // make sure client is obeying serialization
-            if (i->second != xid) {
-                LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
-                return lock_protocol::RPCERR;
-            }
-            found = true;
-            break;
-        }
-    }
-    if (!found)
-        st.wanted_by.push_back(h);
-
-    LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
-
-    // send revoke if we're first in line
-    if (st.wanted_by.front() == h)
-        revoke_fifo.enq(lid);
-
-    return lock_protocol::RETRY;
-}
-
-int lock_server_cache_rsm::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
-    LOG_FUNC_ENTER_SERVER;
-    lock_state &st = get_lock_state(lid);
-    lock sl(st.m);
-    if (st.held && st.held_by == holder(id, xid)) {
-        st.held = false;
-        LOG("Lock " << lid << " not held");
-    }
-    if (st.wanted_by.size())
-        retry_fifo.enq(lid);
-    return lock_protocol::OK;
-}
-
-string lock_server_cache_rsm::marshal_state() {
-    lock sl(lock_table_lock);
-    marshall rep;
-    rep << nacquire;
-    rep << lock_table;
-    return rep.str();
-}
-
-void lock_server_cache_rsm::unmarshal_state(string state) {
-    lock sl(lock_table_lock);
-    unmarshall rep(state);
-    rep >> nacquire;
-    rep >> lock_table;
-}
-
-lock_protocol::status lock_server_cache_rsm::stat(int &r, lock_protocol::lockid_t lid) {
-    printf("stat request\n");
-    r = nacquire;
-    return lock_protocol::OK;
-}
-
diff --git a/lock_smain.cc b/lock_smain.cc
new file mode 100644 (file)
index 0000000..086186e
--- /dev/null
@@ -0,0 +1,37 @@
+#include "rpc/rpc.h"
+#include <arpa/inet.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include "lock_server.h"
+#include "paxos.h"
+#include "rsm.h"
+
+// Main loop of lock_server
+
+char tprintf_thread_prefix = 's';  // NOTE(review): presumably the tag tprintf puts on this process's log lines ('s' here, 'c' in lock_tester.cc) -- confirm in tprintf.h
+
+int
+main(int argc, char *argv[])
+{   // Entry point of the lock_server binary: builds an rsm, attaches a lock_server to it, registers the lock RPC handlers, then idles forever.
+    setvbuf(stdout, NULL, _IONBF, 0);  // unbuffered stdout, so output appears immediately
+    setvbuf(stderr, NULL, _IONBF, 0);  // same for stderr
+
+    srandom(getpid());  // seed random() with the process id
+
+    if(argc != 3){  // exactly two arguments are required; otherwise print usage and exit with status 1
+        fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
+        exit(1);
+    }
+
+    rsm rsm(argv[1], argv[2]);  // arguments are passed in usage order (master, me); from here on the name `rsm` refers to this object, not the class
+    lock_server ls(&rsm);  // the lock server is handed a pointer to the rsm instance
+    rsm.set_state_transfer(&ls);  // NOTE(review): presumably lets rsm call ls to marshal/unmarshal state -- confirm in rsm.h
+
+    rsm.reg(lock_protocol::acquire, &lock_server::acquire, &ls);  // register one lock_server member function per lock_protocol procedure id, bound to ls
+    rsm.reg(lock_protocol::release, &lock_server::release, &ls);
+    rsm.reg(lock_protocol::stat, &lock_server::stat, &ls);
+
+    while(1)  // never returns; NOTE(review): presumably requests are served on other threads while this one sleeps -- confirm
+        sleep(1000);
+}
index d063cdc..5c78c90 100644 (file)
@@ -10,7 +10,6 @@
 #include <stdlib.h>
 #include <stdio.h>
 #include "lang/verify.h"
-#include "lock_client_cache_rsm.h"
 #include "tprintf.h"
 #include <sys/types.h>
 #include <unistd.h>
@@ -21,7 +20,7 @@ char tprintf_thread_prefix = 'c';
 // must be >= 2
 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
 std::string dst;
-lock_client_cache_rsm **lc = new lock_client_cache_rsm * [nt];
+lock_client **lc = new lock_client * [nt];
 lock_protocol::lockid_t a = "1";
 lock_protocol::lockid_t b = "2";
 lock_protocol::lockid_t c = "3";
@@ -174,7 +173,7 @@ main(int argc, char *argv[])
     }
 
     tprintf("cache lock client\n");
-    for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache_rsm(dst);
+    for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst);
 
     if(!test || test == 1){
         test1();
diff --git a/rsm.cc b/rsm.cc
index 66c8d97..bdeabaa 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -189,12 +189,12 @@ bool rsm::sync_with_backups() {
     adopt_lock ml(rsm_mutex);
     ml.unlock();
     {
-        // Make sure that the state of lock_server_cache_rsm is stable during
+        // Make sure that the state of lock_server is stable during
         // synchronization; otherwise, the primary's state may be more recent
         // than replicas after the synchronization.
         lock ml(invoke_mutex);
         // By acquiring and releasing the invoke_mutex once, we make sure that
-        // the state of lock_server_cache_rsm will not be changed until all
+        // the state of lock_server will not be changed until all
         // replicas are synchronized. The reason is that client_invoke arrives
         // after this point of time will see inviewchange == true, and returns
         // BUSY.