From 4eb481177737fe47cda80808dc4a62bd864a15e0 Mon Sep 17 00:00:00 2001 From: Alexander Gavrilov Date: Thu, 15 Mar 2012 15:33:19 +0400 Subject: [PATCH] Make the DF suspend lock recursive, and add RPC calls for batch suspend. The idea is that if you have to execute many RPC calls, it is faster to suspend once. The service class takes care to auto-resume in the destructor in case the client just disappears. --- library/Core.cpp | 95 ++++++++++++++++++++++---------- library/RemoteServer.cpp | 27 +++++++++ library/include/Core.h | 12 ++-- library/include/RemoteClient.h | 2 + library/include/RemoteServer.h | 6 ++ library/proto/CoreProtocol.proto | 8 +++ 6 files changed, 118 insertions(+), 32 deletions(-) diff --git a/library/Core.cpp b/library/Core.cpp index 793a63680..928ad09e2 100644 --- a/library/Core.cpp +++ b/library/Core.cpp @@ -98,6 +98,20 @@ struct Core::Cond bool predicate; }; +struct Core::Private +{ + tthread::mutex AccessMutex; + tthread::mutex StackMutex; + std::stack suspended_tools; + Core::Cond core_cond; + thread::id df_suspend_thread; + int df_suspend_depth; + + Private() { + df_suspend_depth = 0; + } +}; + void cheap_tokenise(string const& input, vector &output) { string *cur = NULL; @@ -550,6 +564,8 @@ void fIOthread(void * iodata) Core::Core() { + d = new Private(); + // init the console. This must be always the first step! plug_mgr = 0; vif = 0; @@ -559,10 +575,6 @@ Core::Core() started = false; memset(&(s_mods), 0, sizeof(s_mods)); - // create mutex for syncing with interactive tasks - AccessMutex = 0; - StackMutex = 0; - core_cond = 0; // set up hotkey capture hotkey_set = false; HotkeyMutex = 0; @@ -661,10 +673,7 @@ bool Core::Init() df::global::InitGlobals(); // create mutex for syncing with interactive tasks - StackMutex = new mutex(); - AccessMutex = new mutex(); misc_data_mutex=new mutex(); - core_cond = new Core::Cond(); cerr << "Initializing Plugins.\n"; // create plugin manager plug_mgr = new PluginManager(this); @@ -757,22 +766,49 @@ void *Core::GetData( std::string key ) void Core::Suspend() { - Core::Cond * nc = new Core::Cond(); + auto tid = this_thread::get_id(); + + // If recursive, just increment the count + { + lock_guard lock(d->AccessMutex); + + if (d->df_suspend_depth > 0 && d->df_suspend_thread == tid) + { + d->df_suspend_depth++; + return; + } + } + // put the condition on a stack - StackMutex->lock(); - suspended_tools.push(nc); - StackMutex->unlock(); + Core::Cond *nc = new Core::Cond(); + + { + lock_guard lock2(d->StackMutex); + + d->suspended_tools.push(nc); + } + // wait until Core::Update() wakes up the tool - AccessMutex->lock(); - nc->Lock(AccessMutex); - AccessMutex->unlock(); + { + lock_guard lock(d->AccessMutex); + + nc->Lock(&d->AccessMutex); + + assert(d->df_suspend_depth == 0); + d->df_suspend_thread = tid; + d->df_suspend_depth = 1; + } } void Core::Resume() { - AccessMutex->lock(); - core_cond->Unlock(); - AccessMutex->unlock(); + auto tid = this_thread::get_id(); + lock_guard lock(d->AccessMutex); + + assert(d->df_suspend_depth > 0 && d->df_suspend_thread == tid); + + if (--d->df_suspend_depth == 0) + d->core_cond.Unlock(); } int Core::TileUpdate() @@ -827,21 +863,24 @@ int Core::Update() // wake waiting tools // do not allow more tools to join in while we process stuff here - StackMutex->lock(); - while (!suspended_tools.empty()) + lock_guard lock_stack(d->StackMutex); + + while (!d->suspended_tools.empty()) { - Core::Cond * nc = suspended_tools.top(); - suspended_tools.pop(); - AccessMutex->lock(); - // wake tool - nc->Unlock(); - // wait for tool to wake us - core_cond->Lock(AccessMutex); - AccessMutex->unlock(); + Core::Cond * nc = d->suspended_tools.top(); + d->suspended_tools.pop(); + + lock_guard lock(d->AccessMutex); + // wake tool + nc->Unlock(); + // wait for tool to wake us + d->core_cond.Lock(&d->AccessMutex); + // verify + assert(d->df_suspend_depth == 0); // destroy condition delete nc; } - StackMutex->unlock(); + return 0; }; diff --git a/library/RemoteServer.cpp b/library/RemoteServer.cpp index 14b64f19e..9d3c1101f 100644 --- a/library/RemoteServer.cpp +++ b/library/RemoteServer.cpp @@ -64,11 +64,21 @@ using dfproto::CoreTextFragment; using google::protobuf::MessageLite; CoreService::CoreService() { + suspend_depth = 0; + // These 2 methods must be first, so that they get id 0 and 1 addMethod("BindMethod", &CoreService::BindMethod); addMethod("RunCommand", &CoreService::RunCommand); // Add others here: + addMethod("CoreSuspend", &CoreService::CoreSuspend); + addMethod("CoreResume", &CoreService::CoreResume); +} + +CoreService::~CoreService() +{ + while (suspend_depth-- > 0) + Core::getInstance().Resume(); } command_result CoreService::BindMethod(color_ostream &stream, @@ -107,6 +117,23 @@ command_result CoreService::RunCommand(color_ostream &stream, return Core::getInstance().plug_mgr->InvokeCommand(stream, cmd, args); } +command_result CoreService::CoreSuspend(color_ostream &stream, const EmptyMessage*, IntMessage *cnt) +{ + Core::getInstance().Suspend(); + cnt->set_value(++suspend_depth); + return CR_OK; +} + +command_result CoreService::CoreResume(color_ostream &stream, const EmptyMessage*, IntMessage *cnt) +{ + if (suspend_depth <= 0) + return CR_WRONG_USAGE; + + Core::getInstance().Resume(); + cnt->set_value(--suspend_depth); + return CR_OK; +} + RPCService::RPCService() { owner = NULL; diff --git a/library/include/Core.h b/library/include/Core.h index 740c35558..f4c2c2ae7 100644 --- a/library/include/Core.h +++ b/library/include/Core.h @@ -141,24 +141,28 @@ namespace DFHack DFHack::Console con; Core(); + + class Private; + Private *d; + bool Init(); int Update (void); int TileUpdate (void); int Shutdown (void); int SDL_Event(SDL::Event* event); bool ncurses_wgetch(int in, int & out); + Core(Core const&); // Don't Implement void operator=(Core const&); // Don't implement + // report error to user while failing void fatal (std::string output, bool will_deactivate); + // 1 = fatal failure bool errorstate; // regulate access to DF struct Cond; - tthread::mutex * AccessMutex; - tthread::mutex * StackMutex; - std::stack < Core::Cond * > suspended_tools; - Core::Cond * core_cond; + // FIXME: shouldn't be kept around like this DFHack::VersionInfoFactory * vif; // Module storage diff --git a/library/include/RemoteClient.h b/library/include/RemoteClient.h index 79efff977..af03b32a1 100644 --- a/library/include/RemoteClient.h +++ b/library/include/RemoteClient.h @@ -34,6 +34,8 @@ distribution. namespace DFHack { using dfproto::EmptyMessage; + using dfproto::IntMessage; + using dfproto::StringMessage; enum command_result { diff --git a/library/include/RemoteServer.h b/library/include/RemoteServer.h index 26363d582..c9cbc8252 100644 --- a/library/include/RemoteServer.h +++ b/library/include/RemoteServer.h @@ -189,14 +189,20 @@ namespace DFHack }; class CoreService : public RPCService { + int suspend_depth; public: CoreService(); + ~CoreService(); command_result BindMethod(color_ostream &stream, const dfproto::CoreBindRequest *in, dfproto::CoreBindReply *out); command_result RunCommand(color_ostream &stream, const dfproto::CoreRunCommandRequest *in); + + // For batching + command_result CoreSuspend(color_ostream &stream, const EmptyMessage*, IntMessage *cnt); + command_result CoreResume(color_ostream &stream, const EmptyMessage*, IntMessage *cnt); }; class DFHACK_EXPORT ServerConnection { diff --git a/library/proto/CoreProtocol.proto b/library/proto/CoreProtocol.proto index 3ebe17a28..efe38c3b1 100644 --- a/library/proto/CoreProtocol.proto +++ b/library/proto/CoreProtocol.proto @@ -45,6 +45,14 @@ message CoreErrorNotification { message EmptyMessage {} +message IntMessage { + required int32 value = 1; +} + +message StringMessage { + required string value = 1; +} + message CoreBindRequest { required string method = 1; required string input_msg = 2;