diff --git a/acl.go b/acl.go index 945ca60..c9c9665 100644 --- a/acl.go +++ b/acl.go @@ -33,14 +33,14 @@ func (signal ACLSignal) Permission() Tree { type ACLExt struct { Policies []Policy `gv:"policies"` PendingACLs map[uuid.UUID]PendingACL `gv:"pending_acls"` - Pending map[uuid.UUID]PendingSignal `gv:"pending"` + Pending map[uuid.UUID]PendingACLSignal `gv:"pending"` } func NewACLExt(policies []Policy) *ACLExt { return &ACLExt{ Policies: policies, PendingACLs: map[uuid.UUID]PendingACL{}, - Pending: map[uuid.UUID]PendingSignal{}, + Pending: map[uuid.UUID]PendingACLSignal{}, } } @@ -144,7 +144,7 @@ func (ext *ACLExt) Process(ctx *Context, node *Node, source NodeID, signal Signa total_messages += len(policy_messages) for _, message := range(policy_messages) { timeout_signal := NewTimeoutSignal(message.Signal.ID()) - ext.Pending[message.Signal.ID()] = PendingSignal{ + ext.Pending[message.Signal.ID()] = PendingACLSignal{ Policy: policy_id, Timeout: timeout_signal.Id, ID: sig.Id, diff --git a/context.go b/context.go index 0428872..63949f1 100644 --- a/context.go +++ b/context.go @@ -552,12 +552,12 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) { return nil, err } - pending_signal_type := reflect.TypeOf(PendingSignal{}) + pending_signal_type := reflect.TypeOf(PendingACLSignal{}) pending_signal_info, err := GetStructInfo(ctx, pending_signal_type) if err != nil { return nil, err } - err = ctx.RegisterType(pending_signal_type, PendingSignalType, nil, SerializeStruct(pending_signal_info), nil, DeserializeStruct(pending_signal_info)) + err = ctx.RegisterType(pending_signal_type, PendingACLSignalType, nil, SerializeStruct(pending_signal_info), nil, DeserializeStruct(pending_signal_info)) if err != nil { return nil, err } diff --git a/listener.go b/listener.go index 9fcbb76..2ee70fd 100644 --- a/listener.go +++ b/listener.go @@ -38,7 +38,7 @@ func (ext *ListenerExt) Process(ctx *Context, node *Node, source NodeID, signal } switch sig := signal.(type) { case *StatusSignal: - ctx.Log.Logf("listener_status", "STATUS: %s - %s", sig.Source, sig.Changes) + ctx.Log.Logf("listener_status", "%s - %+v", sig.Source, sig.Changes) } return nil, nil } diff --git a/lockable_test.go b/lockable_test.go index 7b9a938..8e8f235 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -75,7 +75,7 @@ func Test100Lock(t *testing.T) { fatalErr(t, err) listener_id := KeyID(l_pub) child_policy := NewPerNodePolicy(map[NodeID]Tree{ - listener_id: Tree{ + listener_id: { SerializedType(LockSignalType): nil, }, }) @@ -92,7 +92,7 @@ func Test100Lock(t *testing.T) { new_lockable := NewLockable() reqs[i] = new_lockable.ID } - ctx.Log.Logf("test", "CREATED_1K") + ctx.Log.Logf("test", "CREATED_100") l_policy := NewAllNodesPolicy(Tree{ SerializedType(LockSignalType): nil, @@ -109,10 +109,16 @@ func Test100Lock(t *testing.T) { lock_id, err := LockLockable(ctx, node) fatalErr(t, err) - _, _, err = WaitForResponse(listener.Chan, time.Second*60, lock_id) + response, _, err := WaitForResponse(listener.Chan, time.Second*60, lock_id) fatalErr(t, err) - ctx.Log.Logf("test", "LOCKED_1K") + switch resp := response.(type) { + case *SuccessSignal: + default: + t.Fatalf("Unexpected response to lock - %s", resp) + } + + ctx.Log.Logf("test", "LOCKED_100") } func TestLock(t *testing.T) { diff --git a/node.go b/node.go index 8f4b982..f6e01f3 100644 --- a/node.go +++ b/node.go @@ -79,7 +79,7 @@ type PendingACL struct { Source NodeID } -type PendingSignal struct { +type PendingACLSignal struct { Policy uuid.UUID Timeout uuid.UUID ID uuid.UUID @@ -91,11 +91,12 @@ type Node struct { Key ed25519.PrivateKey `gv:"key"` ID NodeID Type NodeType `gv:"type"` + // TODO: move each extension to it's own db key, and extend changes to notify which extension was changed Extensions map[ExtType]Extension `gv:"extensions"` Policies []Policy `gv:"policies"` PendingACLs map[uuid.UUID]PendingACL `gv:"pending_acls"` - PendingSignals map[uuid.UUID]PendingSignal `gv:"pending_signal"` + PendingACLSignals map[uuid.UUID]PendingACLSignal `gv:"pending_signal"` // Channel for this node to receive messages from the Context MsgChan chan *Message @@ -106,6 +107,8 @@ type Node struct { Active atomic.Bool + // TODO: enhance WriteNode to write SignalQueue to a different key, and use writeSignalQueue to decide whether or not to update it + writeSignalQueue bool SignalQueue []QueuedSignal `gv:"signal_queue"` NextSignal *QueuedSignal } @@ -181,6 +184,7 @@ func (node *Node) QueueTimeout(dest NodeID, signal Signal, timeout time.Duration func (node *Node) QueueSignal(time time.Time, signal Signal) { node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time}) node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) + node.writeSignalQueue = true } func (node *Node) DequeueSignal(id uuid.UUID) error { @@ -198,16 +202,11 @@ func (node *Node) DequeueSignal(id uuid.UUID) error { node.SignalQueue[idx] = node.SignalQueue[len(node.SignalQueue)-1] node.SignalQueue = node.SignalQueue[:len(node.SignalQueue)-1] node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) + node.writeSignalQueue = true return nil } -func (node *Node) ClearSignalQueue() { - node.SignalQueue = []QueuedSignal{} - node.NextSignal = nil - node.TimeoutChan = nil -} - func SoonestSignal(signals []QueuedSignal) (*QueuedSignal, <-chan time.Time) { var soonest_signal *QueuedSignal var soonest_time time.Time @@ -359,7 +358,7 @@ func nodeLoop(ctx *Context, node *Node) error { msgs = append(msgs, m) timeout_signal := NewTimeoutSignal(m.Signal.ID()) node.QueueSignal(time.Now().Add(time.Second), timeout_signal) - node.PendingSignals[m.Signal.ID()] = PendingSignal{policy_type, timeout_signal.Id, msg.Signal.ID()} + node.PendingACLSignals[m.Signal.ID()] = PendingACLSignal{policy_type, timeout_signal.Id, msg.Signal.ID()} } } node.PendingACLs[msg.Signal.ID()] = PendingACL{ @@ -404,6 +403,8 @@ func nodeLoop(ctx *Context, node *Node) error { node.SignalQueue = node.SignalQueue[:(l-1)] node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) + node.writeSignalQueue = true + if node.NextSignal == nil { ctx.Log.Logf("node", "NODE_TIMEOUT(%s) - PROCESSING %+v@%s - NEXT_SIGNAL nil@%+v", node.ID, signal, t, node.TimeoutChan) } else { @@ -420,9 +421,9 @@ func nodeLoop(ctx *Context, node *Node) error { response, ok := signal.(ResponseSignal) if ok == true { - info, waiting := node.PendingSignals[response.ResponseID()] + info, waiting := node.PendingACLSignals[response.ResponseID()] if waiting == true { - delete(node.PendingSignals, response.ResponseID()) + delete(node.PendingACLSignals, response.ResponseID()) ctx.Log.Logf("pending", "FOUND_PENDING_SIGNAL: %s - %s", node.ID, signal) req_info, exists := node.PendingACLs[info.ID] @@ -653,7 +654,7 @@ func NewNode(ctx *Context, key ed25519.PrivateKey, node_type NodeType, buffer_si Extensions: ext_map, Policies: policies, PendingACLs: map[uuid.UUID]PendingACL{}, - PendingSignals: map[uuid.UUID]PendingSignal{}, + PendingACLSignals: map[uuid.UUID]PendingACLSignal{}, MsgChan: make(chan *Message, buffer_size), BufferSize: buffer_size, SignalQueue: []QueuedSignal{}, diff --git a/serialize.go b/serialize.go index a93e737..4927b0a 100644 --- a/serialize.go +++ b/serialize.go @@ -270,7 +270,7 @@ var ( NodeIDType = NewSerializedType("NODE_ID") UUIDType = NewSerializedType("UUID") PendingACLType = NewSerializedType("PENDING_ACL") - PendingSignalType = NewSerializedType("PENDING_SIGNAL") + PendingACLSignalType = NewSerializedType("PENDING_ACL_SIGNAL") TimeType = NewSerializedType("TIME") DurationType = NewSerializedType("DURATION") ResponseType = NewSerializedType("RESPONSE")