Got GQL subscriptions working for lockable_state

master
noah metz 2024-03-23 03:23:00 -06:00
parent 6850031e80
commit ab76f09923
4 changed files with 41 additions and 14 deletions

@ -1145,6 +1145,24 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
switch source := p.Source.(type) {
case *StatusSignal:
ctx.Context.Log.Logf("test", "StatusSignal: %+v", source)
cached_node, cached := ctx.NodeCache[source.Source]
ctx.Context.Log.Logf("test", "Cached: %t", cached)
if cached {
for ext_type, ext_changes := range(source.Changes) {
cached_ext, cached := cached_node.Data[ext_type]
if cached {
for _, field := range(ext_changes) {
delete(cached_ext, string(field))
}
cached_node.Data[ext_type] = cached_ext
}
}
ctx.NodeCache[source.Source] = cached_node
}
}
return ResolveNode(ctx.Server.ID, p) return ResolveNode(ctx.Server.ID, p)
}, },
}, },

@ -94,6 +94,7 @@ func GetResolveFields(id NodeID, ctx *ResolveContext, p graphql.ResolveParams) (
return m, nil return m, nil
} }
// TODO: instead of doing the read right away, check if any new fields need to be read
func ResolveNode(id NodeID, p graphql.ResolveParams) (NodeResult, error) { func ResolveNode(id NodeID, p graphql.ResolveParams) (NodeResult, error) {
ctx, err := PrepResolve(p) ctx, err := PrepResolve(p)
if err != nil { if err != nil {

@ -18,7 +18,7 @@ import (
func TestGQLSubscribe(t *testing.T) { func TestGQLSubscribe(t *testing.T) {
ctx := logTestContext(t, []string{"test", "listener", "changes"}) ctx := logTestContext(t, []string{"test"})
n1, err := NewNode(ctx, nil, "Lockable", 10, NewLockableExt(nil)) n1, err := NewNode(ctx, nil, "Lockable", 10, NewLockableExt(nil))
fatalErr(t, err) fatalErr(t, err)
@ -94,7 +94,7 @@ func TestGQLSubscribe(t *testing.T) {
n, err = ws.Read(resp) n, err = ws.Read(resp)
fatalErr(t, err) fatalErr(t, err)
ctx.Log.Logf("test", "SUB: %s", resp[:n]) ctx.Log.Logf("test", "SUB1: %s", resp[:n])
lock_id, err := LockLockable(ctx, gql) lock_id, err := LockLockable(ctx, gql)
fatalErr(t, err) fatalErr(t, err)
@ -111,7 +111,15 @@ func TestGQLSubscribe(t *testing.T) {
n, err = ws.Read(resp) n, err = ws.Read(resp)
fatalErr(t, err) fatalErr(t, err)
ctx.Log.Logf("test", "SUB: %s", resp[:n]) ctx.Log.Logf("test", "SUB2: %s", resp[:n])
n, err = ws.Read(resp)
fatalErr(t, err)
ctx.Log.Logf("test", "SUB3: %s", resp[:n])
n, err = ws.Read(resp)
fatalErr(t, err)
ctx.Log.Logf("test", "SUB4: %s", resp[:n])
// TODO: check that there are no more messages sent to ws within a timeout // TODO: check that there are no more messages sent to ws within a timeout
} }

@ -129,7 +129,7 @@ func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source Node
messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_owner")}) messages = append(messages, SendMsg{source, NewErrorSignal(signal.Id, "not_owner")})
} else { } else {
if len(ext.Requirements) == 0 { if len(ext.Requirements) == 0 {
changes = append(changes, "state", "owner", "pending_owner") changes = append(changes, "lockable_state", "owner", "pending_owner")
ext.Owner = nil ext.Owner = nil
@ -139,7 +139,7 @@ func (ext *LockableExt) HandleUnlockSignal(ctx *Context, node *Node, source Node
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)}) messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)})
} else { } else {
changes = append(changes, "state", "waiting", "requirements", "pending_owner") changes = append(changes, "lockable_state", "waiting", "requirements", "pending_owner")
ext.PendingOwner = nil ext.PendingOwner = nil
@ -173,7 +173,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
switch ext.State { switch ext.State {
case Unlocked: case Unlocked:
if len(ext.Requirements) == 0 { if len(ext.Requirements) == 0 {
changes = append(changes, "state", "owner", "pending_owner") changes = append(changes, "lockable_state", "owner", "pending_owner")
ext.Owner = new(NodeID) ext.Owner = new(NodeID)
*ext.Owner = source *ext.Owner = source
@ -184,7 +184,7 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, node *Node, source NodeID
ext.State = Locked ext.State = Locked
messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)}) messages = append(messages, SendMsg{source, NewSuccessSignal(signal.Id)})
} else { } else {
changes = append(changes, "state", "requirements", "waiting", "pending_owner") changes = append(changes, "lockable_state", "requirements", "waiting", "pending_owner")
ext.PendingOwner = new(NodeID) ext.PendingOwner = new(NodeID)
*ext.PendingOwner = source *ext.PendingOwner = source
@ -221,7 +221,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
switch ext.State { switch ext.State {
case Locking: case Locking:
changes = append(changes, "state", "requirements") changes = append(changes, "lockable_state", "requirements")
ext.Requirements[id] = Unlocked ext.Requirements[id] = Unlocked
@ -242,11 +242,11 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
} }
if unlocked == len(ext.Requirements) { if unlocked == len(ext.Requirements) {
changes = append(changes, "owner", "state") changes = append(changes, "owner", "lockable_state")
ext.State = Unlocked ext.State = Unlocked
ext.Owner = nil ext.Owner = nil
} else { } else {
changes = append(changes, "state") changes = append(changes, "lockable_state")
ext.State = AbortingLock ext.State = AbortingLock
} }
@ -271,7 +271,7 @@ func (ext *LockableExt) HandleErrorSignal(ctx *Context, node *Node, source NodeI
} }
if unlocked == len(ext.Requirements) { if unlocked == len(ext.Requirements) {
changes = append(changes, "owner", "state") changes = append(changes, "owner", "lockable_state")
ext.State = Unlocked ext.State = Unlocked
ext.Owner = nil ext.Owner = nil
} }
@ -309,7 +309,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
} }
if locked == len(ext.Requirements) { if locked == len(ext.Requirements) {
changes = append(changes, "state", "owner", "req_id") changes = append(changes, "lockable_state", "owner", "req_id")
ext.State = Locked ext.State = Locked
ext.Owner = new(NodeID) ext.Owner = new(NodeID)
@ -338,7 +338,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
} }
if unlocked == len(ext.Requirements) { if unlocked == len(ext.Requirements) {
changes = append(changes, "state", "pending_owner", "req_id") changes = append(changes, "lockable_state", "pending_owner", "req_id")
messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked: %s", ext.State)}) messages = append(messages, SendMsg{*ext.PendingOwner, NewErrorSignal(*ext.ReqID, "not_unlocked: %s", ext.State)})
ext.State = Unlocked ext.State = Unlocked
@ -359,7 +359,7 @@ func (ext *LockableExt) HandleSuccessSignal(ctx *Context, node *Node, source Nod
} }
if unlocked == len(ext.Requirements) { if unlocked == len(ext.Requirements) {
changes = append(changes, "state", "owner", "req_id") changes = append(changes, "lockable_state", "owner", "req_id")
messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)}) messages = append(messages, SendMsg{*ext.Owner, NewSuccessSignal(*ext.ReqID)})
ext.State = Unlocked ext.State = Unlocked