@ -420,20 +420,20 @@ func GQLWSHandler(ctx * Context, server *Node, gql_ext *GQLExt) func(http.Respon
}
}
type GQL Interface struct {
type Interface struct {
Interface * graphql . Interface
Default * graphql . Object
List * graphql . List
Extensions [ ] ExtType
}
type GQL Type struct {
type Type struct {
Type * graphql . Object
List * graphql . List
}
func NewGQLNodeType ( node_type NodeType , interfaces [ ] * graphql . Interface , init func ( * GQL Type) ) * GQL Type {
var gql GQL Type
func NewGQLNodeType ( node_type NodeType , interfaces [ ] * graphql . Interface , init func ( * Type) ) * Type {
var gql Type
gql . Type = graphql . NewObject ( graphql . ObjectConfig {
Name : string ( node_type ) ,
Interfaces : interfaces ,
@ -452,8 +452,8 @@ func NewGQLNodeType(node_type NodeType, interfaces []*graphql.Interface, init fu
return & gql
}
func New GQL Interface( if_name string , default_name string , interfaces [ ] * graphql . Interface , extensions [ ] ExtType , init_1 func ( * GQL Interface) , init_2 func ( * GQL Interface) ) * GQL Interface {
var gql GQL Interface
func New Interface( if_name string , default_name string , interfaces [ ] * graphql . Interface , extensions [ ] ExtType , init_1 func ( * Interface) , init_2 func ( * Interface) ) * Interface {
var gql Interface
gql . Extensions = extensions
gql . Interface = graphql . NewInterface ( graphql . InterfaceConfig {
Name : if_name ,
@ -488,7 +488,7 @@ type GQLExtContext struct {
// Custom graphql types, mapped to NodeTypes
NodeTypes map [ NodeType ] * graphql . Object
Interfaces [ ] * GQL Interface
Interfaces [ ] * Interface
// Schema parameters
Types [ ] graphql . Type
@ -508,7 +508,7 @@ func BuildSchema(ctx *GQLExtContext) (graphql.Schema, error) {
return graphql . NewSchema ( schemaConfig )
}
func ( ctx * GQLExtContext ) AddInterface ( i * GQL Interface) error {
func ( ctx * GQLExtContext ) AddInterface ( i * Interface) error {
if i == nil {
return fmt . Errorf ( "interface is nil" )
}
@ -544,24 +544,23 @@ func NewGQLExtContext() *GQLExtContext {
Fields : graphql . Fields { } ,
} )
query . AddFieldConfig ( "Self" , GQL QuerySelf)
query . AddFieldConfig ( "Node" , GQL QueryNode)
query . AddFieldConfig ( "Self" , QuerySelf)
query . AddFieldConfig ( "Node" , QueryNode)
mutation := graphql . NewObject ( graphql . ObjectConfig {
Name : "Mutation" ,
Fields : graphql . Fields { } ,
} )
mutation . AddFieldConfig ( "stop" , GQLMutationStop )
mutation . AddFieldConfig ( "startChild" , GQLMutationStartChild )
mutation . AddFieldConfig ( "stop" , MutationStop )
subscription := graphql . NewObject ( graphql . ObjectConfig {
Name : "Subscription" ,
Fields : graphql . Fields { } ,
} )
subscription . AddFieldConfig ( "Self" , GQL SubscriptionSelf)
subscription . AddFieldConfig ( " Update", GQLSubscriptionUpdat e)
subscription . AddFieldConfig ( "Self" , SubscriptionSelf)
subscription . AddFieldConfig ( " Self", SubscriptionNod e)
context := GQLExtContext {
Schema : graphql . Schema { } ,
@ -570,15 +569,15 @@ func NewGQLExtContext() *GQLExtContext {
Mutation : mutation ,
Subscription : subscription ,
NodeTypes : map [ NodeType ] * graphql . Object { } ,
Interfaces : [ ] * GQL Interface{ } ,
Interfaces : [ ] * Interface{ } ,
}
var err error
err = context . AddInterface ( GQL InterfaceNode)
err = context . AddInterface ( InterfaceNode)
if err != nil {
panic ( err )
}
err = context . AddInterface ( GQL InterfaceLockable)
err = context . AddInterface ( InterfaceLockable)
if err != nil {
panic ( err )
}
@ -597,12 +596,10 @@ type GQLExt struct {
tcp_listener net . Listener
http_server * http . Server
http_done sync . WaitGroup
tls_key [ ] byte
tls_cert [ ] byte
Listen string
SubscribeLock sync . Mutex
SubscribeListeners [ ] chan Signal
}
func ( ext * GQLExt ) Field ( name string ) interface { } {
@ -613,37 +610,24 @@ func (ext *GQLExt) Field(name string) interface{} {
} )
}
func ( ext * GQLExt ) NewSubscriptionChannel ( buffer int ) chan Signal {
ext . SubscribeLock . Lock ( )
defer ext . SubscribeLock . Unlock ( )
new_listener := make ( chan Signal , buffer )
ext . SubscribeListeners = append ( ext . SubscribeListeners , new_listener )
return new_listener
}
func ( ext * GQLExt ) Process ( context * Context , princ_id NodeID , node * Node , signal Signal ) {
if signal . Type ( ) == ReadResultSignalType {
}
ext . SubscribeLock . Lock ( )
defer ext . SubscribeLock . Unlock ( )
active_listeners := [ ] chan Signal { }
for _ , listener := range ( ext . SubscribeListeners ) {
select {
case listener <- signal :
active_listeners = append ( active_listeners , listener )
default :
go func ( listener chan Signal ) {
listener <- NewDirectSignal ( "Channel Closed" )
close ( listener )
} ( listener )
func ( ext * GQLExt ) Process ( ctx * Context , source NodeID , node * Node , signal Signal ) {
if signal . Type ( ) == GQLStateSignalType {
sig := signal . ( StateSignal )
switch sig . State {
case "start_server" :
err := ext . StartGQLServer ( ctx , node )
if err == nil {
ctx . Send ( node . ID , source , StateSignal { NewDirectSignal ( GQLStateSignalType ) , "server_started" } )
}
case "stop_server" :
err := ext . StopGQLServer ( )
if err == nil {
ctx . Send ( node . ID , source , StateSignal { NewDirectSignal ( GQLStateSignalType ) , "server_stopped" } )
}
default :
ctx . Log . Logf ( "gql" , "unknown gql state %s" , sig . State )
}
}
ext . SubscribeListeners = active_listeners
return
}
func ( ext * GQLExt ) Type ( ) ExtType {
@ -732,16 +716,18 @@ func NewGQLExt(ctx *Context, listen string, tls_cert []byte, tls_key []byte) (*G
}
return & GQLExt {
Listen : listen ,
SubscribeListeners : [ ] chan Signal { } ,
tls_cert : tls_cert ,
tls_key : tls_key ,
} , nil
}
func StartGQLServer ( ctx * Context , node * Node , gql_ext * GQLExt ) error {
func ( ext * GQLExt ) StartGQLServer ( ctx * Context , node * Node ) error {
if ext . tcp_listener != nil || ext . http_server != nil {
return fmt . Errorf ( "listener or server is still running, stop them first" )
}
mux := http . NewServeMux ( )
mux . HandleFunc ( "/gql" , GQLHandler ( ctx , node , gql_ext ) )
mux . HandleFunc ( "/gqlws" , GQLWSHandler ( ctx , node , gql_ext ) )
mux . HandleFunc ( "/gql" , GQLHandler ( ctx , node , ext) )
mux . HandleFunc ( "/gqlws" , GQLWSHandler ( ctx , node , ext) )
// Server a graphiql interface(TODO make configurable whether to start this)
mux . HandleFunc ( "/graphiql" , GraphiQLHandler ( ) )
@ -751,7 +737,7 @@ func StartGQLServer(ctx *Context, node *Node, gql_ext *GQLExt) error {
mux . Handle ( "/site/" , http . StripPrefix ( "/site" , fs ) )
http_server := & http . Server {
Addr : gql_ ext. Listen ,
Addr : ext. Listen ,
Handler : mux ,
}
@ -760,7 +746,7 @@ func StartGQLServer(ctx *Context, node *Node, gql_ext *GQLExt) error {
return fmt . Errorf ( "Failed to start listener for server on %s" , http_server . Addr )
}
cert , err := tls . X509KeyPair ( gql_ ext. tls_cert , gql_ ext. tls_key )
cert , err := tls . X509KeyPair ( ext. tls_cert , ext. tls_key )
if err != nil {
return err
}
@ -772,23 +758,29 @@ func StartGQLServer(ctx *Context, node *Node, gql_ext *GQLExt) error {
listener := tls . NewListener ( l , & config )
gql_ ext. http_done . Add ( 1 )
ext. http_done . Add ( 1 )
go func ( qql_ext * GQLExt ) {
defer gql_ ext. http_done . Done ( )
defer ext. http_done . Done ( )
err := http_server . Serve ( listener )
if err != http . ErrServerClosed {
panic ( fmt . Sprintf ( "Failed to start gql server: %s" , err ) )
}
} ( gql_ ext)
} ( ext)
gql_ ext. tcp_listener = listener
gql_ ext. http_server = http_server
ext. tcp_listener = listener
ext. http_server = http_server
return nil
}
func StopGQLServer ( gql_ext * GQLExt ) {
gql_ext . http_server . Shutdown ( context . TODO ( ) )
gql_ext . http_done . Wait ( )
func ( ext * GQLExt ) StopGQLServer ( ) error {
if ext . tcp_listener == nil || ext . http_server == nil {
return fmt . Errorf ( "already shutdown, cannot shut down again" )
}
ext . http_server . Shutdown ( context . TODO ( ) )
ext . http_done . Wait ( )
ext . tcp_listener = nil
ext . http_server = nil
return nil
}