Refactor CoreSuspender to fix Console::lineedit exit hangs

The old CoreSuspender requires processing from Core::Update to allow
commands to execute. But that causes issues if Core::Shutdown wants to
guarantee cleanup order with std::thread::join. Fixing shutdown ordering
adds too many branches to already fairly complex code.

I decided to try to refactor CoreSuspender to use simpler locking,
with a std::recursive_mutex as the primary synchronization
primitive.
To help control when Core::Update unlocks the primary mutex there is a
std::condition_variable_any and a std::atomic<size_t> queue length
counter.
The last state variable is a std::atomic<std::thread::id> that is used to
keep track of the owner thread for the Core::isSuspended query.

This should be merged only just after a release to make sure that it
gets maximum testing in the develop branch before the next release.

Fixes #1066
develop
Pauli 2018-06-21 18:58:16 +03:00
parent 0bc1db4f07
commit f6b0ac7819
5 changed files with 124 additions and 177 deletions

@ -97,45 +97,18 @@ using df::global::world;
static bool parseKeySpec(std::string keyspec, int *psym, int *pmod, std::string *pfocus = NULL); static bool parseKeySpec(std::string keyspec, int *psym, int *pmod, std::string *pfocus = NULL);
size_t loadScriptFiles(Core* core, color_ostream& out, const vector<std::string>& prefix, const std::string& folder); size_t loadScriptFiles(Core* core, color_ostream& out, const vector<std::string>& prefix, const std::string& folder);
struct Core::Cond : public std::condition_variable //! mainThreadSuspend keeps the main DF thread suspended from Core::Init to
{ //! thread exit.
Cond() : template<typename M>
std::condition_variable{}, static std::unique_lock<M>& mainThreadSuspend(M& mutex) {
predicate{false} static thread_local std::unique_lock<M> lock(mutex, std::defer_lock);
{ return lock;
} }
~Cond()
{
}
bool Lock(std::unique_lock<std::mutex>& lock)
{
wait(lock, [this]() -> bool {return this->predicate;});
predicate = false;
return true;
}
bool Unlock()
{
predicate = true;
notify_one();
return true;
}
bool predicate;
};
struct Core::Private struct Core::Private
{ {
std::mutex AccessMutex;
std::mutex StackMutex;
std::stack<Core::Cond*> suspended_tools;
Core::Cond core_cond;
thread::id df_suspend_thread;
int df_suspend_depth;
std::thread iothread; std::thread iothread;
std::thread hotkeythread; std::thread hotkeythread;
Private() {
df_suspend_depth = 0;
}
}; };
struct CommandDepthCounter struct CommandDepthCounter
@ -1481,6 +1454,9 @@ void fIOthread(void * iodata)
Core::~Core() Core::~Core()
{ {
if (mainThreadSuspend(CoreSuspendMutex).owns_lock())
mainThreadSuspend(CoreSuspendMutex).unlock();
if (d->hotkeythread.joinable()) { if (d->hotkeythread.joinable()) {
std::lock_guard<std::mutex> lock(HotkeyMutex); std::lock_guard<std::mutex> lock(HotkeyMutex);
hotkey_set = SHUTDOWN; hotkey_set = SHUTDOWN;
@ -1497,7 +1473,11 @@ Core::Core() :
HotkeyMutex{}, HotkeyMutex{},
HotkeyCond{}, HotkeyCond{},
alias_mutex{}, alias_mutex{},
misc_data_mutex{} misc_data_mutex{},
CoreSuspendMutex{},
CoreWakeup{},
ownerThread{},
toolCount{0}
{ {
// init the console. This must be always the first step! // init the console. This must be always the first step!
plug_mgr = 0; plug_mgr = 0;
@ -1562,6 +1542,10 @@ bool Core::Init()
if(errorstate) if(errorstate)
return false; return false;
// Lock the CoreSuspendMutex until the thread exits or calls Core::Shutdown
// Core::Update will temporarily unlock it when there are any commands queued
mainThreadSuspend(CoreSuspendMutex).lock();
// Re-route stdout and stderr again - DF seems to set up stdout and // Re-route stdout and stderr again - DF seems to set up stdout and
// stderr.txt on Windows as of 0.43.05. Also, log before switching files to // stderr.txt on Windows as of 0.43.05. Also, log before switching files to
// make it obvious what's going on if someone checks the *.txt files. // make it obvious what's going on if someone checks the *.txt files.
@ -1908,57 +1892,7 @@ void *Core::GetData( std::string key )
bool Core::isSuspended(void) bool Core::isSuspended(void)
{ {
lock_guard<mutex> lock(d->AccessMutex); return ownerThread.load() == std::this_thread::get_id();
return (d->df_suspend_depth > 0 && d->df_suspend_thread == this_thread::get_id());
}
void Core::Suspend()
{
auto tid = this_thread::get_id();
// If recursive, just increment the count
{
lock_guard<mutex> lock(d->AccessMutex);
if (d->df_suspend_depth > 0 && d->df_suspend_thread == tid)
{
d->df_suspend_depth++;
return;
}
}
// put the condition on a stack
Core::Cond *nc = new Core::Cond();
{
lock_guard<mutex> lock2(d->StackMutex);
d->suspended_tools.push(nc);
}
// wait until Core::Update() wakes up the tool
{
unique_lock<mutex> lock(d->AccessMutex);
nc->Lock(lock);
assert(d->df_suspend_depth == 0);
d->df_suspend_thread = tid;
d->df_suspend_depth = 1;
}
}
void Core::Resume()
{
auto tid = this_thread::get_id();
lock_guard<mutex> lock(d->AccessMutex);
assert(d->df_suspend_depth > 0 && d->df_suspend_thread == tid);
(void)tid;
if (--d->df_suspend_depth == 0)
d->core_cond.Unlock();
} }
int Core::TileUpdate() int Core::TileUpdate()
@ -1969,40 +1903,6 @@ int Core::TileUpdate()
return true; return true;
} }
int Core::ClaimSuspend(bool force_base)
{
auto tid = this_thread::get_id();
lock_guard<mutex> lock(d->AccessMutex);
if (force_base || d->df_suspend_depth <= 0)
{
assert(d->df_suspend_depth == 0);
d->df_suspend_thread = tid;
d->df_suspend_depth = 1000000;
return 1000000;
}
else
{
assert(d->df_suspend_thread == tid);
return ++d->df_suspend_depth;
}
}
void Core::DisclaimSuspend(int level)
{
auto tid = this_thread::get_id();
lock_guard<mutex> lock(d->AccessMutex);
assert(d->df_suspend_depth == level && d->df_suspend_thread == tid);
(void)tid;
if (level == 1000000)
d->df_suspend_depth = 0;
else
--d->df_suspend_depth;
}
void Core::doUpdate(color_ostream &out, bool first_update) void Core::doUpdate(color_ostream &out, bool first_update)
{ {
Lua::Core::Reset(out, "DF code execution"); Lua::Core::Reset(out, "DF code execution");
@ -2122,27 +2022,9 @@ int Core::Update()
doUpdate(out, first_update); doUpdate(out, first_update);
} }
// wake waiting tools // Let all commands run that require CoreSuspender
// do not allow more tools to join in while we process stuff here CoreWakeup.wait(mainThreadSuspend(CoreSuspendMutex),
lock_guard<mutex> lock_stack(d->StackMutex); [this]() -> bool {return this->toolCount.load() == 0;});
while (!d->suspended_tools.empty())
{
Core::Cond * nc = d->suspended_tools.top();
d->suspended_tools.pop();
std::unique_lock<mutex> lock(d->AccessMutex);
// wake tool
nc->Unlock();
// wait for tool to wake us
d->core_cond.Lock(lock);
// verify
assert(d->df_suspend_depth == 0);
// destroy condition
delete nc;
// check lua stack depth
Lua::Core::Reset(out, "suspend");
}
return 0; return 0;
}; };
@ -2358,15 +2240,20 @@ void Core::onStateChange(color_ostream &out, state_change_event event)
int Core::Shutdown ( void ) int Core::Shutdown ( void )
{ {
if(errorstate)
return true;
errorstate = 1;
// Make sure we release main thread if this is called from main thread
if (mainThreadSuspend(CoreSuspendMutex).owns_lock())
mainThreadSuspend(CoreSuspendMutex).unlock();
// Make sure the console thread shutdowns before clean up to avoid any // Make sure the console thread shutdowns before clean up to avoid any
// unlikely data races. // unlikely data races.
if (d->iothread.joinable()) { if (d->iothread.joinable()) {
con.shutdown(); con.shutdown();
d->iothread.join();
} }
if(errorstate)
return true;
errorstate = 1;
if (d->hotkeythread.joinable()) { if (d->hotkeythread.joinable()) {
std::unique_lock<std::mutex> hot_lock(HotkeyMutex); std::unique_lock<std::mutex> hot_lock(HotkeyMutex);
hotkey_set = SHUTDOWN; hotkey_set = SHUTDOWN;
@ -2374,6 +2261,7 @@ int Core::Shutdown ( void )
} }
d->hotkeythread.join(); d->hotkeythread.join();
d->iothread.join();
CoreSuspendClaimer suspend; CoreSuspendClaimer suspend;
if(plug_mgr) if(plug_mgr)

@ -652,8 +652,10 @@ static command_result SetUnitLabors(color_ostream &stream, const SetUnitLaborsIn
return CR_OK; return CR_OK;
} }
CoreService::CoreService() { CoreService::CoreService() :
suspend_depth = 0; suspend_depth{0},
coreSuspender{nullptr}
{
// These 2 methods must be first, so that they get id 0 and 1 // These 2 methods must be first, so that they get id 0 and 1
addMethod("BindMethod", &CoreService::BindMethod, SF_DONT_SUSPEND | SF_ALLOW_REMOTE); addMethod("BindMethod", &CoreService::BindMethod, SF_DONT_SUSPEND | SF_ALLOW_REMOTE);
@ -683,8 +685,7 @@ CoreService::CoreService() {
CoreService::~CoreService() CoreService::~CoreService()
{ {
while (suspend_depth-- > 0) delete coreSuspender;
Core::getInstance().Resume();
} }
command_result CoreService::BindMethod(color_ostream &stream, command_result CoreService::BindMethod(color_ostream &stream,
@ -725,7 +726,8 @@ command_result CoreService::RunCommand(color_ostream &stream,
command_result CoreService::CoreSuspend(color_ostream &stream, const EmptyMessage*, IntMessage *cnt) command_result CoreService::CoreSuspend(color_ostream &stream, const EmptyMessage*, IntMessage *cnt)
{ {
Core::getInstance().Suspend(); if (suspend_depth == 0)
coreSuspender = new CoreSuspender();
cnt->set_value(++suspend_depth); cnt->set_value(++suspend_depth);
return CR_OK; return CR_OK;
} }
@ -735,8 +737,11 @@ command_result CoreService::CoreResume(color_ostream &stream, const EmptyMessage
if (suspend_depth <= 0) if (suspend_depth <= 0)
return CR_WRONG_USAGE; return CR_WRONG_USAGE;
Core::getInstance().Resume();
cnt->set_value(--suspend_depth); cnt->set_value(--suspend_depth);
if (suspend_depth == 0) {
delete coreSuspender;
coreSuspender = nullptr;
}
return CR_OK; return CR_OK;
} }

@ -34,8 +34,10 @@ distribution.
#include "Console.h" #include "Console.h"
#include "modules/Graphic.h" #include "modules/Graphic.h"
#include <mutex> #include <atomic>
#include <condition_variable> #include <condition_variable>
#include <mutex>
#include <thread>
#include "RemoteClient.h" #include "RemoteClient.h"
@ -61,6 +63,11 @@ namespace DFHack
class PluginManager; class PluginManager;
class Core; class Core;
class ServerMain; class ServerMain;
class CoreSuspender;
namespace Lua { namespace Core {
DFHACK_EXPORT void Reset(color_ostream &out, const char *where);
} }
namespace Windows namespace Windows
{ {
class df_window; class df_window;
@ -129,10 +136,6 @@ namespace DFHack
} }
/// check if the activity lock is owned by this thread /// check if the activity lock is owned by this thread
bool isSuspended(void); bool isSuspended(void);
/// try to acquire the activity lock
void Suspend(void);
/// return activity lock
void Resume(void);
/// Is everything OK? /// Is everything OK?
bool isValid(void) { return !errorstate; } bool isValid(void) { return !errorstate; }
@ -203,10 +206,6 @@ namespace DFHack
struct Private; struct Private;
Private *d; Private *d;
friend class CoreSuspendClaimer;
int ClaimSuspend(bool force_base);
void DisclaimSuspend(int level);
bool Init(); bool Init();
int Update (void); int Update (void);
int TileUpdate (void); int TileUpdate (void);
@ -285,32 +284,86 @@ namespace DFHack
std::mutex misc_data_mutex; std::mutex misc_data_mutex;
std::map<std::string,void*> misc_data_map; std::map<std::string,void*> misc_data_map;
/*!
* \defgroup core_suspend CoreSuspender state handling serialization to
* DF memory.
* \sa DFHack::CoreSuspender
* \{
*/
std::recursive_mutex CoreSuspendMutex;
std::condition_variable_any CoreWakeup;
std::atomic<std::thread::id> ownerThread;
std::atomic<size_t> toolCount;
//! \}
friend class CoreService; friend class CoreService;
friend class ServerConnection; friend class ServerConnection;
friend class CoreSuspender;
ServerMain *server; ServerMain *server;
}; };
class CoreSuspender { template<typename Derived>
Core *core; struct ToolIncrement {
public: ToolIncrement(std::atomic<size_t>& toolCount) {
CoreSuspender() : core(&Core::getInstance()) { core->Suspend(); } toolCount += 1;
CoreSuspender(Core *core) : core(core) { core->Suspend(); } }
~CoreSuspender() { core->Resume(); }
}; };
/** Claims the current thread already has the suspend lock. /*!
* Strictly for use in callbacks from DF. * CoreSuspender allows serialization to DF data with std::unique_lock like
* interface. It includes handling for recursive CoreSuspender calls and
* notification to main thread after all queue tools have been handled.
*
* State transitions are:
* - Startup sets up Core::CoreSuspendMutex in the unlocked state.
* - Core::Init locks Core::CoreSuspendMutex until the thread exits or that thread
* calls Core::Shutdown or Core::~Core.
* - Other threads request core suspend by atomically incrementing Core::toolCount
* and then locking Core::CoreSuspendMutex. After locking CoreSuspendMutex
* succeeds, callers exchange their std::thread::id into Core::ownerThread.
* - Core::Update() makes sure that queued tools are run when it calls
* Core::CoreWakeup.wait. The wait keeps Core::CoreSuspendMutex unlocked
* and waits until Core::toolCount is reduced back to zero.
* - CoreSuspender::~CoreSuspender() first stores the previous Core::ownerThread
* back. In case of a recursive call Core::ownerThread equals tid. If tid is
* the default-constructed std::thread::id (no previous owner) then we are
* releasing the recursive_mutex, which means the suspend context is over.
* It is time to reset Lua.
* The last step is to decrement Core::toolCount and wake up the main thread
* if no more tools are queued trying to acquire
* Core::CoreSuspendMutex.
*/ */
class CoreSuspendClaimer { class CoreSuspender : protected ToolIncrement<CoreSuspender>,
public std::unique_lock<std::recursive_mutex> {
using parent_t = std::unique_lock<std::recursive_mutex>;
Core *core; Core *core;
int level; std::thread::id tid;
public: public:
CoreSuspendClaimer(bool base = false) : core(&Core::getInstance()) { CoreSuspender() : CoreSuspender(&Core::getInstance()) { }
level = core->ClaimSuspend(base); CoreSuspender(bool) : CoreSuspender(&Core::getInstance()) { }
} CoreSuspender(Core* core, bool) : CoreSuspender(core) { }
CoreSuspendClaimer(Core *core, bool base = false) : core(core) { CoreSuspender(Core* core) :
level = core->ClaimSuspend(base); /* Increment the wait count */
ToolIncrement{core->toolCount},
/* Lock the core */
parent_t{core->CoreSuspendMutex},
core{core},
/* Mark this thread to be the core owner */
tid{core->ownerThread.exchange(std::this_thread::get_id())}
{ }
~CoreSuspender() {
/* Restore core owner to previous value */
core->ownerThread.store(tid);
if (tid == std::thread::id{})
Lua::Core::Reset(core->getConsole(), "suspend");
/* Notify core to continue when all queued tools have completed
* 0 = None wants to own the core
* 1+ = There are tools waiting core access
* fetch_add returns old value before subtraction
*/
if (core->toolCount.fetch_add(-1) == 1)
core->CoreWakeup.notify_one();
} }
~CoreSuspendClaimer() { core->DisclaimSuspend(level); }
}; };
using CoreSuspendClaimer = CoreSuspender;
} }

@ -395,7 +395,7 @@ namespace DFHack {namespace Lua {
// Not exported; for use by the Core class // Not exported; for use by the Core class
bool Init(color_ostream &out); bool Init(color_ostream &out);
void Reset(color_ostream &out, const char *where); DFHACK_EXPORT void Reset(color_ostream &out, const char *where);
// Events signalled by the core // Events signalled by the core
void onStateChange(color_ostream &out, int code); void onStateChange(color_ostream &out, int code);

@ -133,6 +133,7 @@ namespace DFHack
class CoreService : public RPCService { class CoreService : public RPCService {
int suspend_depth; int suspend_depth;
CoreSuspender* coreSuspender;
static int doRunLuaFunction(lua_State *L); static int doRunLuaFunction(lua_State *L);
public: public: