Finished rework of context

graph-rework-2
noah metz 2023-07-09 20:30:19 -06:00
parent 7aedf553ee
commit 6a0a0762ad
6 changed files with 167 additions and 164 deletions

@ -195,10 +195,10 @@ func NewContext(db * badger.DB, log Logger, extra_nodes map[string]NodeLoadFunc,
if err != nil { if err != nil {
panic(err) panic(err)
} }
/*err := ctx.RegisterNodeType("gql_thread", LoadGQLThread) err = ctx.RegisterNodeType("gql_thread", LoadGQLThread)
if err != nil { if err != nil {
panic(err) panic(err)
}*/ }
for name, load_fn := range(extra_nodes) { for name, load_fn := range(extra_nodes) {
err := ctx.RegisterNodeType(name, load_fn) err := ctx.RegisterNodeType(name, load_fn)

@ -344,17 +344,20 @@ func (thread * GQLThread) Serialize() ([]byte, error) {
return json.MarshalIndent(&thread_json, "", " ") return json.MarshalIndent(&thread_json, "", " ")
} }
func (thread * GQLThread) DeserializeInfo(ctx *Context, data []byte) (ThreadInfo, error) {
var info GQLThreadInfo
err := json.Unmarshal(data, &info)
if err != nil {
return nil, err
}
return &info, nil
}
type GQLThreadJSON struct { type GQLThreadJSON struct {
SimpleThreadJSON SimpleThreadJSON
Listen string `json:"listen"` Listen string `json:"listen"`
} }
type GQLThreadInfo struct {
Start bool `json:"start"`
StartAction string `json:"start_action"`
RestoreAction string `json:"restore_action"`
}
func NewGQLThreadJSON(thread *GQLThread) GQLThreadJSON { func NewGQLThreadJSON(thread *GQLThread) GQLThreadJSON {
thread_json := NewSimpleThreadJSON(&thread.SimpleThread) thread_json := NewSimpleThreadJSON(&thread.SimpleThread)
@ -364,6 +367,20 @@ func NewGQLThreadJSON(thread *GQLThread) GQLThreadJSON {
} }
} }
type GQLThreadInfo struct {
Start bool `json:"start"`
StartAction string `json:"start_action"`
RestoreAction string `json:"restore_action"`
}
func NewGQLThreadInfo(start bool, start_action string, restore_action string) GQLThreadInfo {
return GQLThreadInfo{
Start: start,
StartAction: start_action,
RestoreAction: restore_action,
}
}
func LoadGQLThread(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) { func LoadGQLThread(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) {
var j GQLThreadJSON var j GQLThreadJSON
err := json.Unmarshal(data, &j) err := json.Unmarshal(data, &j)
@ -386,6 +403,7 @@ func NewGQLThread(id NodeID, name string, state_name string, listen string) GQLT
return GQLThread{ return GQLThread{
SimpleThread: NewSimpleThread(id, name, state_name, reflect.TypeOf((*GQLThreadInfo)(nil)), gql_actions, gql_handlers), SimpleThread: NewSimpleThread(id, name, state_name, reflect.TypeOf((*GQLThreadInfo)(nil)), gql_actions, gql_handlers),
Listen: listen, Listen: listen,
http_done: &sync.WaitGroup{},
} }
} }
@ -479,14 +497,14 @@ var gql_handlers ThreadHandlers = ThreadHandlers{
server := thread.(*GQLThread) server := thread.(*GQLThread)
server.http_server.Shutdown(context.TODO()) server.http_server.Shutdown(context.TODO())
server.http_done.Wait() server.http_done.Wait()
return "", NewThreadAbortedError(signal.Source()) return ThreadAbort(ctx, thread, signal)
}, },
"cancel": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) { "cancel": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("gql", "GQL_CANCEL") ctx.Log.Logf("gql", "GQL_CANCEL")
server := thread.(*GQLThread) server := thread.(*GQLThread)
server.http_server.Shutdown(context.TODO()) server.http_server.Shutdown(context.TODO())
server.http_done.Wait() server.http_done.Wait()
return "", nil return ThreadCancel(ctx, thread, signal)
}, },
} }

@ -7,12 +7,12 @@ import (
) )
var gql_interface_graph_node *graphql.Interface = nil var gql_interface_graph_node *graphql.Interface = nil
func GQLInterfaceGraphNode() *graphql.Interface { func GQLInterfaceNode() *graphql.Interface {
if gql_interface_graph_node == nil { if gql_interface_graph_node == nil {
gql_interface_graph_node = graphql.NewInterface(graphql.InterfaceConfig{ gql_interface_graph_node = graphql.NewInterface(graphql.InterfaceConfig{
Name: "GraphNode", Name: "Node",
ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object { ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object {
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil return nil
} }
@ -39,10 +39,6 @@ func GQLInterfaceGraphNode() *graphql.Interface {
gql_interface_graph_node.AddFieldConfig("ID", &graphql.Field{ gql_interface_graph_node.AddFieldConfig("ID", &graphql.Field{
Type: graphql.String, Type: graphql.String,
}) })
gql_interface_graph_node.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String,
})
} }
return gql_interface_graph_node return gql_interface_graph_node
@ -62,7 +58,7 @@ func GQLInterfaceThread() *graphql.Interface {
gql_interface_thread = graphql.NewInterface(graphql.InterfaceConfig{ gql_interface_thread = graphql.NewInterface(graphql.InterfaceConfig{
Name: "Thread", Name: "Thread",
ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object { ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object {
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil return nil
} }
@ -133,7 +129,7 @@ func GQLInterfaceLockable() *graphql.Interface {
gql_interface_lockable = graphql.NewInterface(graphql.InterfaceConfig{ gql_interface_lockable = graphql.NewInterface(graphql.InterfaceConfig{
Name: "Lockable", Name: "Lockable",
ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object { ResolveType: func(p graphql.ResolveTypeParams) *graphql.Object {
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil return nil
} }
@ -186,27 +182,27 @@ func GQLInterfaceLockable() *graphql.Interface {
} }
func GQLNodeID(p graphql.ResolveParams) (interface{}, error) { func GQLNodeID(p graphql.ResolveParams) (interface{}, error) {
node, ok := p.Source.(GraphNode) node, ok := p.Source.(Node)
if ok == false || node == nil { if ok == false || node == nil {
return nil, fmt.Errorf("Failed to cast source to GraphNode") return nil, fmt.Errorf("Failed to cast source to Node")
} }
return node.ID(), nil return node.ID(), nil
} }
func GQLNodeName(p graphql.ResolveParams) (interface{}, error) { func GQLThreadListen(p graphql.ResolveParams) (interface{}, error) {
node, ok := p.Source.(GraphNode) node, ok := p.Source.(*GQLThread)
if ok == false || node == nil { if ok == false || node == nil {
return nil, fmt.Errorf("Failed to cast source to GraphNode") return nil, fmt.Errorf("Failed to cast source to GQLThread")
} }
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext") return nil, fmt.Errorf("Failed to cast context graph_context to Context")
} }
name := "" listen := ""
err := UseStates(ctx, []GraphNode{node}, func(states NodeStateMap) (error) { err := UseStates(ctx, []Node{node}, func(nodes NodeMap) (error) {
name = states[node.ID()].Name() listen = node.Listen
return nil return nil
}) })
@ -214,27 +210,23 @@ func GQLNodeName(p graphql.ResolveParams) (interface{}, error) {
return nil, err return nil, err
} }
return name, nil return listen, nil
} }
func GQLThreadListen(p graphql.ResolveParams) (interface{}, error) { func GQLThreadParent(p graphql.ResolveParams) (interface{}, error) {
node, ok := p.Source.(*GQLThread) node, ok := p.Source.(Thread)
if ok == false || node == nil { if ok == false || node == nil {
return nil, fmt.Errorf("Failed to cast source to GQLThread") return nil, fmt.Errorf("Failed to cast source to Thread")
} }
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext") return nil, fmt.Errorf("Failed to cast context graph_context to Context")
} }
listen := "" var parent Thread = nil
err := UseStates(ctx, []GraphNode{node}, func(states NodeStateMap) (error) { err := UseStates(ctx, []Node{node}, func(nodes NodeMap) (error) {
gql_thread, ok := states[node.ID()].(*GQLThreadState) parent = node.Parent()
if ok == false {
return fmt.Errorf("Failed to cast state to GQLThreadState")
}
listen = gql_thread.Listen
return nil return nil
}) })
@ -242,27 +234,23 @@ func GQLThreadListen(p graphql.ResolveParams) (interface{}, error) {
return nil, err return nil, err
} }
return listen, nil return parent, nil
} }
func GQLThreadParent(p graphql.ResolveParams) (interface{}, error) { func GQLThreadChildren(p graphql.ResolveParams) (interface{}, error) {
node, ok := p.Source.(Thread) node, ok := p.Source.(Thread)
if ok == false || node == nil { if ok == false || node == nil {
return nil, fmt.Errorf("Failed to cast source to Thread") return nil, fmt.Errorf("Failed to cast source to Thread")
} }
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext") return nil, fmt.Errorf("Failed to cast context graph_context to Context")
} }
var parent Thread = nil var children []Thread = nil
err := UseStates(ctx, []GraphNode{node}, func(states NodeStateMap) (error) { err := UseStates(ctx, []Node{node}, func(nodes NodeMap) (error) {
gql_thread, ok := states[node.ID()].(ThreadState) children = node.Children()
if ok == false {
return fmt.Errorf("Failed to cast state to ThreadState")
}
parent = gql_thread.Parent()
return nil return nil
}) })
@ -270,27 +258,23 @@ func GQLThreadParent(p graphql.ResolveParams) (interface{}, error) {
return nil, err return nil, err
} }
return parent, nil return children, nil
} }
func GQLThreadChildren(p graphql.ResolveParams) (interface{}, error) { func GQLLockableName(p graphql.ResolveParams) (interface{}, error) {
node, ok := p.Source.(Thread) node, ok := p.Source.(Lockable)
if ok == false || node == nil { if ok == false || node == nil {
return nil, fmt.Errorf("Failed to cast source to Thread") return nil, fmt.Errorf("Failed to cast source to Lockable")
} }
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false || node == nil {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext") return nil, fmt.Errorf("Failed to cast context graph_context to Context")
} }
var children []Thread = nil name := ""
err := UseStates(ctx, []GraphNode{node}, func(states NodeStateMap) (error) { err := UseStates(ctx, []Node{node}, func(nodes NodeMap) error {
gql_thread, ok := states[node.ID()].(ThreadState) name = node.Name()
if ok == false {
return fmt.Errorf("Failed to cast state to ThreadState")
}
children = gql_thread.Children()
return nil return nil
}) })
@ -298,7 +282,7 @@ func GQLThreadChildren(p graphql.ResolveParams) (interface{}, error) {
return nil, err return nil, err
} }
return children, nil return name, nil
} }
func GQLLockableRequirements(p graphql.ResolveParams) (interface{}, error) { func GQLLockableRequirements(p graphql.ResolveParams) (interface{}, error) {
@ -307,18 +291,14 @@ func GQLLockableRequirements(p graphql.ResolveParams) (interface{}, error) {
return nil, fmt.Errorf("Failed to cast source to Lockable") return nil, fmt.Errorf("Failed to cast source to Lockable")
} }
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext") return nil, fmt.Errorf("Failed to cast context graph_context to Context")
} }
var requirements []Lockable = nil var requirements []Lockable = nil
err := UseStates(ctx, []GraphNode{node}, func(states NodeStateMap) (error) { err := UseStates(ctx, []Node{node}, func(nodes NodeMap) (error) {
gql_thread, ok := states[node.ID()].(LockableState) requirements = node.Requirements()
if ok == false {
return fmt.Errorf("Failed to cast state to LockableState")
}
requirements = gql_thread.Requirements()
return nil return nil
}) })
@ -335,18 +315,14 @@ func GQLLockableDependencies(p graphql.ResolveParams) (interface{}, error) {
return nil, fmt.Errorf("Failed to cast source to Lockable") return nil, fmt.Errorf("Failed to cast source to Lockable")
} }
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext") return nil, fmt.Errorf("Failed to cast context graph_context to Context")
} }
var dependencies []Lockable = nil var dependencies []Lockable = nil
err := UseStates(ctx, []GraphNode{node}, func(states NodeStateMap) (error) { err := UseStates(ctx, []Node{node}, func(nodes NodeMap) (error) {
gql_thread, ok := states[node.ID()].(LockableState) dependencies = node.Dependencies()
if ok == false {
return fmt.Errorf("Failed to cast state to LockableState")
}
dependencies = gql_thread.Dependencies()
return nil return nil
}) })
@ -363,18 +339,14 @@ func GQLLockableOwner(p graphql.ResolveParams) (interface{}, error) {
return nil, fmt.Errorf("Failed to cast source to Lockable") return nil, fmt.Errorf("Failed to cast source to Lockable")
} }
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext") return nil, fmt.Errorf("Failed to cast context graph_context to Context")
} }
var owner GraphNode = nil var owner Node = nil
err := UseStates(ctx, []GraphNode{node}, func(states NodeStateMap) (error) { err := UseStates(ctx, []Node{node}, func(nodes NodeMap) (error) {
gql_thread, ok := states[node.ID()].(LockableState) owner = node.Owner()
if ok == false {
return fmt.Errorf("Failed to cast state to LockableState")
}
owner = gql_thread.Owner()
return nil return nil
}) })
@ -392,7 +364,7 @@ func GQLTypeGQLThread() * graphql.Object {
gql_type_gql_thread = graphql.NewObject(graphql.ObjectConfig{ gql_type_gql_thread = graphql.NewObject(graphql.ObjectConfig{
Name: "GQLThread", Name: "GQLThread",
Interfaces: []*graphql.Interface{ Interfaces: []*graphql.Interface{
GQLInterfaceGraphNode(), GQLInterfaceNode(),
GQLInterfaceThread(), GQLInterfaceThread(),
GQLInterfaceLockable(), GQLInterfaceLockable(),
}, },
@ -410,7 +382,7 @@ func GQLTypeGQLThread() * graphql.Object {
gql_type_gql_thread.AddFieldConfig("Name", &graphql.Field{ gql_type_gql_thread.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String, Type: graphql.String,
Resolve: GQLNodeName, Resolve: GQLLockableName,
}) })
gql_type_gql_thread.AddFieldConfig("Children", &graphql.Field{ gql_type_gql_thread.AddFieldConfig("Children", &graphql.Field{
@ -452,12 +424,12 @@ func GQLTypeBaseThread() * graphql.Object {
gql_type_base_thread = graphql.NewObject(graphql.ObjectConfig{ gql_type_base_thread = graphql.NewObject(graphql.ObjectConfig{
Name: "BaseThread", Name: "BaseThread",
Interfaces: []*graphql.Interface{ Interfaces: []*graphql.Interface{
GQLInterfaceGraphNode(), GQLInterfaceNode(),
GQLInterfaceThread(), GQLInterfaceThread(),
GQLInterfaceLockable(), GQLInterfaceLockable(),
}, },
IsTypeOf: func(p graphql.IsTypeOfParams) bool { IsTypeOf: func(p graphql.IsTypeOfParams) bool {
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return false return false
} }
@ -481,7 +453,7 @@ func GQLTypeBaseThread() * graphql.Object {
gql_type_base_thread.AddFieldConfig("Name", &graphql.Field{ gql_type_base_thread.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String, Type: graphql.String,
Resolve: GQLNodeName, Resolve: GQLLockableName,
}) })
gql_type_base_thread.AddFieldConfig("Children", &graphql.Field{ gql_type_base_thread.AddFieldConfig("Children", &graphql.Field{
@ -518,11 +490,11 @@ func GQLTypeBaseLockable() * graphql.Object {
gql_type_base_lockable = graphql.NewObject(graphql.ObjectConfig{ gql_type_base_lockable = graphql.NewObject(graphql.ObjectConfig{
Name: "BaseLockable", Name: "BaseLockable",
Interfaces: []*graphql.Interface{ Interfaces: []*graphql.Interface{
GQLInterfaceGraphNode(), GQLInterfaceNode(),
GQLInterfaceLockable(), GQLInterfaceLockable(),
}, },
IsTypeOf: func(p graphql.IsTypeOfParams) bool { IsTypeOf: func(p graphql.IsTypeOfParams) bool {
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return false return false
} }
@ -546,7 +518,7 @@ func GQLTypeBaseLockable() * graphql.Object {
gql_type_base_lockable.AddFieldConfig("Name", &graphql.Field{ gql_type_base_lockable.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String, Type: graphql.String,
Resolve: GQLNodeName, Resolve: GQLLockableName,
}) })
gql_type_base_lockable.AddFieldConfig("Requirements", &graphql.Field{ gql_type_base_lockable.AddFieldConfig("Requirements", &graphql.Field{
@ -573,10 +545,10 @@ func GQLTypeBaseNode() * graphql.Object {
gql_type_base_node = graphql.NewObject(graphql.ObjectConfig{ gql_type_base_node = graphql.NewObject(graphql.ObjectConfig{
Name: "BaseNode", Name: "BaseNode",
Interfaces: []*graphql.Interface{ Interfaces: []*graphql.Interface{
GQLInterfaceGraphNode(), GQLInterfaceNode(),
}, },
IsTypeOf: func(p graphql.IsTypeOfParams) bool { IsTypeOf: func(p graphql.IsTypeOfParams) bool {
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return false return false
} }
@ -600,7 +572,7 @@ func GQLTypeBaseNode() * graphql.Object {
gql_type_base_node.AddFieldConfig("Name", &graphql.Field{ gql_type_base_node.AddFieldConfig("Name", &graphql.Field{
Type: graphql.String, Type: graphql.String,
Resolve: GQLNodeName, Resolve: GQLLockableName,
}) })
} }
@ -699,32 +671,32 @@ func GQLTypeSignalInput() *graphql.InputObject {
} }
func GQLSubscribeSignal(p graphql.ResolveParams) (interface{}, error) { func GQLSubscribeSignal(p graphql.ResolveParams) (interface{}, error) {
return GQLSubscribeFn(p, false, func(ctx *GraphContext, server *GQLThread, signal GraphSignal, p graphql.ResolveParams)(interface{}, error) { return GQLSubscribeFn(p, false, func(ctx *Context, server *GQLThread, signal GraphSignal, p graphql.ResolveParams)(interface{}, error) {
return signal, nil return signal, nil
}) })
} }
func GQLSubscribeSelf(p graphql.ResolveParams) (interface{}, error) { func GQLSubscribeSelf(p graphql.ResolveParams) (interface{}, error) {
return GQLSubscribeFn(p, true, func(ctx *GraphContext, server *GQLThread, signal GraphSignal, p graphql.ResolveParams)(interface{}, error) { return GQLSubscribeFn(p, true, func(ctx *Context, server *GQLThread, signal GraphSignal, p graphql.ResolveParams)(interface{}, error) {
return server, nil return server, nil
}) })
} }
func GQLSubscribeFn(p graphql.ResolveParams, send_nil bool, fn func(*GraphContext, *GQLThread, GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) { func GQLSubscribeFn(p graphql.ResolveParams, send_nil bool, fn func(*Context, *GQLThread, GraphSignal, graphql.ResolveParams)(interface{}, error))(interface{}, error) {
server, ok := p.Context.Value("gql_server").(*GQLThread) server, ok := p.Context.Value("gql_server").(*GQLThread)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to get gql_server from context and cast to GQLServer") return nil, fmt.Errorf("Failed to get gql_server from context and cast to GQLServer")
} }
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to get graph_context from context and cast to GraphContext") return nil, fmt.Errorf("Failed to get graph_context from context and cast to Context")
} }
c := make(chan interface{}) c := make(chan interface{})
go func(c chan interface{}, server *GQLThread) { go func(c chan interface{}, server *GQLThread) {
ctx.Log.Logf("gqlws", "GQL_SUBSCRIBE_THREAD_START") ctx.Log.Logf("gqlws", "GQL_SUBSCRIBE_THREAD_START")
sig_c := server.UpdateChannel(1) sig_c := UpdateChannel(server, 1, RandID())
if send_nil == true { if send_nil == true {
sig_c <- nil sig_c <- nil
} }
@ -793,9 +765,9 @@ func GQLMutationSendUpdate() *graphql.Field {
return nil, fmt.Errorf("Failed to cast context gql_server to GQLServer: %+v", p.Context.Value("gql_server")) return nil, fmt.Errorf("Failed to cast context gql_server to GQLServer: %+v", p.Context.Value("gql_server"))
} }
ctx, ok := p.Context.Value("graph_context").(*GraphContext) ctx, ok := p.Context.Value("graph_context").(*Context)
if ok == false { if ok == false {
return nil, fmt.Errorf("Failed to cast context graph_context to GraphContext: %+v", p.Context.Value("graph_context")) return nil, fmt.Errorf("Failed to cast context graph_context to Context: %+v", p.Context.Value("graph_context"))
} }
signal_map, ok := p.Args["signal"].(map[string]interface{}) signal_map, ok := p.Args["signal"].(map[string]interface{})
@ -818,13 +790,13 @@ func GQLMutationSendUpdate() *graphql.Field {
return nil, fmt.Errorf("Failed to cast arg id to string") return nil, fmt.Errorf("Failed to cast arg id to string")
} }
var node GraphNode = nil var node Node = nil
err := UseStates(ctx, []GraphNode{server}, func(states NodeStateMap) (error){ err := UseStates(ctx, []Node{server}, func(nodes NodeMap) (error){
node = FindChild(ctx, server, NodeID(id), states) node = FindChild(ctx, server, NodeID(id), nodes)
if node == nil { if node == nil {
return fmt.Errorf("Failed to find ID: %s as child of server thread", id) return fmt.Errorf("Failed to find ID: %s as child of server thread", id)
} }
SendUpdate(ctx, node, signal, states) node.Signal(ctx, signal, nodes)
return nil return nil
}) })
if err != nil { if err != nil {

@ -4,64 +4,74 @@ import (
"testing" "testing"
"time" "time"
"fmt" "fmt"
"encoding/json"
"errors" "errors"
) )
func TestGQLThread(t * testing.T) { func TestGQLThread(t * testing.T) {
ctx := logTestContext(t, []string{}) ctx := logTestContext(t, []string{})
gql_thread, err := NewGQLThread(ctx, ":0", []Lockable{}) gql_t_r := NewGQLThread(RandID(), "GQL Thread", "init", ":0")
fatalErr(t, err) gql_t := &gql_t_r
test_thread_1, err := NewSimpleThread(ctx, "Test thread 1", []Lockable{}, BaseThreadActions, BaseThreadHandlers) t1_r := NewSimpleThread(RandID(), "Test thread 1", "init", nil, BaseThreadActions, BaseThreadHandlers)
fatalErr(t, err) t1 := &t1_r
t2_r := NewSimpleThread(RandID(), "Test thread 2", "init", nil, BaseThreadActions, BaseThreadHandlers)
test_thread_2, err := NewSimpleThread(ctx, "Test thread 2", []Lockable{}, BaseThreadActions, BaseThreadHandlers) t2 := &t2_r
fatalErr(t, err)
err := UpdateStates(ctx, []Node{gql_t, t1, t2}, func(nodes NodeMap) error {
i1 := NewGQLThreadInfo(true, "start", "restore") i1 := NewGQLThreadInfo(true, "start", "restore")
err = LinkThreads(ctx, gql_thread, test_thread_1, &i1) err := LinkThreads(ctx, gql_t, t1, &i1, nodes)
fatalErr(t, err) if err != nil {
return err
}
i2 := NewGQLThreadInfo(false, "start", "restore") i2 := NewGQLThreadInfo(false, "start", "restore")
err = LinkThreads(ctx, gql_thread, test_thread_2, &i2) return LinkThreads(ctx, gql_t, t2, &i2, nodes)
})
fatalErr(t, err) fatalErr(t, err)
go func(thread Thread){ go func(thread Thread){
time.Sleep(10*time.Millisecond) time.Sleep(10*time.Millisecond)
err := UseStates(ctx, []GraphNode{thread}, func(states NodeStateMap) error { err := UseStates(ctx, []Node{thread}, func(nodes NodeMap) error {
SendUpdate(ctx, thread, CancelSignal(nil), states) return thread.Signal(ctx, CancelSignal(nil), nodes)
return nil
}) })
fatalErr(t, err) fatalErr(t, err)
}(gql_thread) }(gql_t)
err = ThreadLoop(ctx, gql_thread, "start") err = ThreadLoop(ctx, gql_t, "start")
fatalErr(t, err) fatalErr(t, err)
} }
func TestGQLDBLoad(t * testing.T) { func TestGQLDBLoad(t * testing.T) {
ctx := logTestContext(t, []string{"thread", "update", "gql"}) ctx := logTestContext(t, []string{"thread", "signal", "gql", "test"})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{}) l1_r := NewSimpleLockable(RandID(), "Test Lockable 1")
fatalErr(t, err) l1 := &l1_r
t1, err := NewSimpleThread(ctx, "Test Thread 1", []Lockable{}, BaseThreadActions, BaseThreadHandlers) t1_r := NewSimpleThread(RandID(), "Test Thread 1", "init", nil, BaseThreadActions, BaseThreadHandlers)
fatalErr(t, err) t1 := &t1_r
update_channel := t1.UpdateChannel(10) update_channel := UpdateChannel(t1, 10, "test")
gql, err := NewGQLThread(ctx, ":0", []Lockable{l1}) gql_r := NewGQLThread(RandID(), "GQL Thread", "init", ":8080")
fatalErr(t, err) gql := &gql_r
info := NewGQLThreadInfo(true, "start", "restore") info := NewGQLThreadInfo(true, "start", "restore")
err = UpdateStates(ctx, []GraphNode{gql, t1}, func(nodes NodeMap) error { err := UpdateStates(ctx, []Node{gql, t1}, func(nodes NodeMap) error {
return LinkThreads(ctx, gql, t1, &info) err := LinkLockables(ctx, gql, []Lockable{l1}, nodes)
if err != nil {
return err
}
return LinkThreads(ctx, gql, t1, &info, nodes)
}) })
fatalErr(t, err) fatalErr(t, err)
err = UseStates(ctx, []GraphNode{gql}, func(states NodeStateMap) error {
SendUpdate(ctx, gql, NewSignal(t1, "child_added"), states) err = UseStates(ctx, []Node{gql}, func(nodes NodeMap) error {
SendUpdate(ctx, gql, AbortSignal(nil), states) err := gql.Signal(ctx, NewSignal(t1, "child_added"), nodes)
if err != nil {
return nil return nil
}
return gql.Signal(ctx, AbortSignal(nil), nodes)
}) })
fatalErr(t, err)
err = ThreadLoop(ctx, gql, "start") err = ThreadLoop(ctx, gql, "start")
if errors.Is(err, NewThreadAbortedError("")) { if errors.Is(err, NewThreadAbortedError("")) {
ctx.Log.Logf("test", "Main thread aborted by signal: %s", err) ctx.Log.Logf("test", "Main thread aborted by signal: %s", err)
@ -71,9 +81,9 @@ func TestGQLDBLoad(t * testing.T) {
(*GraphTester)(t).WaitForValue(ctx, update_channel, "thread_aborted", t1, 100*time.Millisecond, "Didn't receive thread_abort from t1 on t1") (*GraphTester)(t).WaitForValue(ctx, update_channel, "thread_aborted", t1, 100*time.Millisecond, "Didn't receive thread_abort from t1 on t1")
err = UseStates(ctx, []GraphNode{gql, t1}, func(states NodeStateMap) error { err = UseStates(ctx, []Node{gql, t1}, func(nodes NodeMap) error {
ser1, err := json.MarshalIndent(states[gql.ID()], "", " ") ser1, err := gql.Serialize()
ser2, err := json.MarshalIndent(states[t1.ID()], "", " ") ser2, err := t1.Serialize()
fmt.Printf("\n%s\n\n", ser1) fmt.Printf("\n%s\n\n", ser1)
fmt.Printf("\n%s\n\n", ser2) fmt.Printf("\n%s\n\n", ser2)
return err return err
@ -81,20 +91,21 @@ func TestGQLDBLoad(t * testing.T) {
gql_loaded, err := LoadNode(ctx, gql.ID()) gql_loaded, err := LoadNode(ctx, gql.ID())
fatalErr(t, err) fatalErr(t, err)
var t1_loaded *BaseThread = nil var t1_loaded *SimpleThread = nil
err = UseStates(ctx, []GraphNode{gql_loaded}, func(states NodeStateMap) error { var update_channel_2 chan GraphSignal
ser, err := json.MarshalIndent(states[gql_loaded.ID()], "", " ") err = UseStates(ctx, []Node{gql_loaded}, func(nodes NodeMap) error {
ser, err := gql_loaded.Serialize()
fmt.Printf("\n%s\n\n", ser) fmt.Printf("\n%s\n\n", ser)
child := states[gql_loaded.ID()].(ThreadState).Children()[0] child := gql_loaded.(Thread).Children()[0].(*SimpleThread)
t1_loaded = child.(*BaseThread) t1_loaded = child
update_channel = t1_loaded.UpdateChannel(10) update_channel_2 = UpdateChannel(t1_loaded, 10, "test")
err = UseMoreStates(ctx, []GraphNode{child}, states, func(states NodeStateMap) error { err = UseMoreStates(ctx, []Node{child}, nodes, func(nodes NodeMap) error {
ser, err := json.MarshalIndent(states[child.ID()], "", " ") ser, err := child.Serialize()
fmt.Printf("\n%s\n\n", ser) fmt.Printf("\n%s\n\n", ser)
return err return err
}) })
SendUpdate(ctx, gql_loaded, AbortSignal(nil), states) gql_loaded.Signal(ctx, AbortSignal(nil), nodes)
return err return err
}) })
@ -104,6 +115,6 @@ func TestGQLDBLoad(t * testing.T) {
} else { } else {
fatalErr(t, err) fatalErr(t, err)
} }
(*GraphTester)(t).WaitForValue(ctx, update_channel, "thread_aborted", t1_loaded, 100*time.Millisecond, "Dicn't received thread_aborted on t1_loaded from t1_loaded") (*GraphTester)(t).WaitForValue(ctx, update_channel_2, "thread_aborted", t1_loaded, 100*time.Millisecond, "Dicn't received thread_aborted on t1_loaded from t1_loaded")
} }

@ -27,6 +27,7 @@ type Lockable interface {
SetOwner(new_owner Lockable) SetOwner(new_owner Lockable)
//// State Reading Functions //// State Reading Functions
Name() string
// Called when new_owner wants to take lockable's lock but it's owned by this node // Called when new_owner wants to take lockable's lock but it's owned by this node
// A true return value means that the lock can be passed // A true return value means that the lock can be passed
AllowedToTakeLock(new_owner Lockable, lockable Lockable) bool AllowedToTakeLock(new_owner Lockable, lockable Lockable) bool

@ -145,7 +145,7 @@ func (thread * SimpleThread) AddChild(child Thread, info ThreadInfo) error {
return fmt.Errorf("nil info passed when expecting info") return fmt.Errorf("nil info passed when expecting info")
} else if info != nil { } else if info != nil {
if reflect.TypeOf(info) != thread.InfoType { if reflect.TypeOf(info) != thread.InfoType {
return fmt.Errorf("info type mismatch, expecting %+v", thread.InfoType) return fmt.Errorf("info type mismatch, expecting %+v - %+v", thread.InfoType, reflect.TypeOf(info))
} }
} }
@ -388,6 +388,7 @@ const THREAD_SIGNAL_BUFFER_SIZE = 128
func NewSimpleThread(id NodeID, name string, state_name string, info_type reflect.Type, actions ThreadActions, handlers ThreadHandlers) SimpleThread { func NewSimpleThread(id NodeID, name string, state_name string, info_type reflect.Type, actions ThreadActions, handlers ThreadHandlers) SimpleThread {
return SimpleThread{ return SimpleThread{
SimpleLockable: NewSimpleLockable(id, name), SimpleLockable: NewSimpleLockable(id, name),
InfoType: info_type,
state_name: state_name, state_name: state_name,
signal: make(chan GraphSignal, THREAD_SIGNAL_BUFFER_SIZE), signal: make(chan GraphSignal, THREAD_SIGNAL_BUFFER_SIZE),
children: []Thread{}, children: []Thread{},