Add a test for signal_shared_tag implementation

The test cases check that the signal_shared_tag implementation can be
used and destructed safely from multiple threads.
develop
Pauli 2018-07-02 18:48:33 +03:00
parent 9cfb07f476
commit 490a855776
3 changed files with 181 additions and 1 deletions

@ -1,5 +1,7 @@
INCLUDE(Plugins.cmake)
find_package(Threads)
OPTION(BUILD_STONESENSE "Build stonesense (needs a checkout first)." OFF)
if(BUILD_STONESENSE)
add_subdirectory (stonesense)

@ -9,7 +9,7 @@ DFHACK_PLUGIN(counters counters.cpp)
DFHACK_PLUGIN(dumpmats dumpmats.cpp)
DFHACK_PLUGIN(eventExample eventExample.cpp)
DFHACK_PLUGIN(frozen frozen.cpp)
DFHACK_PLUGIN(kittens kittens.cpp)
DFHACK_PLUGIN(kittens kittens.cpp LINK_LIBRARIES ${CMAKE_THREAD_LIBS_INIT})
DFHACK_PLUGIN(memview memview.cpp memutils.cpp LINK_LIBRARIES lua)
DFHACK_PLUGIN(nestboxes nestboxes.cpp)
DFHACK_PLUGIN(notes notes.cpp)

@ -1,12 +1,16 @@
#include <atomic>
#include <vector>
#include <random>
#include <string>
#include <thread>
#include "Console.h"
#include "Core.h"
#include "Debug.h"
#include "Export.h"
#include "MiscUtils.h"
#include "PluginManager.h"
#include "Signal.hpp"
#include "modules/Gui.h"
#include "modules/Items.h"
@ -25,6 +29,10 @@ DFHACK_PLUGIN_IS_ENABLED(is_enabled);
REQUIRE_GLOBAL(ui);
REQUIRE_GLOBAL(world);
namespace DFHack {
DBG_DECLARE(kittens,command);
}
std::atomic<bool> shutdown_flag{false};
std::atomic<bool> final_flag{true};
std::atomic<bool> timering{false};
@ -42,6 +50,7 @@ command_result trackmenu (color_ostream &out, vector <string> & parameters);
command_result trackpos (color_ostream &out, vector <string> & parameters);
command_result trackstate (color_ostream &out, vector <string> & parameters);
command_result colormods (color_ostream &out, vector <string> & parameters);
command_result sharedsignal (color_ostream &out, vector <string> & parameters);
DFhackCExport command_result plugin_init ( color_ostream &out, std::vector <PluginCommand> &commands)
{
@ -51,6 +60,7 @@ DFhackCExport command_result plugin_init ( color_ostream &out, std::vector <Plug
commands.push_back(PluginCommand("trackpos","Track mouse and designation coords (toggle).",trackpos));
commands.push_back(PluginCommand("trackstate","Track world and map state (toggle).",trackstate));
commands.push_back(PluginCommand("colormods","Dump colormod vectors.",colormods));
commands.push_back(PluginCommand("sharedsignal","Test Signal with signal_shared_tag",sharedsignal));
return CR_OK;
}
@ -195,6 +205,174 @@ command_result ktimer (color_ostream &out, vector <string> & parameters)
return CR_OK;
}
struct Connected;
using shared = std::shared_ptr<Connected>;
using weak = std::weak_ptr<Connected>;
static constexpr std::chrono::microseconds delay{1};
template<typename Derived>
struct ClearMem : public ConnectedBase {
~ClearMem()
{
memset(reinterpret_cast<void*>(this), 0xDE, sizeof(Derived));
}
};
struct Connected : public ClearMem<Connected> {
using Sig = Signal<void(int), signal_shared_tag>;
std::array<Sig::Connection,4> con;
Sig signal;
weak other;
Sig::weak_ptr other_sig;
color_ostream *out;
int id;
uint32_t count;
uint32_t caller;
alignas(64) std::atomic<uint32_t> callee;
Connected() = default;
Connected(int id) :
Connected{}
{
this->id = id;
}
void connect(color_ostream& o, shared& b, size_t pos, uint32_t c)
{
out = &o;
count = c*2;
other = b;
other_sig = b->signal.weak_from_this();
// Externally synchronized object destruction is only safe to this
// connect.
con[pos] = b->signal.connect(
[this](int) {
uint32_t old = callee.fetch_add(1);
assert(old != 0xDEDEDEDE);
std::this_thread::sleep_for(delay);
assert(callee != 0xDEDEDEDE);
});
// Shared object managed object with possibility of destruction while
// other threads calling emit must pass the shared_ptr to connect.
Connected *bptr = b.get();
b->con[pos] = signal.connect(b,
[bptr](int) {
uint32_t old = bptr->callee.fetch_add(1);
assert(old != 0xDEDEDEDE);
std::this_thread::sleep_for(delay);
assert(bptr->callee != 0xDEDEDEDE);
});
}
void reconnect(size_t pos) {
auto b = other.lock();
if (!b)
return;
// Not required to use Sig::lock because other holds strong reference to
// Signal. But this just shows how weak_ref could be used.
auto sig = Sig::lock(other_sig);
if (!sig)
return;
con[pos] = sig->connect(b,
[this](int) {
uint32_t old = callee.fetch_add(1);
assert(old != 0xDEDEDEDE);
std::this_thread::sleep_for(delay);
assert(callee != 0xDEDEDEDE);
});
}
void connect(color_ostream& o, shared& a, shared& b,size_t pos, uint32_t c)
{
out = &o;
count = c;
con[pos] = b->signal.connect(a,
[this](int) {
uint32_t old = callee.fetch_add(1);
assert(old != 0xDEDEDEDE);
std::this_thread::sleep_for(delay);
assert(callee != 0xDEDEDEDE);
});
}
Connected* operator->() noexcept
{
return this;
}
~Connected() {
INFO(command,*out).print("Connected %d had %d count. "
"It was caller %d times. "
"It was callee %d times.\n",
id, count, caller, callee.load());
}
};
command_result sharedsignal (color_ostream &out, vector <string> & parameters)
{
using rng_t = std::linear_congruential_engine<uint32_t, 747796405U, 2891336453U, 0>;
rng_t rng(std::random_device{}());
size_t count = 10;
if (0 < parameters.size()) {
std::stringstream ss(parameters[0]);
ss >> count;
DEBUG(command, out) << "Parsed " << count
<< " from paramters[0] '" << parameters[0] << '\'' << std::endl;
}
std::uniform_int_distribution<uint32_t> dis(4096,8192);
out << "Running signal_shared_tag destruction test "
<< count << " times" << std::endl;
for (size_t nr = 0; nr < count; ++nr) {
std::array<std::thread,4> t{};
// Make an object which destruction is protected by std::thread::join()
Connected external{static_cast<int>(t.size())};
TRACE(command, out) << "begin " << std::endl;
{
int id = 0;
// Make objects that are automatically protected using weak_ptr
// references that are promoted to shared_ptr when Signal is
// accessed.
std::array<shared,4> c = {
std::make_shared<Connected>(id++),
std::make_shared<Connected>(id++),
std::make_shared<Connected>(id++),
std::make_shared<Connected>(id++),
};
assert(t.size() == c.size());
for (unsigned i = 1; i < c.size(); ++i) {
c[0]->connect(out, c[0], c[i], i - 1, dis(rng));
c[i]->connect(out, c[i], c[0], 0, dis(rng));
}
external.connect(out, c[1], 1, dis(rng));
auto thr = [&out](shared c) {
TRACE(command, out) << "Thread " << c->id << " started." << std::endl;
weak ref = c;
for (;c->caller < c->count; ++c->caller) {
c->signal(c->caller);
}
TRACE(command, out) << "Thread " << c->id << " resets shared." << std::endl;
c.reset();
while((c = ref.lock())) {
++c->caller;
c->signal(c->caller);
c.reset();
std::this_thread::sleep_for(delay*25);
}
};
for (unsigned i = 0; i < c.size(); ++i) {
TRACE(command, out) << "start thread " << i << std::endl;
t[i] = std::thread{thr, c[i]};
}
}
TRACE(command, out) << "running " << std::endl;
for (;external->caller < external->count; ++external->caller) {
external->signal(external->caller);
external->reconnect(1);
}
TRACE(command, out) << "join " << std::endl;
for (unsigned i = 0; i < t.size(); ++i)
t[i].join();
}
return CR_OK;
}
command_result kittens (color_ostream &out, vector <string> & parameters)
{
if (parameters.size() >= 1)