From 49f3de979f41a20ab1a84d04953f2042e0b1e677 Mon Sep 17 00:00:00 2001 From: Pauli Date: Sat, 7 Jul 2018 13:13:55 +0300 Subject: [PATCH] Make ServerMain and ServerConnection data race free RemoteServer and PluginManager side would need complete redesign to be data race free and concurrent. But as that would be unlikely to be required from DFHack I decided simpler solution that is fixing data ownership to a thread and all ServerConnection share a single lock which allows access to PluginManager and Core. --- library/Core.cpp | 9 +-- library/RemoteServer.cpp | 127 +++++++++++++++++++++------------ library/include/Core.h | 1 - library/include/RemoteServer.h | 21 +++--- 4 files changed, 98 insertions(+), 60 deletions(-) diff --git a/library/Core.cpp b/library/Core.cpp index 222680f62..15d4be72c 100644 --- a/library/Core.cpp +++ b/library/Core.cpp @@ -1549,7 +1549,6 @@ Core::Core() : last_pause_state = false; top_viewscreen = NULL; screen_window = NULL; - server = NULL; color_ostream::log_errors_to_stderr = true; @@ -1767,6 +1766,8 @@ bool Core::Init() // create plugin manager plug_mgr = new PluginManager(this); plug_mgr->init(); + cerr << "Starting the TCP listener.\n"; + auto listen = ServerMain::listen(RemoteClient::GetDefaultPort()); IODATA *temp = new IODATA; temp->core = this; temp->plug_mgr = plug_mgr; @@ -1791,9 +1792,7 @@ bool Core::Init() started = true; modstate = 0; - cerr << "Starting the TCP listener.\n"; - server = new ServerMain(); - if (!server->listen(RemoteClient::GetDefaultPort())) + if (!listen.get()) cerr << "TCP listen failed.\n"; if (df::global::ui_sidebar_menus) @@ -2296,6 +2295,8 @@ int Core::Shutdown ( void ) HotkeyCond.notify_one(); } + ServerMain::block(); + d->hotkeythread.join(); d->iothread.join(); diff --git a/library/RemoteServer.cpp b/library/RemoteServer.cpp index 86ef9f40c..523058d55 100644 --- a/library/RemoteServer.cpp +++ b/library/RemoteServer.cpp @@ -57,12 +57,11 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include #include "json/json.h" -#include "tinythread.h" using namespace DFHack; -using namespace tthread; using dfproto::CoreTextNotification; using dfproto::CoreTextFragment; @@ -72,6 +71,30 @@ bool readFullBuffer(CSimpleSocket *socket, void *buf, int size); bool sendRemoteMessage(CSimpleSocket *socket, int16_t id, const ::google::protobuf::MessageLite *msg, bool size_ready); +std::mutex ServerMain::access_{}; +bool ServerMain::blocked_{}; + +namespace { + struct BlockedException : std::exception { + const char* what() const noexcept override + { + return "Core has blocked all connection. This should have been catched."; + } + }; +} + +namespace DFHack { + + struct BlockGuard { + std::lock_guard lock; + BlockGuard() : + lock{ServerMain::access_} + { + if (ServerMain::blocked_) + throw BlockedException{}; + } + }; +} RPCService::RPCService() { @@ -134,9 +157,6 @@ ServerConnection::ServerConnection(CActiveSocket *socket) core_service = new CoreService(); core_service->finalize(this, &functions); - - thread = new tthread::thread(threadFn, (void*)this); - thread->detach(); } ServerConnection::~ServerConnection() @@ -144,7 +164,6 @@ ServerConnection::~ServerConnection() in_error = true; socket->Close(); delete socket; - delete thread; for (auto it = plugin_services.begin(); it != plugin_services.end(); ++it) delete it->second; @@ -216,13 +235,14 @@ void ServerConnection::connection_ostream::flush_proxy() } } -void ServerConnection::threadFn(void *arg) +void ServerConnection::Accepted(CActiveSocket* socket) { - ServerConnection *me = (ServerConnection*)arg; - - me->threadFn(); - - delete me; + std::thread{[](CActiveSocket* socket) { + try { + ServerConnection(socket).threadFn(); + } catch (BlockedException e) { + } + }, socket}.detach(); } void ServerConnection::threadFn() @@ -292,6 +312,7 @@ void ServerConnection::threadFn() // Find and call the function int in_size = header.size; + BlockGuard lock; ServerFunctionBase *fn = vector_get(functions, header.id); MessageLite *reply = NULL; @@ -378,25 +399,21 @@ void ServerConnection::threadFn() std::cerr << "Shutting down client connection." << endl; } -ServerMain::ServerMain() -{ - socket = new CPassiveSocket(); - thread = NULL; -} +namespace { + + struct ServerMainImpl : public ServerMain { + CPassiveSocket socket; + static void threadFn(std::promise promise, int port); + ServerMainImpl(std::promise promise, int port); + ~ServerMainImpl(); + }; -ServerMain::~ServerMain() -{ - socket->Close(); - delete socket; - delete thread; } -bool ServerMain::listen(int port) +ServerMainImpl::ServerMainImpl(std::promise promise, int port) : + socket{} { - if (thread) - return true; - - socket->Initialize(); + socket.Initialize(); std::string filename("dfhack-config/remote-server.json"); @@ -427,29 +444,49 @@ bool ServerMain::listen(int port) } std::cerr << "Listening on port " << port << (allow_remote ? " (remote enabled)" : "") << std::endl; - if (allow_remote) - { - if (!socket->Listen(NULL, port)) - return false; - } - else - { - if (!socket->Listen("127.0.0.1", port)) - return false; + const char* addr = allow_remote ? NULL : "127.0.0.1"; + if (!socket.Listen(addr, port)) { + promise.set_value(false); + return; } + promise.set_value(true); +} + +ServerMainImpl::~ServerMainImpl() +{ + socket.Close(); +} - thread = new tthread::thread(threadFn, this); - thread->detach(); - return true; +std::future ServerMain::listen(int port) +{ + std::promise promise; + auto rv = promise.get_future(); + std::thread{&ServerMainImpl::threadFn, std::move(promise), port}.detach(); + return rv; } -void ServerMain::threadFn(void *arg) +void ServerMainImpl::threadFn(std::promise promise, int port) { - ServerMain *me = (ServerMain*)arg; - CActiveSocket *client; + ServerMainImpl server{std::move(promise), port}; - while ((client = me->socket->Accept()) != NULL) - { - new ServerConnection(client); + CActiveSocket *client = nullptr; + + try { + while ((client = server.socket.Accept()) != NULL) + { + BlockGuard lock; + ServerConnection::Accepted(client); + client = nullptr; + } + } catch(BlockedException e) { + if (client) + client->Close(); + delete client; } } + +void ServerMain::block() +{ + std::lock_guard lock{access_}; + blocked_ = true; +} diff --git a/library/include/Core.h b/library/include/Core.h index 7c3bc5578..85bfdbd3e 100644 --- a/library/include/Core.h +++ b/library/include/Core.h @@ -308,7 +308,6 @@ namespace DFHack friend class CoreSuspenderBase; friend struct CoreSuspendClaimMain; friend struct CoreSuspendReleaseMain; - ServerMain *server; }; class CoreSuspenderBase : protected std::unique_lock { diff --git a/library/include/RemoteServer.h b/library/include/RemoteServer.h index 85ac463fb..a7e0ad8d5 100644 --- a/library/include/RemoteServer.h +++ b/library/include/RemoteServer.h @@ -28,6 +28,8 @@ distribution. #include "RemoteClient.h" #include "Core.h" +#include + class CPassiveSocket; class CActiveSocket; class CSimpleSocket; @@ -233,26 +235,25 @@ namespace DFHack CoreService *core_service; std::map plugin_services; - tthread::thread *thread; - static void threadFn(void *); void threadFn(); + ServerConnection(CActiveSocket* socket); + ~ServerConnection(); public: - ServerConnection(CActiveSocket *socket); - ~ServerConnection(); + + static void Accepted(CActiveSocket* socket); ServerFunctionBase *findFunction(color_ostream &out, const std::string &plugin, const std::string &name); }; class ServerMain { - CPassiveSocket *socket; + static std::mutex access_; + static bool blocked_; + friend struct BlockGuard; - tthread::thread *thread; - static void threadFn(void *); public: - ServerMain(); - ~ServerMain(); - bool listen(int port); + static std::future listen(int port); + static void block(); }; }