Update a bunch of stuff

graph-rework-2
noah metz 2023-07-02 12:14:04 -06:00
parent 9b61adc375
commit c9d9b9ac03
7 changed files with 130 additions and 95 deletions

@ -387,7 +387,7 @@ func LoadGQLThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap) (
}
func LoadGQLThread(ctx * GraphContext, id NodeID) (GraphNode, error) {
thread := RestoreBaseThread(ctx, id)
thread := RestoreBaseThread(ctx, id, gql_actions, gql_handlers)
gql_thread := GQLThread{
BaseThread: thread,
http_server: nil,
@ -407,9 +407,13 @@ func NewGQLThreadState(listen string) GQLThreadState {
}
var gql_actions ThreadActions = ThreadActions{
"wait": ThreadWait,
"start": func(ctx * GraphContext, thread Thread) (string, error) {
ctx.Log.Logf("gql", "SERVER_STARTED")
server := thread.(*GQLThread)
server, ok := thread.(*GQLThread)
if ok == false {
panic(fmt.Sprintf("GQL_THREAD_START: %s is not GQLThread, %+v", thread.ID(), thread.State()))
}
// Serve the GQL http and ws handlers
mux := http.NewServeMux()

@ -8,14 +8,14 @@ import (
)
func TestGQLThread(t * testing.T) {
ctx := testContext(t)
ctx := logTestContext(t, []string{"thread"})
gql_thread, err := NewGQLThread(ctx, ":8080", []Lockable{})
fatalErr(t, err)
test_thread_1, err := NewSimpleBaseThread(ctx, "Test thread 1", []Lockable{}, ThreadActions{}, ThreadHandlers{})
test_thread_1, err := NewSimpleThread(ctx, "Test thread 1", []Lockable{}, BaseThreadActions, BaseThreadHandlers)
fatalErr(t, err)
test_thread_2, err := NewSimpleBaseThread(ctx, "Test thread 2", []Lockable{}, ThreadActions{}, ThreadHandlers{})
test_thread_2, err := NewSimpleThread(ctx, "Test thread 2", []Lockable{}, BaseThreadActions, BaseThreadHandlers)
fatalErr(t, err)
i1 := NewGQLThreadInfo(true)
@ -37,7 +37,7 @@ func TestGQLThread(t * testing.T) {
func TestGQLDBLoad(t * testing.T) {
ctx := logTestContext(t, []string{})
l1, err := NewSimpleBaseLockable(ctx, "Test Lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err)
t1, err := NewGQLThread(ctx, ":8080", []Lockable{l1})

@ -206,13 +206,13 @@ func NewGraphContext(db * badger.DB, log Logger, state_loads StateLoadMap, node_
DB: db,
Log: log,
NodeLoadFuncs: NodeLoadMap{
"base_lockable": LoadBaseLockable,
"base_thread": LoadBaseThread,
"simple_lockable": LoadSimpleLockable,
"simple_thread": LoadSimpleThread,
"gql_thread": LoadGQLThread,
},
StateLoadFuncs: StateLoadMap{
"base_lockable": LoadBaseLockableState,
"base_thread": LoadBaseThreadState,
"simple_lockable": LoadSimpleLockableState,
"simple_thread": LoadSimpleThreadState,
"gql_thread": LoadGQLThreadState,
},
}

@ -547,7 +547,7 @@ func RestoreBaseLockable(ctx * GraphContext, id NodeID) BaseLockable {
}
}
func LoadBaseLockable(ctx * GraphContext, id NodeID) (GraphNode, error) {
func LoadSimpleLockable(ctx * GraphContext, id NodeID) (GraphNode, error) {
// call LoadNodeRecurse on any connected nodes to ensure they're loaded and return the id
lockable := RestoreBaseLockable(ctx, id)
return &lockable, nil
@ -622,7 +622,7 @@ func RestoreBaseLockableState(ctx * GraphContext, j BaseLockableStateJSON, loade
return &state, nil
}
func LoadBaseLockableState(ctx * GraphContext, data []byte, loaded_nodes NodeMap)(NodeState, error){
func LoadSimpleLockableState(ctx * GraphContext, data []byte, loaded_nodes NodeMap)(NodeState, error){
var j BaseLockableStateJSON
err := json.Unmarshal(data, &j)
if err != nil {
@ -637,8 +637,8 @@ func LoadBaseLockableState(ctx * GraphContext, data []byte, loaded_nodes NodeMap
return state, nil
}
func NewSimpleBaseLockable(ctx * GraphContext, name string, requirements []Lockable) (*BaseLockable, error) {
state := NewBaseLockableState(name, "base_lockable")
func NewSimpleLockable(ctx * GraphContext, name string, requirements []Lockable) (*BaseLockable, error) {
state := NewBaseLockableState(name, "simple_lockable")
lockable, err := NewBaseLockable(ctx, &state)
if err != nil {
return nil, err

@ -7,13 +7,13 @@ import (
"encoding/json"
)
func TestNewSimpleBaseLockable(t * testing.T) {
func TestNewSimpleLockable(t * testing.T) {
ctx := testContext(t)
l1, err := NewSimpleBaseLockable(ctx, "Test lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test lockable 1", []Lockable{})
fatalErr(t, err)
l2, err := NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{l1})
l2, err := NewSimpleLockable(ctx, "Test lockable 2", []Lockable{l1})
fatalErr(t, err)
err = UseStates(ctx, []GraphNode{l1, l2}, func(states NodeStateMap) error {
@ -47,10 +47,10 @@ func TestNewSimpleBaseLockable(t * testing.T) {
func TestRepeatedChildLockable(t * testing.T) {
ctx := testContext(t)
r1, err := NewSimpleBaseLockable(ctx, "Test lockable 1", []Lockable{})
r1, err := NewSimpleLockable(ctx, "Test lockable 1", []Lockable{})
fatalErr(t, err)
_, err = NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{r1, r1})
_, err = NewSimpleLockable(ctx, "Test lockable 2", []Lockable{r1, r1})
if err == nil {
t.Fatal("Added the same lockable as a requirement twice to the same lockable")
}
@ -59,7 +59,7 @@ func TestRepeatedChildLockable(t * testing.T) {
func TestLockableSelfLock(t * testing.T) {
ctx := testContext(t)
r1, err := NewSimpleBaseLockable(ctx, "Test lockable 1", []Lockable{})
r1, err := NewSimpleLockable(ctx, "Test lockable 1", []Lockable{})
fatalErr(t, err)
err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error {
@ -95,13 +95,13 @@ func TestLockableSelfLock(t * testing.T) {
func TestLockableSelfLockTiered(t * testing.T) {
ctx := testContext(t)
r1, err := NewSimpleBaseLockable(ctx, "Test lockable 1", []Lockable{})
r1, err := NewSimpleLockable(ctx, "Test lockable 1", []Lockable{})
fatalErr(t, err)
r2, err := NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{})
r2, err := NewSimpleLockable(ctx, "Test lockable 2", []Lockable{})
fatalErr(t, err)
r3, err := NewSimpleBaseLockable(ctx, "Test lockable 3", []Lockable{r1, r2})
r3, err := NewSimpleLockable(ctx, "Test lockable 3", []Lockable{r1, r2})
fatalErr(t, err)
err = UpdateStates(ctx, []GraphNode{r3}, func(nodes NodeMap) error {
@ -152,10 +152,10 @@ func TestLockableSelfLockTiered(t * testing.T) {
func TestLockableLockOther(t * testing.T) {
ctx := testContext(t)
r1, err := NewSimpleBaseLockable(ctx, "Test lockable 1", []Lockable{})
r1, err := NewSimpleLockable(ctx, "Test lockable 1", []Lockable{})
fatalErr(t, err)
r2, err := NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{})
r2, err := NewSimpleLockable(ctx, "Test lockable 2", []Lockable{})
fatalErr(t, err)
err = UpdateStates(ctx, []GraphNode{r1, r2}, func(nodes NodeMap) (error) {
@ -199,10 +199,10 @@ func TestLockableLockOther(t * testing.T) {
func TestLockableLockSimpleConflict(t * testing.T) {
ctx := testContext(t)
r1, err := NewSimpleBaseLockable(ctx, "Test lockable 1", []Lockable{})
r1, err := NewSimpleLockable(ctx, "Test lockable 1", []Lockable{})
fatalErr(t, err)
r2, err := NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{})
r2, err := NewSimpleLockable(ctx, "Test lockable 2", []Lockable{})
fatalErr(t, err)
err = UpdateStates(ctx, []GraphNode{r1}, func(nodes NodeMap) error {
@ -251,13 +251,13 @@ func TestLockableLockSimpleConflict(t * testing.T) {
func TestLockableLockTieredConflict(t * testing.T) {
ctx := testContext(t)
r1, err := NewSimpleBaseLockable(ctx, "Test lockable 1", []Lockable{})
r1, err := NewSimpleLockable(ctx, "Test lockable 1", []Lockable{})
fatalErr(t, err)
r2, err := NewSimpleBaseLockable(ctx, "Test lockable 2", []Lockable{r1})
r2, err := NewSimpleLockable(ctx, "Test lockable 2", []Lockable{r1})
fatalErr(t, err)
r3, err := NewSimpleBaseLockable(ctx, "Test lockable 3", []Lockable{r1})
r3, err := NewSimpleLockable(ctx, "Test lockable 3", []Lockable{r1})
fatalErr(t, err)
err = UpdateStates(ctx, []GraphNode{r2}, func(nodes NodeMap) error {
@ -276,7 +276,7 @@ func TestLockableLockTieredConflict(t * testing.T) {
func TestLockableSimpleUpdate(t * testing.T) {
ctx := testContext(t)
l1, err := NewSimpleBaseLockable(ctx, "Test Lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err)
update_channel := l1.UpdateChannel(0)
@ -291,13 +291,13 @@ func TestLockableSimpleUpdate(t * testing.T) {
func TestLockableDownUpdate(t * testing.T) {
ctx := testContext(t)
l1, err := NewSimpleBaseLockable(ctx, "Test Lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err)
l2, err := NewSimpleBaseLockable(ctx, "Test Lockable 2", []Lockable{l1})
l2, err := NewSimpleLockable(ctx, "Test Lockable 2", []Lockable{l1})
fatalErr(t, err)
_, err = NewSimpleBaseLockable(ctx, "Test Lockable 3", []Lockable{l2})
_, err = NewSimpleLockable(ctx, "Test Lockable 3", []Lockable{l2})
fatalErr(t, err)
update_channel := l1.UpdateChannel(0)
@ -312,13 +312,13 @@ func TestLockableDownUpdate(t * testing.T) {
func TestLockableUpUpdate(t * testing.T) {
ctx := testContext(t)
l1, err := NewSimpleBaseLockable(ctx, "Test Lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err)
l2, err := NewSimpleBaseLockable(ctx, "Test Lockable 2", []Lockable{l1})
l2, err := NewSimpleLockable(ctx, "Test Lockable 2", []Lockable{l1})
fatalErr(t, err)
l3, err := NewSimpleBaseLockable(ctx, "Test Lockable 3", []Lockable{l2})
l3, err := NewSimpleLockable(ctx, "Test Lockable 3", []Lockable{l2})
fatalErr(t, err)
update_channel := l3.UpdateChannel(0)
@ -333,10 +333,10 @@ func TestLockableUpUpdate(t * testing.T) {
func TestOwnerNotUpdatedTwice(t * testing.T) {
ctx := testContext(t)
l1, err := NewSimpleBaseLockable(ctx, "Test Lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err)
l2, err := NewSimpleBaseLockable(ctx, "Test Lockable 2", []Lockable{l1})
l2, err := NewSimpleLockable(ctx, "Test Lockable 2", []Lockable{l1})
fatalErr(t, err)
update_channel := l2.UpdateChannel(0)
@ -351,11 +351,11 @@ func TestOwnerNotUpdatedTwice(t * testing.T) {
func TestLockableDependencyOverlap(t * testing.T) {
ctx := testContext(t)
l1, err := NewSimpleBaseLockable(ctx, "Test Lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err)
l2, err := NewSimpleBaseLockable(ctx, "Test Lockable 2", []Lockable{l1})
l2, err := NewSimpleLockable(ctx, "Test Lockable 2", []Lockable{l1})
fatalErr(t, err)
_, err = NewSimpleBaseLockable(ctx, "Test Lockable 3", []Lockable{l1, l2})
_, err = NewSimpleLockable(ctx, "Test Lockable 3", []Lockable{l1, l2})
if err == nil {
t.Fatal("Should have thrown an error because of dependency overlap")
}
@ -363,17 +363,17 @@ func TestLockableDependencyOverlap(t * testing.T) {
func TestLockableDBLoad(t * testing.T){
ctx := logTestContext(t, []string{})
l1, err := NewSimpleBaseLockable(ctx, "Test Lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err)
l2, err := NewSimpleBaseLockable(ctx, "Test Lockable 2", []Lockable{})
l2, err := NewSimpleLockable(ctx, "Test Lockable 2", []Lockable{})
fatalErr(t, err)
l3, err := NewSimpleBaseLockable(ctx, "Test Lockable 3", []Lockable{l1, l2})
l3, err := NewSimpleLockable(ctx, "Test Lockable 3", []Lockable{l1, l2})
fatalErr(t, err)
l4, err := NewSimpleBaseLockable(ctx, "Test Lockable 4", []Lockable{l3})
l4, err := NewSimpleLockable(ctx, "Test Lockable 4", []Lockable{l3})
fatalErr(t, err)
_, err = NewSimpleBaseLockable(ctx, "Test Lockable 5", []Lockable{l4})
_, err = NewSimpleLockable(ctx, "Test Lockable 5", []Lockable{l4})
fatalErr(t, err)
l6, err := NewSimpleBaseLockable(ctx, "Test Lockable 6", []Lockable{})
l6, err := NewSimpleLockable(ctx, "Test Lockable 6", []Lockable{})
err = UpdateStates(ctx, []GraphNode{l6, l3}, func(nodes NodeMap) error {
l6_state := l6.State().(LockableState)
err := LockLockables(ctx, []Lockable{l3}, l6, l6_state, nodes)

@ -58,6 +58,9 @@ type ThreadState interface {
AddChild(child Thread, info ThreadInfo) error
Start() error
Stop() error
TimeoutAction() string
SetTimeout(end_time time.Time, action string)
}
type BaseThreadState struct {
@ -67,6 +70,8 @@ type BaseThreadState struct {
child_info map[NodeID] ThreadInfo
InfoType reflect.Type
running bool
timeout time.Time
timeout_action string
}
type BaseThreadStateJSON struct {
@ -96,17 +101,19 @@ func SaveBaseThreadState(state * BaseThreadState) BaseThreadStateJSON {
}
}
func RestoreBaseThread(ctx * GraphContext, id NodeID) BaseThread {
func RestoreBaseThread(ctx * GraphContext, id NodeID, actions ThreadActions, handlers ThreadHandlers) BaseThread {
base_lockable := RestoreBaseLockable(ctx, id)
thread := BaseThread{
BaseLockable: base_lockable,
Actions: actions,
Handlers: handlers,
}
return thread
}
func LoadBaseThread(ctx * GraphContext, id NodeID) (GraphNode, error) {
thread := RestoreBaseThread(ctx, id)
func LoadSimpleThread(ctx * GraphContext, id NodeID) (GraphNode, error) {
thread := RestoreBaseThread(ctx, id, BaseThreadActions, BaseThreadHandlers)
return &thread, nil
}
@ -156,7 +163,7 @@ func RestoreBaseThreadState(ctx * GraphContext, j BaseThreadStateJSON, loaded_no
return &state, nil
}
func LoadBaseThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap)(NodeState, error){
func LoadSimpleThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap)(NodeState, error){
var j BaseThreadStateJSON
err := json.Unmarshal(data, &j)
if err != nil {
@ -171,6 +178,15 @@ func LoadBaseThreadState(ctx * GraphContext, data []byte, loaded_nodes NodeMap)(
return state, nil
}
func (state * BaseThreadState) SetTimeout(timeout time.Time, action string) {
state.timeout = timeout
state.timeout_action = action
}
func (state * BaseThreadState) TimeoutAction() string {
return state.timeout_action
}
func (state * BaseThreadState) MarshalJSON() ([]byte, error) {
thread_state := SaveBaseThreadState(state)
return json.Marshal(&thread_state)
@ -313,10 +329,8 @@ type Thread interface {
Action(action string) (ThreadAction, bool)
Handler(signal_type string) (ThreadHandler, bool)
SetTimeout(end_time time.Time, action string)
ClearTimeout()
Timeout() <-chan time.Time
TimeoutAction() string
ClearTimeout()
ChildWaits() *sync.WaitGroup
}
@ -455,9 +469,7 @@ type BaseThread struct {
Actions ThreadActions
Handlers ThreadHandlers
timeout <-chan time.Time
timeout_action string
timeout_chan <-chan time.Time
child_waits sync.WaitGroup
}
@ -491,22 +503,16 @@ func (thread * BaseThread) Handler(signal_type string) (ThreadHandler, bool) {
return handler, exists
}
func (thread * BaseThread) TimeoutAction() string {
return thread.timeout_action
}
func (thread * BaseThread) Timeout() <-chan time.Time {
return thread.timeout
return thread.timeout_chan
}
func (thread * BaseThread) ClearTimeout() {
thread.timeout_action = ""
thread.timeout = nil
thread.timeout_chan = nil
}
func (thread * BaseThread) SetTimeout(end_time time.Time, action string) {
thread.timeout_action = action
thread.timeout = time.After(time.Until(end_time))
func (thread * BaseThread) SetTimeout(timeout time.Time) {
thread.timeout_chan = time.After(time.Until(timeout))
}
var ThreadDefaultStart = func(ctx * GraphContext, thread Thread) (string, error) {
@ -532,8 +538,18 @@ var ThreadWait = func(ctx * GraphContext, thread Thread) (string, error) {
ctx.Log.Logf("thread", "THREAD_NOHANDLER: %s - %s", thread.ID(), signal.Type())
}
case <- thread.Timeout():
ctx.Log.Logf("thread", "THREAD_TIMEOUT %s - NEXT_STATE: %s", thread.ID(), thread.TimeoutAction())
return thread.TimeoutAction(), nil
timeout_action := ""
err := UpdateStates(ctx, []GraphNode{thread}, func(nodes NodeMap) error {
thread_state := thread.State().(ThreadState)
timeout_action = thread_state.TimeoutAction()
thread.ClearTimeout()
return nil
})
if err != nil {
ctx.Log.Logf("thread", "THREAD_TIMEOUT_ERR: %s - %e", thread.ID(), err)
}
ctx.Log.Logf("thread", "THREAD_TIMEOUT %s - NEXT_STATE: %s", thread.ID(), timeout_action)
return timeout_action, nil
}
}
return "wait", nil
@ -553,9 +569,39 @@ func NewBaseThreadState(name string, _type string) BaseThreadState {
children: []Thread{},
child_info: map[NodeID]ThreadInfo{},
parent: nil,
timeout: time.Time{},
timeout_action: "wait",
}
}
func NewThreadActions() ThreadActions{
actions := ThreadActions{}
for k, v := range(BaseThreadActions) {
actions[k] = v
}
return actions
}
func NewThreadHandlers() ThreadHandlers{
handlers := ThreadHandlers{}
for k, v := range(BaseThreadHandlers) {
handlers[k] = v
}
return handlers
}
var BaseThreadActions = ThreadActions{
"wait": ThreadWait,
"start": ThreadDefaultStart,
}
var BaseThreadHandlers = ThreadHandlers{
"abort": ThreadAbort,
"cancel": ThreadCancel,
}
func NewBaseThread(ctx * GraphContext, actions ThreadActions, handlers ThreadHandlers, state ThreadState) (BaseThread, error) {
lockable, err := NewBaseLockable(ctx, state)
if err != nil {
@ -564,31 +610,16 @@ func NewBaseThread(ctx * GraphContext, actions ThreadActions, handlers ThreadHan
thread := BaseThread{
BaseLockable: lockable,
Actions: ThreadActions{
"wait": ThreadWait,
"start": ThreadDefaultStart,
},
Handlers: ThreadHandlers{
"abort": ThreadAbort,
"cancel": ThreadCancel,
},
timeout: nil,
timeout_action: "",
}
for key, fn := range(actions) {
thread.Actions[key] = fn
}
for key, fn := range(handlers) {
thread.Handlers[key] = fn
Actions: actions,
Handlers: handlers,
}
return thread, nil
}
func NewSimpleBaseThread(ctx * GraphContext, name string, requirements []Lockable, actions ThreadActions, handlers ThreadHandlers) (* BaseThread, error) {
state := NewBaseThreadState(name, "base_thread")
func NewSimpleThread(ctx * GraphContext, name string, requirements []Lockable, actions ThreadActions, handlers ThreadHandlers) (* BaseThread, error) {
state := NewBaseThreadState(name, "simple_thread")
thread, err := NewBaseThread(ctx, actions, handlers, &state)
if err != nil {
return nil, err

@ -10,7 +10,7 @@ import (
func TestNewThread(t * testing.T) {
ctx := testContext(t)
t1, err := NewSimpleBaseThread(ctx, "Test thread 1", []Lockable{}, ThreadActions{}, ThreadHandlers{})
t1, err := NewSimpleThread(ctx, "Test thread 1", []Lockable{}, BaseThreadActions, BaseThreadHandlers)
fatalErr(t, err)
go func(thread Thread) {
@ -33,10 +33,10 @@ func TestNewThread(t * testing.T) {
func TestThreadWithRequirement(t * testing.T) {
ctx := testContext(t)
l1, err := NewSimpleBaseLockable(ctx, "Test Lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err)
t1, err := NewSimpleBaseThread(ctx, "Test Thread 1", []Lockable{l1}, ThreadActions{}, ThreadHandlers{})
t1, err := NewSimpleThread(ctx, "Test Thread 1", []Lockable{l1}, BaseThreadActions, BaseThreadHandlers)
fatalErr(t, err)
go func (thread Thread) {
@ -60,10 +60,10 @@ func TestThreadWithRequirement(t * testing.T) {
func TestThreadDBLoad(t * testing.T) {
ctx := logTestContext(t, []string{})
l1, err := NewSimpleBaseLockable(ctx, "Test Lockable 1", []Lockable{})
l1, err := NewSimpleLockable(ctx, "Test Lockable 1", []Lockable{})
fatalErr(t, err)
t1, err := NewSimpleBaseThread(ctx, "Test Thread 1", []Lockable{l1}, ThreadActions{}, ThreadHandlers{})
t1, err := NewSimpleThread(ctx, "Test Thread 1", []Lockable{l1}, BaseThreadActions, BaseThreadHandlers)
fatalErr(t, err)