@ -198,6 +198,14 @@ void ServerConnection::connection_ostream::flush_proxy()
void ServerConnection : : threadFn ( void * arg )
void ServerConnection : : threadFn ( void * arg )
{
{
ServerConnection * me = ( ServerConnection * ) arg ;
ServerConnection * me = ( ServerConnection * ) arg ;
me - > threadFn ( ) ;
delete me ;
}
void ServerConnection : : threadFn ( )
{
color_ostream_proxy out ( Core : : getInstance ( ) . getConsole ( ) ) ;
color_ostream_proxy out ( Core : : getInstance ( ) . getConsole ( ) ) ;
/* Handshake */
/* Handshake */
@ -205,10 +213,9 @@ void ServerConnection::threadFn(void *arg)
{
{
RPCHandshakeHeader header ;
RPCHandshakeHeader header ;
if ( ! readFullBuffer ( me- > socket, & header , sizeof ( header ) ) )
if ( ! readFullBuffer ( socket, & header , sizeof ( header ) ) )
{
{
out < < " In RPC server: could not read handshake header. " < < endl ;
out < < " In RPC server: could not read handshake header. " < < endl ;
delete me ;
return ;
return ;
}
}
@ -216,17 +223,15 @@ void ServerConnection::threadFn(void *arg)
header . version ! = 1 )
header . version ! = 1 )
{
{
out < < " In RPC server: invalid handshake header. " < < endl ;
out < < " In RPC server: invalid handshake header. " < < endl ;
delete me ;
return ;
return ;
}
}
memcpy ( header . magic , RPCHandshakeHeader : : RESPONSE_MAGIC , sizeof ( header . magic ) ) ;
memcpy ( header . magic , RPCHandshakeHeader : : RESPONSE_MAGIC , sizeof ( header . magic ) ) ;
header . version = 1 ;
header . version = 1 ;
if ( me- > socket- > Send ( ( uint8 * ) & header , sizeof ( header ) ) ! = sizeof ( header ) )
if ( socket- > Send ( ( uint8 * ) & header , sizeof ( header ) ) ! = sizeof ( header ) )
{
{
out < < " In RPC server: could not send handshake response. " < < endl ;
out < < " In RPC server: could not send handshake response. " < < endl ;
delete me ;
return ;
return ;
}
}
}
}
@ -235,11 +240,11 @@ void ServerConnection::threadFn(void *arg)
std : : cerr < < " Client connection established. " < < endl ;
std : : cerr < < " Client connection established. " < < endl ;
while ( ! me- > in_error) {
while ( ! in_error) {
// Read the message
// Read the message
RPCMessageHeader header ;
RPCMessageHeader header ;
if ( ! readFullBuffer ( me- > socket, & header , sizeof ( header ) ) )
if ( ! readFullBuffer ( socket, & header , sizeof ( header ) ) )
{
{
out . printerr ( " In RPC server: I/O error in receive header. \n " ) ;
out . printerr ( " In RPC server: I/O error in receive header. \n " ) ;
break ;
break ;
@ -256,7 +261,7 @@ void ServerConnection::threadFn(void *arg)
std : : auto_ptr < uint8_t > buf ( new uint8_t [ header . size ] ) ;
std : : auto_ptr < uint8_t > buf ( new uint8_t [ header . size ] ) ;
if ( ! readFullBuffer ( me- > socket, buf . get ( ) , header . size ) )
if ( ! readFullBuffer ( socket, buf . get ( ) , header . size ) )
{
{
out . printerr ( " In RPC server: I/O error in receive %d bytes of data. \n " , header . size ) ;
out . printerr ( " In RPC server: I/O error in receive %d bytes of data. \n " , header . size ) ;
break ;
break ;
@ -267,31 +272,40 @@ void ServerConnection::threadFn(void *arg)
// Find and call the function
// Find and call the function
int in_size = header . size ;
int in_size = header . size ;
ServerFunctionBase * fn = vector_get ( me- > functions, header . id ) ;
ServerFunctionBase * fn = vector_get ( functions, header . id ) ;
MessageLite * reply = NULL ;
MessageLite * reply = NULL ;
command_result res = CR_FAILURE ;
command_result res = CR_FAILURE ;
if ( ! fn )
if ( ! fn )
{
{
me- > stream. printerr ( " RPC call of invalid id %d \n " , header . id ) ;
stream. printerr ( " RPC call of invalid id %d \n " , header . id ) ;
}
}
else
else
{
{
if ( ! fn - > in ( ) - > ParseFromArray ( buf . get ( ) , header . size ) )
if ( ! fn - > in ( ) - > ParseFromArray ( buf . get ( ) , header . size ) )
{
{
me- > stream. printerr ( " In call to %s: could not decode input args. \n " , fn - > name ) ;
stream. printerr ( " In call to %s: could not decode input args. \n " , fn - > name ) ;
}
}
else
else
{
{
buf . reset ( ) ;
buf . reset ( ) ;
reply = fn - > out ( ) ;
reply = fn - > out ( ) ;
res = fn - > execute ( me - > stream ) ;
if ( fn - > flags & SF_DONT_SUSPEND )
{
res = fn - > execute ( stream ) ;
}
else
{
CoreSuspender suspend ;
res = fn - > execute ( stream ) ;
}
}
}
}
}
// Flush all text output
// Flush all text output
if ( me - > in_error )
if ( in_error)
break ;
break ;
//out.print("Answer %d:%d\n", res, reply);
//out.print("Answer %d:%d\n", res, reply);
@ -301,16 +315,16 @@ void ServerConnection::threadFn(void *arg)
if ( out_size > RPCMessageHeader : : MAX_MESSAGE_SIZE )
if ( out_size > RPCMessageHeader : : MAX_MESSAGE_SIZE )
{
{
me- > stream. printerr ( " In call to %s: reply too large: %d. \n " ,
stream. printerr ( " In call to %s: reply too large: %d. \n " ,
( fn ? fn - > name : " UNKNOWN " ) , out_size ) ;
( fn ? fn - > name : " UNKNOWN " ) , out_size ) ;
res = CR_LINK_FAILURE ;
res = CR_LINK_FAILURE ;
}
}
me- > stream. flush ( ) ;
stream. flush ( ) ;
if ( res = = CR_OK & & reply )
if ( res = = CR_OK & & reply )
{
{
if ( ! sendRemoteMessage ( me- > socket, RPC_REPLY_RESULT , reply , true ) )
if ( ! sendRemoteMessage ( socket, RPC_REPLY_RESULT , reply , true ) )
{
{
out . printerr ( " In RPC server: I/O error in send result. \n " ) ;
out . printerr ( " In RPC server: I/O error in send result. \n " ) ;
break ;
break ;
@ -321,7 +335,7 @@ void ServerConnection::threadFn(void *arg)
header . id = RPC_REPLY_FAIL ;
header . id = RPC_REPLY_FAIL ;
header . size = res ;
header . size = res ;
if ( me- > socket- > Send ( ( uint8_t * ) & header , sizeof ( header ) ) ! = sizeof ( header ) )
if ( socket- > Send ( ( uint8_t * ) & header , sizeof ( header ) ) ! = sizeof ( header ) )
{
{
out . printerr ( " In RPC server: I/O error in send failure code. \n " ) ;
out . printerr ( " In RPC server: I/O error in send failure code. \n " ) ;
break ;
break ;
@ -337,8 +351,6 @@ void ServerConnection::threadFn(void *arg)
}
}
std : : cerr < < " Shutting down client connection. " < < endl ;
std : : cerr < < " Shutting down client connection. " < < endl ;
delete me ;
}
}
ServerMain : : ServerMain ( )
ServerMain : : ServerMain ( )