Petr Mrázek 2012-03-15 15:40:35 +01:00
commit e5efbc5895
6 changed files with 118 additions and 32 deletions

@ -104,6 +104,20 @@ struct Core::Cond
bool predicate;
};
struct Core::Private
{
tthread::mutex AccessMutex;
tthread::mutex StackMutex;
std::stack<Core::Cond*> suspended_tools;
Core::Cond core_cond;
thread::id df_suspend_thread;
int df_suspend_depth;
Private() {
df_suspend_depth = 0;
}
};
void cheap_tokenise(string const& input, vector<string> &output)
{
string *cur = NULL;
@ -576,6 +590,8 @@ void fIOthread(void * iodata)
Core::Core()
{
d = new Private();
// init the console. This must be always the first step!
plug_mgr = 0;
vif = 0;
@ -585,10 +601,6 @@ Core::Core()
started = false;
memset(&(s_mods), 0, sizeof(s_mods));
// create mutex for syncing with interactive tasks
AccessMutex = 0;
StackMutex = 0;
core_cond = 0;
// set up hotkey capture
hotkey_set = false;
HotkeyMutex = 0;
@ -687,10 +699,7 @@ bool Core::Init()
df::global::InitGlobals();
// create mutex for syncing with interactive tasks
StackMutex = new mutex();
AccessMutex = new mutex();
misc_data_mutex=new mutex();
core_cond = new Core::Cond();
cerr << "Initializing Plugins.\n";
// create plugin manager
plug_mgr = new PluginManager(this);
@ -783,22 +792,49 @@ void *Core::GetData( std::string key )
void Core::Suspend()
{
Core::Cond * nc = new Core::Cond();
auto tid = this_thread::get_id();
// If recursive, just increment the count
{
lock_guard<mutex> lock(d->AccessMutex);
if (d->df_suspend_depth > 0 && d->df_suspend_thread == tid)
{
d->df_suspend_depth++;
return;
}
}
// put the condition on a stack
StackMutex->lock();
suspended_tools.push(nc);
StackMutex->unlock();
Core::Cond *nc = new Core::Cond();
{
lock_guard<mutex> lock2(d->StackMutex);
d->suspended_tools.push(nc);
}
// wait until Core::Update() wakes up the tool
AccessMutex->lock();
nc->Lock(AccessMutex);
AccessMutex->unlock();
{
lock_guard<mutex> lock(d->AccessMutex);
nc->Lock(&d->AccessMutex);
assert(d->df_suspend_depth == 0);
d->df_suspend_thread = tid;
d->df_suspend_depth = 1;
}
}
void Core::Resume()
{
AccessMutex->lock();
core_cond->Unlock();
AccessMutex->unlock();
auto tid = this_thread::get_id();
lock_guard<mutex> lock(d->AccessMutex);
assert(d->df_suspend_depth > 0 && d->df_suspend_thread == tid);
if (--d->df_suspend_depth == 0)
d->core_cond.Unlock();
}
int Core::TileUpdate()
@ -853,21 +889,24 @@ int Core::Update()
// wake waiting tools
// do not allow more tools to join in while we process stuff here
StackMutex->lock();
while (!suspended_tools.empty())
lock_guard<mutex> lock_stack(d->StackMutex);
while (!d->suspended_tools.empty())
{
Core::Cond * nc = suspended_tools.top();
suspended_tools.pop();
AccessMutex->lock();
// wake tool
nc->Unlock();
// wait for tool to wake us
core_cond->Lock(AccessMutex);
AccessMutex->unlock();
Core::Cond * nc = d->suspended_tools.top();
d->suspended_tools.pop();
lock_guard<mutex> lock(d->AccessMutex);
// wake tool
nc->Unlock();
// wait for tool to wake us
d->core_cond.Lock(&d->AccessMutex);
// verify
assert(d->df_suspend_depth == 0);
// destroy condition
delete nc;
}
StackMutex->unlock();
return 0;
};

@ -64,11 +64,21 @@ using dfproto::CoreTextFragment;
using google::protobuf::MessageLite;
CoreService::CoreService() {
suspend_depth = 0;
// These 2 methods must be first, so that they get id 0 and 1
addMethod("BindMethod", &CoreService::BindMethod);
addMethod("RunCommand", &CoreService::RunCommand);
// Add others here:
addMethod("CoreSuspend", &CoreService::CoreSuspend);
addMethod("CoreResume", &CoreService::CoreResume);
}
CoreService::~CoreService()
{
while (suspend_depth-- > 0)
Core::getInstance().Resume();
}
command_result CoreService::BindMethod(color_ostream &stream,
@ -107,6 +117,23 @@ command_result CoreService::RunCommand(color_ostream &stream,
return Core::getInstance().plug_mgr->InvokeCommand(stream, cmd, args);
}
command_result CoreService::CoreSuspend(color_ostream &stream, const EmptyMessage*, IntMessage *cnt)
{
Core::getInstance().Suspend();
cnt->set_value(++suspend_depth);
return CR_OK;
}
command_result CoreService::CoreResume(color_ostream &stream, const EmptyMessage*, IntMessage *cnt)
{
if (suspend_depth <= 0)
return CR_WRONG_USAGE;
Core::getInstance().Resume();
cnt->set_value(--suspend_depth);
return CR_OK;
}
RPCService::RPCService()
{
owner = NULL;

@ -141,24 +141,28 @@ namespace DFHack
DFHack::Console con;
Core();
class Private;
Private *d;
bool Init();
int Update (void);
int TileUpdate (void);
int Shutdown (void);
int SDL_Event(SDL::Event* event);
bool ncurses_wgetch(int in, int & out);
Core(Core const&); // Don't Implement
void operator=(Core const&); // Don't implement
// report error to user while failing
void fatal (std::string output, bool will_deactivate);
// 1 = fatal failure
bool errorstate;
// regulate access to DF
struct Cond;
tthread::mutex * AccessMutex;
tthread::mutex * StackMutex;
std::stack < Core::Cond * > suspended_tools;
Core::Cond * core_cond;
// FIXME: shouldn't be kept around like this
DFHack::VersionInfoFactory * vif;
// Module storage

@ -34,6 +34,8 @@ distribution.
namespace DFHack
{
using dfproto::EmptyMessage;
using dfproto::IntMessage;
using dfproto::StringMessage;
enum command_result
{

@ -189,14 +189,20 @@ namespace DFHack
};
class CoreService : public RPCService {
int suspend_depth;
public:
CoreService();
~CoreService();
command_result BindMethod(color_ostream &stream,
const dfproto::CoreBindRequest *in,
dfproto::CoreBindReply *out);
command_result RunCommand(color_ostream &stream,
const dfproto::CoreRunCommandRequest *in);
// For batching
command_result CoreSuspend(color_ostream &stream, const EmptyMessage*, IntMessage *cnt);
command_result CoreResume(color_ostream &stream, const EmptyMessage*, IntMessage *cnt);
};
class DFHACK_EXPORT ServerConnection {

@ -45,6 +45,14 @@ message CoreErrorNotification {
message EmptyMessage {}
message IntMessage {
required int32 value = 1;
}
message StringMessage {
required string value = 1;
}
message CoreBindRequest {
required string method = 1;
required string input_msg = 2;