Make the DF suspend lock recursive, and add RPC calls for batch suspend.

The idea is that if you have to execute many RPC calls, it is
faster to suspend once. The service class takes care to auto-resume
in the destructor in case the client just disappears.
develop
Alexander Gavrilov 2012-03-15 15:33:19 +04:00
parent 605ee9669a
commit 4eb4811777
6 changed files with 118 additions and 32 deletions

@ -98,6 +98,20 @@ struct Core::Cond
bool predicate; bool predicate;
}; };
struct Core::Private
{
tthread::mutex AccessMutex;
tthread::mutex StackMutex;
std::stack<Core::Cond*> suspended_tools;
Core::Cond core_cond;
thread::id df_suspend_thread;
int df_suspend_depth;
Private() {
df_suspend_depth = 0;
}
};
void cheap_tokenise(string const& input, vector<string> &output) void cheap_tokenise(string const& input, vector<string> &output)
{ {
string *cur = NULL; string *cur = NULL;
@ -550,6 +564,8 @@ void fIOthread(void * iodata)
Core::Core() Core::Core()
{ {
d = new Private();
// init the console. This must be always the first step! // init the console. This must be always the first step!
plug_mgr = 0; plug_mgr = 0;
vif = 0; vif = 0;
@ -559,10 +575,6 @@ Core::Core()
started = false; started = false;
memset(&(s_mods), 0, sizeof(s_mods)); memset(&(s_mods), 0, sizeof(s_mods));
// create mutex for syncing with interactive tasks
AccessMutex = 0;
StackMutex = 0;
core_cond = 0;
// set up hotkey capture // set up hotkey capture
hotkey_set = false; hotkey_set = false;
HotkeyMutex = 0; HotkeyMutex = 0;
@ -661,10 +673,7 @@ bool Core::Init()
df::global::InitGlobals(); df::global::InitGlobals();
// create mutex for syncing with interactive tasks // create mutex for syncing with interactive tasks
StackMutex = new mutex();
AccessMutex = new mutex();
misc_data_mutex=new mutex(); misc_data_mutex=new mutex();
core_cond = new Core::Cond();
cerr << "Initializing Plugins.\n"; cerr << "Initializing Plugins.\n";
// create plugin manager // create plugin manager
plug_mgr = new PluginManager(this); plug_mgr = new PluginManager(this);
@ -757,22 +766,49 @@ void *Core::GetData( std::string key )
void Core::Suspend() void Core::Suspend()
{ {
Core::Cond * nc = new Core::Cond(); auto tid = this_thread::get_id();
// If recursive, just increment the count
{
lock_guard<mutex> lock(d->AccessMutex);
if (d->df_suspend_depth > 0 && d->df_suspend_thread == tid)
{
d->df_suspend_depth++;
return;
}
}
// put the condition on a stack // put the condition on a stack
StackMutex->lock(); Core::Cond *nc = new Core::Cond();
suspended_tools.push(nc);
StackMutex->unlock(); {
lock_guard<mutex> lock2(d->StackMutex);
d->suspended_tools.push(nc);
}
// wait until Core::Update() wakes up the tool // wait until Core::Update() wakes up the tool
AccessMutex->lock(); {
nc->Lock(AccessMutex); lock_guard<mutex> lock(d->AccessMutex);
AccessMutex->unlock();
nc->Lock(&d->AccessMutex);
assert(d->df_suspend_depth == 0);
d->df_suspend_thread = tid;
d->df_suspend_depth = 1;
}
} }
void Core::Resume() void Core::Resume()
{ {
AccessMutex->lock(); auto tid = this_thread::get_id();
core_cond->Unlock(); lock_guard<mutex> lock(d->AccessMutex);
AccessMutex->unlock();
assert(d->df_suspend_depth > 0 && d->df_suspend_thread == tid);
if (--d->df_suspend_depth == 0)
d->core_cond.Unlock();
} }
int Core::TileUpdate() int Core::TileUpdate()
@ -827,21 +863,24 @@ int Core::Update()
// wake waiting tools // wake waiting tools
// do not allow more tools to join in while we process stuff here // do not allow more tools to join in while we process stuff here
StackMutex->lock(); lock_guard<mutex> lock_stack(d->StackMutex);
while (!suspended_tools.empty())
while (!d->suspended_tools.empty())
{ {
Core::Cond * nc = suspended_tools.top(); Core::Cond * nc = d->suspended_tools.top();
suspended_tools.pop(); d->suspended_tools.pop();
AccessMutex->lock();
// wake tool lock_guard<mutex> lock(d->AccessMutex);
nc->Unlock(); // wake tool
// wait for tool to wake us nc->Unlock();
core_cond->Lock(AccessMutex); // wait for tool to wake us
AccessMutex->unlock(); d->core_cond.Lock(&d->AccessMutex);
// verify
assert(d->df_suspend_depth == 0);
// destroy condition // destroy condition
delete nc; delete nc;
} }
StackMutex->unlock();
return 0; return 0;
}; };

@ -64,11 +64,21 @@ using dfproto::CoreTextFragment;
using google::protobuf::MessageLite; using google::protobuf::MessageLite;
CoreService::CoreService() { CoreService::CoreService() {
suspend_depth = 0;
// These 2 methods must be first, so that they get id 0 and 1 // These 2 methods must be first, so that they get id 0 and 1
addMethod("BindMethod", &CoreService::BindMethod); addMethod("BindMethod", &CoreService::BindMethod);
addMethod("RunCommand", &CoreService::RunCommand); addMethod("RunCommand", &CoreService::RunCommand);
// Add others here: // Add others here:
addMethod("CoreSuspend", &CoreService::CoreSuspend);
addMethod("CoreResume", &CoreService::CoreResume);
}
CoreService::~CoreService()
{
while (suspend_depth-- > 0)
Core::getInstance().Resume();
} }
command_result CoreService::BindMethod(color_ostream &stream, command_result CoreService::BindMethod(color_ostream &stream,
@ -107,6 +117,23 @@ command_result CoreService::RunCommand(color_ostream &stream,
return Core::getInstance().plug_mgr->InvokeCommand(stream, cmd, args); return Core::getInstance().plug_mgr->InvokeCommand(stream, cmd, args);
} }
command_result CoreService::CoreSuspend(color_ostream &stream, const EmptyMessage*, IntMessage *cnt)
{
Core::getInstance().Suspend();
cnt->set_value(++suspend_depth);
return CR_OK;
}
command_result CoreService::CoreResume(color_ostream &stream, const EmptyMessage*, IntMessage *cnt)
{
if (suspend_depth <= 0)
return CR_WRONG_USAGE;
Core::getInstance().Resume();
cnt->set_value(--suspend_depth);
return CR_OK;
}
RPCService::RPCService() RPCService::RPCService()
{ {
owner = NULL; owner = NULL;

@ -141,24 +141,28 @@ namespace DFHack
DFHack::Console con; DFHack::Console con;
Core(); Core();
class Private;
Private *d;
bool Init(); bool Init();
int Update (void); int Update (void);
int TileUpdate (void); int TileUpdate (void);
int Shutdown (void); int Shutdown (void);
int SDL_Event(SDL::Event* event); int SDL_Event(SDL::Event* event);
bool ncurses_wgetch(int in, int & out); bool ncurses_wgetch(int in, int & out);
Core(Core const&); // Don't Implement Core(Core const&); // Don't Implement
void operator=(Core const&); // Don't implement void operator=(Core const&); // Don't implement
// report error to user while failing // report error to user while failing
void fatal (std::string output, bool will_deactivate); void fatal (std::string output, bool will_deactivate);
// 1 = fatal failure // 1 = fatal failure
bool errorstate; bool errorstate;
// regulate access to DF // regulate access to DF
struct Cond; struct Cond;
tthread::mutex * AccessMutex;
tthread::mutex * StackMutex;
std::stack < Core::Cond * > suspended_tools;
Core::Cond * core_cond;
// FIXME: shouldn't be kept around like this // FIXME: shouldn't be kept around like this
DFHack::VersionInfoFactory * vif; DFHack::VersionInfoFactory * vif;
// Module storage // Module storage

@ -34,6 +34,8 @@ distribution.
namespace DFHack namespace DFHack
{ {
using dfproto::EmptyMessage; using dfproto::EmptyMessage;
using dfproto::IntMessage;
using dfproto::StringMessage;
enum command_result enum command_result
{ {

@ -189,14 +189,20 @@ namespace DFHack
}; };
class CoreService : public RPCService { class CoreService : public RPCService {
int suspend_depth;
public: public:
CoreService(); CoreService();
~CoreService();
command_result BindMethod(color_ostream &stream, command_result BindMethod(color_ostream &stream,
const dfproto::CoreBindRequest *in, const dfproto::CoreBindRequest *in,
dfproto::CoreBindReply *out); dfproto::CoreBindReply *out);
command_result RunCommand(color_ostream &stream, command_result RunCommand(color_ostream &stream,
const dfproto::CoreRunCommandRequest *in); const dfproto::CoreRunCommandRequest *in);
// For batching
command_result CoreSuspend(color_ostream &stream, const EmptyMessage*, IntMessage *cnt);
command_result CoreResume(color_ostream &stream, const EmptyMessage*, IntMessage *cnt);
}; };
class DFHACK_EXPORT ServerConnection { class DFHACK_EXPORT ServerConnection {

@ -45,6 +45,14 @@ message CoreErrorNotification {
message EmptyMessage {} message EmptyMessage {}
message IntMessage {
required int32 value = 1;
}
message StringMessage {
required string value = 1;
}
message CoreBindRequest { message CoreBindRequest {
required string method = 1; required string method = 1;
required string input_msg = 2; required string input_msg = 2;