Make ServerMain and ServerConnection data race free

RemoteServer and PluginManager side would need complete redesign to be
data race free and concurrent. But as that would be unlikely to be
required from DFHack I decided simpler solution that is fixing data
ownership to a thread and all ServerConnection share a single lock which
allows access to PluginManager and Core.
develop
Pauli 2018-07-07 13:13:55 +03:00 committed by lethosor
parent 0605b9601c
commit 49f3de979f
4 changed files with 98 additions and 60 deletions

@ -1549,7 +1549,6 @@ Core::Core() :
last_pause_state = false; last_pause_state = false;
top_viewscreen = NULL; top_viewscreen = NULL;
screen_window = NULL; screen_window = NULL;
server = NULL;
color_ostream::log_errors_to_stderr = true; color_ostream::log_errors_to_stderr = true;
@ -1767,6 +1766,8 @@ bool Core::Init()
// create plugin manager // create plugin manager
plug_mgr = new PluginManager(this); plug_mgr = new PluginManager(this);
plug_mgr->init(); plug_mgr->init();
cerr << "Starting the TCP listener.\n";
auto listen = ServerMain::listen(RemoteClient::GetDefaultPort());
IODATA *temp = new IODATA; IODATA *temp = new IODATA;
temp->core = this; temp->core = this;
temp->plug_mgr = plug_mgr; temp->plug_mgr = plug_mgr;
@ -1791,9 +1792,7 @@ bool Core::Init()
started = true; started = true;
modstate = 0; modstate = 0;
cerr << "Starting the TCP listener.\n"; if (!listen.get())
server = new ServerMain();
if (!server->listen(RemoteClient::GetDefaultPort()))
cerr << "TCP listen failed.\n"; cerr << "TCP listen failed.\n";
if (df::global::ui_sidebar_menus) if (df::global::ui_sidebar_menus)
@ -2296,6 +2295,8 @@ int Core::Shutdown ( void )
HotkeyCond.notify_one(); HotkeyCond.notify_one();
} }
ServerMain::block();
d->hotkeythread.join(); d->hotkeythread.join();
d->iothread.join(); d->iothread.join();

@ -57,12 +57,11 @@ POSSIBILITY OF SUCH DAMAGE.
#include <sstream> #include <sstream>
#include <memory> #include <memory>
#include <thread>
#include "json/json.h" #include "json/json.h"
#include "tinythread.h"
using namespace DFHack; using namespace DFHack;
using namespace tthread;
using dfproto::CoreTextNotification; using dfproto::CoreTextNotification;
using dfproto::CoreTextFragment; using dfproto::CoreTextFragment;
@ -72,6 +71,30 @@ bool readFullBuffer(CSimpleSocket *socket, void *buf, int size);
bool sendRemoteMessage(CSimpleSocket *socket, int16_t id, bool sendRemoteMessage(CSimpleSocket *socket, int16_t id,
const ::google::protobuf::MessageLite *msg, bool size_ready); const ::google::protobuf::MessageLite *msg, bool size_ready);
std::mutex ServerMain::access_{};
bool ServerMain::blocked_{};
namespace {
struct BlockedException : std::exception {
const char* what() const noexcept override
{
return "Core has blocked all connection. This should have been catched.";
}
};
}
namespace DFHack {
struct BlockGuard {
std::lock_guard<std::mutex> lock;
BlockGuard() :
lock{ServerMain::access_}
{
if (ServerMain::blocked_)
throw BlockedException{};
}
};
}
RPCService::RPCService() RPCService::RPCService()
{ {
@ -134,9 +157,6 @@ ServerConnection::ServerConnection(CActiveSocket *socket)
core_service = new CoreService(); core_service = new CoreService();
core_service->finalize(this, &functions); core_service->finalize(this, &functions);
thread = new tthread::thread(threadFn, (void*)this);
thread->detach();
} }
ServerConnection::~ServerConnection() ServerConnection::~ServerConnection()
@ -144,7 +164,6 @@ ServerConnection::~ServerConnection()
in_error = true; in_error = true;
socket->Close(); socket->Close();
delete socket; delete socket;
delete thread;
for (auto it = plugin_services.begin(); it != plugin_services.end(); ++it) for (auto it = plugin_services.begin(); it != plugin_services.end(); ++it)
delete it->second; delete it->second;
@ -216,13 +235,14 @@ void ServerConnection::connection_ostream::flush_proxy()
} }
} }
void ServerConnection::threadFn(void *arg) void ServerConnection::Accepted(CActiveSocket* socket)
{ {
ServerConnection *me = (ServerConnection*)arg; std::thread{[](CActiveSocket* socket) {
try {
me->threadFn(); ServerConnection(socket).threadFn();
} catch (BlockedException e) {
delete me; }
}, socket}.detach();
} }
void ServerConnection::threadFn() void ServerConnection::threadFn()
@ -292,6 +312,7 @@ void ServerConnection::threadFn()
// Find and call the function // Find and call the function
int in_size = header.size; int in_size = header.size;
BlockGuard lock;
ServerFunctionBase *fn = vector_get(functions, header.id); ServerFunctionBase *fn = vector_get(functions, header.id);
MessageLite *reply = NULL; MessageLite *reply = NULL;
@ -378,25 +399,21 @@ void ServerConnection::threadFn()
std::cerr << "Shutting down client connection." << endl; std::cerr << "Shutting down client connection." << endl;
} }
ServerMain::ServerMain() namespace {
{
socket = new CPassiveSocket(); struct ServerMainImpl : public ServerMain {
thread = NULL; CPassiveSocket socket;
} static void threadFn(std::promise<bool> promise, int port);
ServerMainImpl(std::promise<bool> promise, int port);
~ServerMainImpl();
};
ServerMain::~ServerMain()
{
socket->Close();
delete socket;
delete thread;
} }
bool ServerMain::listen(int port) ServerMainImpl::ServerMainImpl(std::promise<bool> promise, int port) :
socket{}
{ {
if (thread) socket.Initialize();
return true;
socket->Initialize();
std::string filename("dfhack-config/remote-server.json"); std::string filename("dfhack-config/remote-server.json");
@ -427,29 +444,49 @@ bool ServerMain::listen(int port)
} }
std::cerr << "Listening on port " << port << (allow_remote ? " (remote enabled)" : "") << std::endl; std::cerr << "Listening on port " << port << (allow_remote ? " (remote enabled)" : "") << std::endl;
if (allow_remote) const char* addr = allow_remote ? NULL : "127.0.0.1";
{ if (!socket.Listen(addr, port)) {
if (!socket->Listen(NULL, port)) promise.set_value(false);
return false; return;
}
else
{
if (!socket->Listen("127.0.0.1", port))
return false;
} }
promise.set_value(true);
}
ServerMainImpl::~ServerMainImpl()
{
socket.Close();
}
thread = new tthread::thread(threadFn, this); std::future<bool> ServerMain::listen(int port)
thread->detach(); {
return true; std::promise<bool> promise;
auto rv = promise.get_future();
std::thread{&ServerMainImpl::threadFn, std::move(promise), port}.detach();
return rv;
} }
void ServerMain::threadFn(void *arg) void ServerMainImpl::threadFn(std::promise<bool> promise, int port)
{ {
ServerMain *me = (ServerMain*)arg; ServerMainImpl server{std::move(promise), port};
CActiveSocket *client;
while ((client = me->socket->Accept()) != NULL) CActiveSocket *client = nullptr;
{
new ServerConnection(client); try {
while ((client = server.socket.Accept()) != NULL)
{
BlockGuard lock;
ServerConnection::Accepted(client);
client = nullptr;
}
} catch(BlockedException e) {
if (client)
client->Close();
delete client;
} }
} }
void ServerMain::block()
{
std::lock_guard<std::mutex> lock{access_};
blocked_ = true;
}

@ -308,7 +308,6 @@ namespace DFHack
friend class CoreSuspenderBase; friend class CoreSuspenderBase;
friend struct CoreSuspendClaimMain; friend struct CoreSuspendClaimMain;
friend struct CoreSuspendReleaseMain; friend struct CoreSuspendReleaseMain;
ServerMain *server;
}; };
class CoreSuspenderBase : protected std::unique_lock<std::recursive_mutex> { class CoreSuspenderBase : protected std::unique_lock<std::recursive_mutex> {

@ -28,6 +28,8 @@ distribution.
#include "RemoteClient.h" #include "RemoteClient.h"
#include "Core.h" #include "Core.h"
#include <future>
class CPassiveSocket; class CPassiveSocket;
class CActiveSocket; class CActiveSocket;
class CSimpleSocket; class CSimpleSocket;
@ -233,26 +235,25 @@ namespace DFHack
CoreService *core_service; CoreService *core_service;
std::map<std::string, RPCService*> plugin_services; std::map<std::string, RPCService*> plugin_services;
tthread::thread *thread;
static void threadFn(void *);
void threadFn(); void threadFn();
ServerConnection(CActiveSocket* socket);
~ServerConnection();
public: public:
ServerConnection(CActiveSocket *socket);
~ServerConnection(); static void Accepted(CActiveSocket* socket);
ServerFunctionBase *findFunction(color_ostream &out, const std::string &plugin, const std::string &name); ServerFunctionBase *findFunction(color_ostream &out, const std::string &plugin, const std::string &name);
}; };
class ServerMain { class ServerMain {
CPassiveSocket *socket; static std::mutex access_;
static bool blocked_;
friend struct BlockGuard;
tthread::thread *thread;
static void threadFn(void *);
public: public:
ServerMain();
~ServerMain();
bool listen(int port); static std::future<bool> listen(int port);
static void block();
}; };
} }