Use sched_yield depending on CPU affinity

develop
Petr Mrázek 2010-03-04 04:40:06 +01:00
parent 0192520d43
commit 6e69dcdeed
5 changed files with 188 additions and 178 deletions

@ -28,6 +28,7 @@ distribution.
#include <sys/ipc.h>
#include <time.h>
#include "../shmserver/shms.h"
#include "../shmserver/shms-core.h"
#include <sys/time.h>
#include <time.h>
#include <sched.h>
@ -68,22 +69,27 @@ class Process::Private
bool waitWhile (CORE_COMMAND state);
bool DF_TestBridgeVersion(bool & ret);
bool DF_GetPID(pid_t & ret);
void DF_SyncAffinity(void);
};
// some helpful macros to keep the code bloat in check
#define SHMCMD ((shm_cmd *)my_shm)->pingpong
#define D_SHMCMD ((shm_cmd *)d->my_shm)->pingpong
#define SHMHDR ((shm_header *)my_shm)
#define D_SHMHDR ((shm_header *)d->my_shm)
#define SHMHDR ((shm_core_hdr *)my_shm)
#define D_SHMHDR ((shm_core_hdr *)d->my_shm)
/*
With no cheap way to synchronize the two processes (locks are slow, and the OS doesn't give us enough control over scheduling),
we end up with this silly busy-wait loop.
*/
bool Process::Private::waitWhile (CORE_COMMAND state)
{
uint32_t cnt = 0;
struct shmid_ds descriptor;
while (SHMCMD == state)
{
if(cnt == 10000)
if(cnt == 10000)// check if the other process is still there
{
shmctl(my_shmid, IPC_STAT, &descriptor);
@ -98,7 +104,10 @@ bool Process::Private::waitWhile (CORE_COMMAND state)
cnt = 0;
}
}
SCHED_YIELD
if(useYield)
{
SCHED_YIELD
}
cnt++;
}
if(SHMCMD == CORE_SV_ERROR)
@ -136,6 +145,31 @@ bool Process::Private::DF_GetPID(pid_t & ret)
return true;
}
/**
 * Query the CPU affinity of the calling process as a 32-bit mask.
 *
 * Bit i of the result is set when the process is allowed to run on CPU i.
 * Only CPUs 0-31 can be represented; CPUs with a higher index are ignored.
 *
 * @return the affinity mask, or 0 when sched_getaffinity() fails.
 */
uint32_t OS_getAffinity()
{
    cpu_set_t mask;
    CPU_ZERO(&mask);
    // pid 0 selects the caller; never read the mask if the call failed
    if(sched_getaffinity(0, sizeof(cpu_set_t), &mask) != 0)
    {
        return 0;
    }
    // Build the result bit by bit instead of reinterpreting cpu_set_t's
    // storage, so it does not depend on the set's internal layout.
    uint32_t affinity = 0;
    for(int cpu = 0; cpu < 32; cpu++)
    {
        if(CPU_ISSET(cpu, &mask))
        {
            affinity |= (uint32_t(1) << cpu);
        }
    }
    return affinity;
}
// Exchange CPU affinity with the other side of the shared memory link and
// record whether waiting loops should yield the CPU.
// Sends the local affinity mask in the header's value field, issues
// CORE_SYNC_YIELD, waits for the command word to change, and then stores the
// returned header value in useYield.
// If waitWhile() reports failure, useYield is left unchanged.
void Process::Private::DF_SyncAffinity( void )
{
// publish our affinity mask before the command becomes visible
SHMHDR->value = OS_getAffinity();
// NOTE(review): gcc_barrier is defined outside this view - presumably a compiler/memory barrier
gcc_barrier
SHMCMD = CORE_SYNC_YIELD;
gcc_barrier
// spin until the command word is no longer CORE_SYNC_YIELD
if(!waitWhile(CORE_SYNC_YIELD))
return;
gcc_barrier
// put the link back into the suspended state, then read the reply
SHMCMD = CORE_SUSPENDED;
useYield = SHMHDR->value;
#ifdef DEBUG
if(useYield) cerr << "Using Yield!" << endl;
#endif
}
Process::Process(vector <memory_info *> & known_versions)
: d(new Private())
{
@ -204,6 +238,7 @@ Process::Process(vector <memory_info *> & known_versions)
// try to identify the DF version
d->validate(target_name, d->my_pid, known_versions);
d->DF_SyncAffinity();
d->my_window = new DFWindow(this);
}
gcc_barrier

@ -23,6 +23,7 @@ distribution.
*/
#include "DFCommonInternal.h"
#include "../shmserver/shms.h"
#include "../shmserver/shms-core.h"
using namespace DFHack;
// a full memory barrier! better be safe than sorry.
@ -38,6 +39,7 @@ class Process::Private
attached = false;
suspended = false;
identified = false;
useYield = 0;
DFSVMutex = 0;
DFCLMutex = 0;
};
@ -52,13 +54,22 @@ class Process::Private
bool attached;
bool suspended;
bool identified;
bool useYield;
bool waitWhile (CORE_COMMAND state);
bool isValidSV();
bool DF_TestBridgeVersion(bool & ret);
bool DF_GetPID(uint32_t & ret);
void DF_SyncAffinity(void);
};
// some helpful macros to keep the code bloat in check
#define SHMCMD ((shm_cmd *)my_shm)->pingpong
#define D_SHMCMD ((shm_cmd *)d->my_shm)->pingpong
#define SHMHDR ((shm_core_hdr *)my_shm)
#define D_SHMHDR ((shm_core_hdr *)d->my_shm)
// is the other side still there?
bool Process::Private::isValidSV()
{
@ -90,14 +101,17 @@ bool Process::Private::isValidSV()
bool Process::Private::waitWhile (CORE_COMMAND state)
{
uint32_t cnt = 0;
SCHED_YIELD // yield the CPU, valid only on single-core CPUs
while (((shm_cmd *)my_shm)->pingpong == state)
{
// yield the CPU, only on single-core CPUs
if(useYield)
{
SCHED_YIELD
}
if(cnt == 10000)
{
if(!isValidSV())// DF not there anymore?
{
full_barrier
((shm_cmd *)my_shm)->pingpong = CORE_RUNNING;
attached = suspended = false;
ReleaseMutex(DFCLMutex);
@ -115,7 +129,6 @@ bool Process::Private::waitWhile (CORE_COMMAND state)
((shm_cmd *)my_shm)->pingpong = CORE_RUNNING;
attached = suspended = false;
cerr << "shm server error!" << endl;
assert (false);
return false;
}
return true;
@ -145,6 +158,34 @@ bool Process::Private::DF_GetPID(uint32_t & ret)
return true;
}
void Process::Private::DF_SyncAffinity(void)
{
}
uint32_t OS_getAffinity()
{
HANDLE hProcess = GetCurrentProcess();
DWORD dwProcessAffinityMask, dwSystemAffinityMask;
GetProcessAffinityMask( hProcess, &dwProcessAffinityMask, &dwSystemAffinityMask );
return dwProcessAffinityMask;
}
void Process::Private::DF_SyncAffinity( void )
{
SHMHDR->value = OS_getAffinity();
full_barrier
SHMCMD = CORE_SYNC_YIELD;
full_barrier
if(!waitWhile(CORE_SYNC_YIELD))
return;
full_barrier
SHMCMD = CORE_SUSPENDED;
useYield = SHMHDR->value;
if(useYield) cerr << "Using Yield!" << endl;
}
Process::Process(vector <memory_info *> & known_versions)
: d(new Private())
{
@ -255,12 +296,13 @@ Process::Process(vector <memory_info *> & known_versions)
if(d->identified)
{
d->my_window = new DFWindow(this);
d->DF_SyncAffinity();
}
else
{
((shm_cmd *)d->my_shm)->pingpong = CORE_RUNNING;
UnmapViewOfFile(d->my_shm);
d->my_shm = 0;
d->my_shm = 0;
ReleaseMutex(d->DFCLMutex);
CloseHandle(d->DFSVMutex);
d->DFSVMutex = 0;

@ -33,50 +33,7 @@ distribution.
#include <string>
#include <vector>
#include "shms.h"
// Kind of a command table entry; stored in DFPP_command::type.
enum DFPP_CmdType
{
CANCELLATION, // we should jump out of the Act()
CLIENT_WAIT, // we are waiting for the client
FUNCTION, // we call a function as a result of the command
};
// One entry of a module's command table.
struct DFPP_command
{
DFPP_CmdType type:32; // force the enum to 32 bits for compatibility reasons
std::string name; // descriptive name of the command
void (*_function)(void); // handler to call for this command; 0 when there is none
};
struct DFPP_module
{
inline void push_command(DFPP_CmdType type, const char * name, void (*_function)(void))
{
DFPP_command cmd;
cmd.type = type;
cmd.name = name;
cmd._function = _function;
commands.push_back(cmd);
}
inline void set_command(unsigned int index, DFPP_CmdType type, const char * name, void (*_function)(void))
{
DFPP_command cmd;
cmd.type = type;
cmd.name = name;
cmd._function = _function;
commands[index] = cmd;
}
inline void reserve (unsigned int numcommands)
{
commands.clear();
DFPP_command cmd = {CANCELLATION,"",0};
commands.resize(numcommands,cmd);
}
std::string name;
uint32_t version; // version
std::vector <DFPP_command> commands;
void * modulestate;
};
#include "shms-core.h"
std::vector <DFPP_module> module_registry;
@ -84,78 +41,59 @@ std::vector <DFPP_module> module_registry;
extern int errorstate;
extern char *shm;
extern int shmid;
bool useYield = 0;
#define SHMHDR ((shm_header *)shm)
#define SHMHDR ((shm_core_hdr *)shm)
#define SHMCMD ((shm_cmd *)shm)->pingpong
// Command handler: report the version of the first registered module
// through the header's value field, then signal CORE_RET_VERSION.
void GetCoreVersion (void)
{
SHMHDR->value = module_registry[0].version;
// make the value visible before the command word changes
full_barrier
SHMCMD = CORE_RET_VERSION;
}
// Command handler: report this process's ID (as returned by OS_getPID)
// through the header's value field, then signal CORE_RET_PID.
void GetPID (void)
{
SHMHDR->value = OS_getPID();
// make the value visible before the command word changes
full_barrier
SHMCMD = CORE_RET_PID;
}
// Command handler: copy SHMHDR->length bytes starting at SHMHDR->address
// into the shared memory area just past the header (shm + SHM_HEADER),
// then signal CORE_RET_DATA.
// NOTE(review): length is not checked against the size of the shared body here - confirm the client bounds it.
void ReadRaw (void)
{
memcpy(shm + SHM_HEADER, (void *) SHMHDR->address,SHMHDR->length);
// make the copied data visible before the command word changes
full_barrier
SHMCMD = CORE_RET_DATA;
}
// Command handler: read a 32-bit value from the address in SHMHDR->address
// into the header's value field, then signal CORE_RET_DWORD.
void ReadDWord (void)
{
SHMHDR->value = *((uint32_t*) SHMHDR->address);
// make the value visible before the command word changes
full_barrier
SHMCMD = CORE_RET_DWORD;
}
// Command handler: read a 16-bit value from the address in SHMHDR->address
// into the header's value field, then signal CORE_RET_WORD.
void ReadWord (void)
{
SHMHDR->value = *((uint16_t*) SHMHDR->address);
// make the value visible before the command word changes
full_barrier
SHMCMD = CORE_RET_WORD;
}
// Command handler: read an 8-bit value from the address in SHMHDR->address
// into the header's value field, then signal CORE_RET_BYTE.
void ReadByte (void)
{
SHMHDR->value = *((uint8_t*) SHMHDR->address);
// make the value visible before the command word changes
full_barrier
SHMCMD = CORE_RET_BYTE;
}
// Command handler: copy SHMHDR->length bytes from the shared memory area
// just past the header (shm + SHM_HEADER) to the address in SHMHDR->address,
// then return to CORE_SUSPENDED.
// NOTE(review): neither the length nor the destination address is validated here.
void WriteRaw (void)
{
memcpy((void *)SHMHDR->address, shm + SHM_HEADER,SHMHDR->length);
// finish the write before the command word changes
full_barrier
SHMCMD = CORE_SUSPENDED;
}
// Command handler: store the header's value field as a 32-bit value at the
// address in SHMHDR->address, then return to CORE_SUSPENDED.
void WriteDWord (void)
{
(*(uint32_t*)SHMHDR->address) = SHMHDR->value;
// finish the write before the command word changes
full_barrier
SHMCMD = CORE_SUSPENDED;
}
// Command handler: store the header's value field, narrowed to 16 bits, at
// the address in SHMHDR->address, then return to CORE_SUSPENDED.
void WriteWord (void)
{
(*(uint16_t*)SHMHDR->address) = SHMHDR->value;
// finish the write before the command word changes
full_barrier
SHMCMD = CORE_SUSPENDED;
}
// Command handler: store the header's value field, narrowed to 8 bits, at
// the address in SHMHDR->address, then return to CORE_SUSPENDED.
void WriteByte (void)
{
(*(uint8_t*)SHMHDR->address) = SHMHDR->value;
// finish the write before the command word changes
full_barrier
SHMCMD = CORE_SUSPENDED;
}
void ReadSTLString (void)
@ -163,10 +101,8 @@ void ReadSTLString (void)
std::string * myStringPtr = (std::string *) SHMHDR->address;
unsigned int l = myStringPtr->length();
SHMHDR->value = l;
// there doesn't have to be a null terminator!
// FIXME: there doesn't have to be a null terminator!
strncpy(shm+SHM_HEADER,myStringPtr->c_str(),l+1);
full_barrier
SHMCMD = CORE_RET_STRING;
}
void WriteSTLString (void)
@ -174,13 +110,31 @@ void WriteSTLString (void)
std::string * myStringPtr = (std::string *) SHMHDR->address;
// here we DO expect a 0 terminator
myStringPtr->assign((const char *) (shm + SHM_HEADER));
full_barrier
SHMCMD = CORE_SUSPENDED;
}
void Suspend (void)
// MIT HAKMEM bitcount
/**
 * Count the set bits of a 32-bit value.
 *
 * Pure helper: it only inspects its argument and has no side effects.
 *
 * @param n value to examine
 * @return number of bits set in n (0..32)
 */
int bitcount(uint32_t n)
{
    // After the two subtractions every 3-bit (octal) group of tmp holds the
    // number of bits that were set in the same group of n.
    uint32_t tmp = n - ((n >> 1) & 033333333333) - ((n >> 2) & 011111111111);
    // Add neighbouring groups into 6-bit fields; since 64 % 63 == 1, the
    // remainder modulo 63 is the sum of those fields.
    return ((tmp + (tmp >> 3)) & 030707070707) % 63;
}
// Command handler: combine our affinity mask with the one the client put in
// the header's value field. Yielding is switched on only when the combined
// mask contains exactly one CPU; the decision is stored in useYield and
// written back to the value field as 1 or 0.
// NOTE(review): a combined mask of 0 (no CPU among the 32 representable ones) selects "no yield" - confirm that is intended.
void SyncYield (void)
{
    const uint32_t mine = OS_getAffinity();
    const uint32_t theirs = SHMHDR->value;
    const bool singleCore = (bitcount(mine | theirs) == 1);
    useYield = singleCore;
    SHMHDR->value = singleCore ? 1 : 0;
}
void InitCore(void)
@ -191,41 +145,42 @@ void InitCore(void)
core.modulestate = 0; // this one is dumb and has no real state
core.reserve(NUM_CORE_CMDS);
core.set_command(CORE_RUNNING, CANCELLATION, "Running", NULL);
core.set_command(CORE_RUNNING, CANCELLATION, "Running");
core.set_command(CORE_GET_VERSION, FUNCTION,"Get core version",GetCoreVersion);
core.set_command(CORE_RET_VERSION, CLIENT_WAIT,"Core version return",0);
core.set_command(CORE_GET_VERSION, FUNCTION,"Get core version",GetCoreVersion, CORE_RET_VERSION);
core.set_command(CORE_RET_VERSION, CLIENT_WAIT,"Core version return");
core.set_command(CORE_GET_PID, FUNCTION, "Get PID", GetPID);
core.set_command(CORE_RET_PID, CLIENT_WAIT, "PID return", 0);
core.set_command(CORE_GET_PID, FUNCTION, "Get PID", GetPID, CORE_RET_PID);
core.set_command(CORE_RET_PID, CLIENT_WAIT, "PID return");
core.set_command(CORE_DFPP_READ, FUNCTION,"Raw read",ReadRaw);
core.set_command(CORE_RET_DATA, CLIENT_WAIT,"Raw read return",0);
core.set_command(CORE_DFPP_READ, FUNCTION,"Raw read",ReadRaw, CORE_RET_DATA);
core.set_command(CORE_RET_DATA, CLIENT_WAIT,"Raw read return");
core.set_command(CORE_READ_DWORD, FUNCTION,"Read DWORD",ReadDWord);
core.set_command(CORE_RET_DWORD, CLIENT_WAIT,"Read DWORD return",0);
core.set_command(CORE_READ_DWORD, FUNCTION,"Read DWORD",ReadDWord, CORE_RET_DWORD);
core.set_command(CORE_RET_DWORD, CLIENT_WAIT,"Read DWORD return");
core.set_command(CORE_READ_WORD, FUNCTION,"Read WORD",ReadWord);
core.set_command(CORE_RET_WORD, CLIENT_WAIT,"Read WORD return",0);
core.set_command(CORE_READ_WORD, FUNCTION,"Read WORD",ReadWord, CORE_RET_WORD);
core.set_command(CORE_RET_WORD, CLIENT_WAIT,"Read WORD return");
core.set_command(CORE_READ_BYTE, FUNCTION,"Read BYTE",ReadByte);
core.set_command(CORE_RET_BYTE, CLIENT_WAIT,"Read BYTE return",0);
core.set_command(CORE_READ_BYTE, FUNCTION,"Read BYTE",ReadByte, CORE_RET_BYTE);
core.set_command(CORE_RET_BYTE, CLIENT_WAIT,"Read BYTE return");
core.set_command(CORE_SV_ERROR, CANCELLATION, "Server error", 0);
core.set_command(CORE_CL_ERROR, CANCELLATION, "Client error", 0);
core.set_command(CORE_SV_ERROR, CANCELLATION, "Server error");
core.set_command(CORE_CL_ERROR, CANCELLATION, "Client error");
core.set_command(CORE_WRITE, FUNCTION, "Raw write", WriteRaw);
core.set_command(CORE_WRITE_DWORD, FUNCTION, "Write DWORD", WriteDWord);
core.set_command(CORE_WRITE_WORD, FUNCTION, "Write WORD", WriteWord);
core.set_command(CORE_WRITE_BYTE, FUNCTION, "Write BYTE", WriteByte);
core.set_command(CORE_WRITE, FUNCTION, "Raw write", WriteRaw, CORE_SUSPENDED);
core.set_command(CORE_WRITE_DWORD, FUNCTION, "Write DWORD", WriteDWord, CORE_SUSPENDED);
core.set_command(CORE_WRITE_WORD, FUNCTION, "Write WORD", WriteWord, CORE_SUSPENDED);
core.set_command(CORE_WRITE_BYTE, FUNCTION, "Write BYTE", WriteByte, CORE_SUSPENDED);
core.set_command(CORE_SUSPEND, FUNCTION, "Suspend", Suspend);
core.set_command(CORE_SUSPENDED, CLIENT_WAIT, "Suspended", 0);
core.set_command(CORE_SUSPEND, CLIENT_WAIT, "Suspend", 0 , CORE_SUSPENDED);
core.set_command(CORE_SUSPENDED, CLIENT_WAIT, "Suspended");
core.set_command(CORE_READ_STL_STRING, FUNCTION, "Read STL string", ReadSTLString);
core.set_command(CORE_READ_C_STRING, CLIENT_WAIT, "RESERVED", 0);
core.set_command(CORE_RET_STRING, CLIENT_WAIT, "Return string", 0);
core.set_command(CORE_WRITE_STL_STRING, FUNCTION, "Write STL string", WriteSTLString);
core.set_command(CORE_READ_STL_STRING, FUNCTION, "Read STL string", ReadSTLString, CORE_RET_STRING);
core.set_command(CORE_READ_C_STRING, CLIENT_WAIT, "RESERVED");
core.set_command(CORE_RET_STRING, CLIENT_WAIT, "Return string");
core.set_command(CORE_WRITE_STL_STRING, FUNCTION, "Write STL string", WriteSTLString, CORE_SUSPENDED);
core.set_command(CORE_SYNC_YIELD, FUNCTION, "Synchronize affinity/yield", SyncYield, CORE_SYNC_YIELD_RET);
module_registry.push_back(core);
}
@ -233,6 +188,7 @@ void InitModules (void)
{
// create the core module
InitCore();
// TODO: dynamic module init
}
void SHM_Act (void)
@ -252,7 +208,7 @@ void SHM_Act (void)
}
else
{
full_barrier
// full_barrier
SHMCMD = CORE_RUNNING;
fprintf(stderr,"dfhack: Broke out of loop, other process disappeared.\n");
}
@ -264,9 +220,16 @@ void SHM_Act (void)
{
cmd._function();
}
if(cmd.nextState != -1)
{
SHMCMD = cmd.nextState;
}
if(cmd.type != CANCELLATION)
{
SCHED_YIELD
if(useYield)
{
SCHED_YIELD
}
numwaits ++; // watchdog timeout
goto check_again;
}

@ -37,11 +37,8 @@ distribution.
#include <vector>
#include <string>
#include "shms.h"
#include <sys/time.h>
#include <time.h>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <signal.h>
#include "shms-core.h"
#include <sched.h>
#define DFhackCExport extern "C" __attribute__ ((visibility("default")))
@ -69,6 +66,15 @@ uint32_t OS_getPID()
return getpid();
}
/**
 * Query the CPU affinity of the calling process as a 32-bit mask.
 *
 * Bit i of the result is set when the process is allowed to run on CPU i.
 * Only CPUs 0-31 can be represented; CPUs with a higher index are ignored.
 *
 * @return the affinity mask, or 0 when sched_getaffinity() fails.
 */
uint32_t OS_getAffinity()
{
    cpu_set_t mask;
    CPU_ZERO(&mask);
    // pid 0 selects the caller; never read the mask if the call failed
    if(sched_getaffinity(0, sizeof(cpu_set_t), &mask) != 0)
    {
        return 0;
    }
    // Build the result bit by bit instead of reinterpreting cpu_set_t's
    // storage, so it does not depend on the set's internal layout.
    uint32_t affinity = 0;
    for(int cpu = 0; cpu < 32; cpu++)
    {
        if(CPU_ISSET(cpu, &mask))
        {
            affinity |= (uint32_t(1) << cpu);
        }
    }
    return affinity;
}
void SHM_Init ( void )
{
// check that we do this only once per process

@ -1,7 +1,6 @@
#ifndef DFCONNECT_H
#define DFCONNECT_H
#define CORE_VERSION 3
#define SHM_KEY 123466
#define SHM_HEADER 1024 // 1kB reserved for a header
#define SHM_BODY 1024*1024 // 1MB reserved for bulk data transfer
@ -12,16 +11,14 @@
#ifdef LINUX_BUILD
// a full memory barrier! better be safe than sorry.
#define full_barrier asm volatile("" ::: "memory"); __sync_synchronize();
#define SCHED_YIELD sched_yield(); // slow but allows the SHM to work on single-core
// #define SCHED_YIELD usleep(0); // extremely slow
// #define SCHED_YIELD // works only on multi-core
#define SCHED_YIELD sched_yield(); // a requirement for single-core
#else
// we need windows.h for Sleep()
#define _WIN32_WINNT 0x0501 // needed for INPUT struct
#define WINVER 0x0501 // OpenThread(), PSAPI, Toolhelp32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#define SCHED_YIELD Sleep(0); // slow on single-core, but avoids infinite lockup
#define SCHED_YIELD Sleep(0); // avoids infinite lockup on single core
// FIXME: detect MSVC here and use the right barrier magic
#ifdef __MINGW32__
#define full_barrier asm volatile("" ::: "memory");
@ -32,66 +29,40 @@
#endif
#endif
enum DF_SHM_ERRORSTATE
enum DFPP_CmdType
{
SHM_OK, // all OK
SHM_CANT_GET_SHM, // getting the SHM ID failed for some reason
SHM_CANT_ATTACH, // we can't attach the shm for some reason
SHM_SECOND_DF // we are a second DF process, can't use SHM at all
CANCELLATION, // we should jump out of the Act()
CLIENT_WAIT, // we are waiting for the client
FUNCTION, // we call a function as a result of the command
};
enum CORE_COMMAND
struct DFPP_command
{
CORE_RUNNING = 0, // no command, normal server execution
CORE_GET_VERSION, // protocol version query
CORE_RET_VERSION, // return the protocol version
CORE_GET_PID, // query for the process ID
CORE_RET_PID, // return process ID
// version 1 stuff below
CORE_DFPP_READ, // cl -> sv, read some data
CORE_RET_DATA, // sv -> cl, returned data
CORE_READ_DWORD, // cl -> sv, read a dword
CORE_RET_DWORD, // sv -> cl, returned dword
CORE_READ_WORD, // cl -> sv, read a word
CORE_RET_WORD, // sv -> cl, returned word
CORE_READ_BYTE, // cl -> sv, read a byte
CORE_RET_BYTE, // sv -> cl, returned byte
CORE_SV_ERROR, // there was a server error
CORE_CL_ERROR, // there was a client error
CORE_WRITE,// client writes to server
CORE_WRITE_DWORD,// client writes a DWORD to server
CORE_WRITE_WORD,// client writes a WORD to server
CORE_WRITE_BYTE,// client writes a BYTE to server
CORE_SUSPEND, // client notifies server to wait for commands (server is stalled in busy wait)
CORE_SUSPENDED, // response to WAIT, server is stalled in busy wait
// all strings capped at 1MB
CORE_READ_STL_STRING,// client requests contents of STL string at address
CORE_READ_C_STRING,// client requests contents of a C string at address, max length (0 means zero terminated)
CORE_RET_STRING, // sv -> cl length + string contents
CORE_WRITE_STL_STRING,// client wants to set STL string at address to something
// compare affinity and determine if using yield is required
CORE_SYNC_YIELD,// cl sends affinity to sv, sv sets yield
CORE_SYNC_YIELD_RET,// sv returns yield bool
NUM_CORE_CMDS
void (*_function)(void);
DFPP_CmdType type:32; // force the enum to 32 bits for compatibility reasons
std::string name;
uint32_t nextState;
};
enum DF_ERROR
struct DFPP_module
{
DFEE_INVALID_COMMAND,
DFEE_BUFFER_OVERFLOW
inline void set_command(const unsigned int index, const DFPP_CmdType type, const char * name, void (*_function)(void) = 0,uint32_t nextState = -1)
{
commands[index].type = type;
commands[index].name = name;
commands[index]._function = _function;
commands[index].nextState = nextState;
}
inline void reserve (unsigned int numcommands)
{
commands.clear();
DFPP_command cmd = {0,CANCELLATION,"",0};
commands.resize(numcommands,cmd);
}
std::string name;
uint32_t version; // version
std::vector <DFPP_command> commands;
void * modulestate;
};
typedef union
@ -108,17 +79,10 @@ typedef union
}
} shm_cmd;
// Layout of the header that the SHMHDR macros overlay on the start of the
// shared memory block: the command word followed by the request/reply fields.
typedef struct
{
shm_cmd cmd; // command word, accessed through its pingpong member
uint32_t address; // address to read from or write to
uint32_t value; // scalar payload or return value
uint32_t length; // byte count for raw transfers
} shm_header;
void SHM_Act (void);
void InitModules (void);
bool isValidSHM();
uint32_t OS_getPID();
uint32_t OS_getAffinity(); // limited to 32 processors. Silly, eh?
#endif