+// Parks the calling thread on its own entry in `c` (keyed by thread id) until
+// that entry is notified, then removes the entry again.
+//
+// `mutex_lock` is handed straight to the entry's wait(); presumably it must be
+// held by the caller on entry, as with std::condition_variable::wait -- TODO confirm.
+// NOTE(review): no predicate is checked here, so callers presumably re-test the
+// lock state after this returns -- verify against call sites.
+void lock_state::wait(lock & mutex_lock) {
+    const auto me = this_thread::get_id();
+    auto & waiter = c[me];      // operator[] registers this thread if it has no entry yet
+    waiter.wait(mutex_lock);
+    c.erase(me);                // deregister so the signal() overloads no longer see this thread
+}
+
+// Wakes a single waiter, if there is one: the thread whose entry comes first
+// in `c`'s iteration order. Does nothing when no thread is registered.
+void lock_state::signal() {
+    if (c.empty())
+        return;
+    c.begin()->second.notify_one();
+}
+
+// Wakes the thread identified by `who` if it currently has an entry in `c`;
+// otherwise this is a no-op.
+void lock_state::signal(thread::id who) {
+    // A single find() replaces count() followed by operator[]: one lookup
+    // instead of two, using an accessor that cannot insert a new entry.
+    auto it = c.find(who);
+    if (it != c.end())
+        it->second.notify_one();
+}
+
+// Maps a lock id to the lock_state kept for that lock.
+using lock_map = map<lock_protocol::lockid_t, lock_state>;
+
+in_port_t lock_client::last_port = 0; // port chosen by the most recent lock_client constructor; XORed into the next constructor's RNG seed
+
+// Returns the lock_state stored in lock_table for `lid`, default-constructing
+// one first if the table has no entry for it yet.
+//
+// lock_table_lock is held only for the lookup/insert; the returned reference
+// is used by the caller after that lock has been released.
+// NOTE(review): this relies on lock_table keeping references to its elements
+// valid across later insertions (true for std::map) -- confirm its type.
+lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
+    lock table_guard(lock_table_lock);
+    lock_state & entry = lock_table[lid];
+    return entry;
+}
+
+// Constructs a lock client for the server at `xdst`.
+//
+//   xdst - destination passed to both the rpcc and the rsm_client.
+//   _lu  - stored in `lu`; may be null (the releaser tests it before use).
+//
+// Besides creating the two outgoing clients, this picks a pseudo-random local
+// port, creates an rpcs on it with the revoke and retry handlers registered,
+// launches the releaser thread, and finally starts the rpcs.
+lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
+    cl = unique_ptr<rpcc>(new rpcc(xdst));
+    if (cl->bind() < 0)
+        LOG("lock_client: call bind");  // a failed bind is only logged; construction continues
+
+    // Seed from the wall clock mixed with the previously chosen port --
+    // presumably so clients created within the same second still differ.
+    const uint32_t seed = static_cast<uint32_t>(time(nullptr)) ^ last_port;
+    srandom(seed);
+
+    // Forcing bit 10 on keeps the chosen port at or above 1024.
+    rlock_port = (random() % 32000) | (1 << 10);
+    id = "127.0.0.1:" + to_string(rlock_port);
+    last_port = rlock_port;
+
+    // Handlers are registered before the server is started at the end.
+    rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
+    rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
+    rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
+
+    // rsmc is created before the releaser thread is launched; the releaser calls through it.
+    rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
+    releaser_thread = thread(&lock_client::releaser, this);
+    rlsrpc->start();
+}
+
+void lock_client::releaser() [[noreturn]] {
+ while (1) {
+ lock_protocol::lockid_t lid;
+ release_fifo.deq(&lid);
+ LOG("Releaser: " << lid);
+
+ lock_state &st = get_lock_state(lid);
+ lock sl(st.m);
+ VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
+ st.state = lock_state::releasing;
+ {
+ sl.unlock();
+ int r;
+ rsmc->call(lock_protocol::release, r, lid, id, st.xid);
+ if (lu)
+ lu->dorelease(lid);
+ sl.lock();
+ }
+ st.state = lock_state::none;
+ LOG("Lock " << lid << ": none");
+ st.signal();