diff --git a/gql.go b/gql.go index 2cb4158..0ba4777 100644 --- a/gql.go +++ b/gql.go @@ -804,6 +804,10 @@ func (ext *GQLExt) NewSubscriptionChannel(buffer int) chan Signal { } func (ext *GQLExt) Process(context *Context, princ_id NodeID, node *Node, signal Signal) { + if signal.Type() == ReadResultSignalType { + } + + ext.SubscribeLock.Lock() defer ext.SubscribeLock.Unlock() diff --git a/graph_test.go b/graph_test.go index dc620ff..6e58216 100644 --- a/graph_test.go +++ b/graph_test.go @@ -13,7 +13,33 @@ import ( type GraphTester testing.T const listner_timeout = 50 * time.Millisecond -func (t * GraphTester) WaitForState(ctx * Context, listener *ListenerExt, stype SignalType, state string, timeout time.Duration, str string) Signal { +func (t *GraphTester) WaitForReadResult(ctx *Context, listener *ListenerExt, timeout time.Duration, str string) map[ExtType]map[string]interface{} { + timeout_channel := time.After(timeout) + for true { + select { + case signal := <- listener.Chan: + ctx.Log.Logf("test", "SIGNAL %+v", signal) + if signal == nil { + ctx.Log.Logf("test", "SIGNAL_CHANNEL_CLOSED: %s", listener) + t.Fatal(str) + } + if signal.Type() == ReadResultSignalType { + result_signal, ok := signal.(ReadResultSignal) + if ok == false { + ctx.Log.Logf("test", "SIGNAL_CHANNEL_BAD_CAST: %+v", signal) + t.Fatal(str) + } + return result_signal.Extensions + } + case <-timeout_channel: + ctx.Log.Logf("test", "SIGNAL_CHANNEL_TIMEOUT: %+v", listener) + t.Fatal(str) + } + } + return nil +} + +func (t *GraphTester) WaitForState(ctx * Context, listener *ListenerExt, stype SignalType, state string, timeout time.Duration, str string) Signal { timeout_channel := time.After(timeout) for true { select { diff --git a/lockable_test.go b/lockable_test.go index d931d2c..c9a159f 100644 --- a/lockable_test.go +++ b/lockable_test.go @@ -21,7 +21,7 @@ var link_policy = NewAllNodesPolicy(Actions{MakeAction(LinkSignalType, "*"), Mak var lock_policy = NewAllNodesPolicy(Actions{MakeAction(LockSignalType, "*")}) func TestLink(t *testing.T) { - ctx := lockableTestContext(t, []string{"lockable"}) + ctx := lockableTestContext(t, []string{}) l1_listener := NewListenerExt(10) l1 := NewNode(ctx, RandID(), TestLockableType, nil, @@ -51,7 +51,7 @@ func TestLink(t *testing.T) { } func TestLock(t *testing.T) { - ctx := lockableTestContext(t, []string{"policy"}) + ctx := lockableTestContext(t, []string{}) NewLockable := func()(*Node, *ListenerExt) { listener := NewListenerExt(10) diff --git a/node.go b/node.go index 89c098d..cdc9128 100644 --- a/node.go +++ b/node.go @@ -160,6 +160,7 @@ func ReadNodeFields(ctx *Context, self *Node, princ NodeID, reqs map[ExtType][]s } } } + exts[ext_type] = fields } return exts } @@ -198,8 +199,8 @@ func nodeLoop(ctx *Context, node *Node) error { if ok == false { ctx.Log.Logf("signal", "READ_SIGNAL: bad cast %+v", signal) } else { - fields := ReadNodeFields(ctx, node, source, read_signal.Extensions) - ctx.Log.Logf("test", "READ_RESULT: %+v", fields) + result := ReadNodeFields(ctx, node, source, read_signal.Extensions) + ctx.Send(node.ID, source, NewReadResultSignal(result)) } } @@ -341,19 +342,27 @@ func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []Queue } func Allowed(ctx *Context, principal_id NodeID, action Action, node *Node) error { - ctx.Log.Logf("policy", "POLICY_CHECK: %s %s.%s", principal_id, node.ID, action) + ctx.Log.Logf("policy", "POLICY_CHECK: %s -> %s.%s", principal_id, node.ID, action) // Nodes are allowed to perform all actions on themselves regardless of whether or not they have an ACL extension if principal_id == node.ID { + ctx.Log.Logf("policy", "POLICY_CHECK_SAME_NODE: %s.%s", principal_id, action) return nil } // Check if the node has a policy extension itself, and check against the policies in it policy_ext, err := GetExt[*ACLExt](node) if err != nil { + ctx.Log.Logf("policy", "POLICY_CHECK_NO_ACL_EXT: %s", node.ID) return err } - return policy_ext.Allows(ctx, principal_id, action, node) + err = policy_ext.Allows(ctx, principal_id, action, node) + if err != nil { + ctx.Log.Logf("policy", "POLICY_CHECK_FAIL: %s -> %s.%s : %s", principal_id, node.ID, action, err) + } else { + ctx.Log.Logf("policy", "POLICY_CHECK_PASS: %s -> %s.%s", principal_id, node.ID, action) + } + return err } // Magic first four bytes of serialized DB content, stored big endian diff --git a/node_test.go b/node_test.go index f0f9e9c..f776fa2 100644 --- a/node_test.go +++ b/node_test.go @@ -2,10 +2,11 @@ package graphvent import ( "testing" + "time" ) func TestNodeDB(t *testing.T) { - ctx := logTestContext(t, []string{"db"}) + ctx := logTestContext(t, []string{}) node_type := NodeType("test") err := ctx.RegisterNodeType(node_type, []ExtType{GroupExtType}) fatalErr(t, err) @@ -16,3 +17,34 @@ func TestNodeDB(t *testing.T) { _, err = ctx.GetNode(node.ID) fatalErr(t, err) } + +func TestNodeRead(t *testing.T) { + ctx := logTestContext(t, []string{"test", "read", "signal", "policy", "node", "loop"}) + node_type := NodeType("TEST") + err := ctx.RegisterNodeType(node_type, []ExtType{ACLExtType, GroupExtType}) + fatalErr(t, err) + + n1_id := RandID() + n2_id := RandID() + + ctx.Log.Logf("test", "N1: %s", n1_id) + ctx.Log.Logf("test", "N2: %s", n2_id) + + n2_policy := NewPerNodePolicy(map[NodeID]Actions{ + n1_id: Actions{MakeAction(ReadResultSignalType, "+")}, + }) + n2_listener := NewListenerExt(10) + n2 := NewNode(ctx, n2_id, node_type, nil, NewACLExt(n2_policy), NewGroupExt(nil), n2_listener) + + n1_policy := NewPerNodePolicy(map[NodeID]Actions{ + n2_id: Actions{MakeAction(ReadSignalType, "+")}, + }) + n1 := NewNode(ctx, n1_id, node_type, nil, NewACLExt(n1_policy), NewGroupExt(nil)) + + ctx.Send(n2.ID, n1.ID, NewReadSignal(map[ExtType][]string{ + GroupExtType: []string{"members"}, + })) + + res := (*GraphTester)(t).WaitForReadResult(ctx, n2_listener, 10*time.Millisecond, "No read_result") + ctx.Log.Logf("test", "READ_RESULT: %+v", res) +} diff --git a/policy.go b/policy.go index ab43a0f..ebf88ae 100644 --- a/policy.go +++ b/policy.go @@ -122,8 +122,11 @@ func MakeAction(parts ...interface{}) Action { } func (action Action) Allows(test Action) bool { + action_len := len(action) for i, part := range(test) { - if action[i] == part || action[i] == "*" { + if i >= action_len { + return false + } else if action[i] == part || action[i] == "*" { continue } else if action[i] == "+" { break @@ -148,6 +151,33 @@ func (actions Actions) Allows(action Action) error { type NodeActions map[NodeID]Actions +func (actions NodeActions) MarshalJSON() ([]byte, error) { + tmp := map[string]Actions{} + for id, a := range(actions) { + tmp[id.String()] = a + } + return json.Marshal(tmp) +} + +func (actions NodeActions) UnmarshalJSON(data []byte) error { + tmp := map[string]Actions{} + err := json.Unmarshal(data, &tmp) + if err != nil { + return err + } + + for id_str, a := range(tmp) { + id, err := ParseID(id_str) + if err != nil { + return err + } + + actions[id] = a + } + + return nil +} + type AllNodesPolicyJSON struct { Actions Actions `json:"actions"` } @@ -350,7 +380,6 @@ func (ext *ACLExt) Type() ExtType { // Check if the extension allows the principal to perform action on node func (ext *ACLExt) Allows(ctx *Context, principal_id NodeID, action Action, node *Node) error { - ctx.Log.Logf("policy", "POLICY_EXT_ALLOWED: %+v", ext) errs := []error{} for _, policy := range(ext.Policies) { err := policy.Allows(principal_id, action, node) diff --git a/signal.go b/signal.go index 2c8e52c..971d1e0 100644 --- a/signal.go +++ b/signal.go @@ -10,6 +10,7 @@ const ( LinkSignalType = SignalType("LINK") LockSignalType = SignalType("LOCK") ReadSignalType = SignalType("READ") + ReadResultSignalType = SignalType("READ_RESULT") ) type SignalDirection int @@ -156,3 +157,15 @@ func NewReadSignal(exts map[ExtType][]string) ReadSignal { Extensions: exts, } } + +type ReadResultSignal struct { + BaseSignal + Extensions map[ExtType]map[string]interface{} `json:"extensions"` +} + +func NewReadResultSignal(exts map[ExtType]map[string]interface{}) ReadResultSignal { + return ReadResultSignal{ + BaseSignal: NewDirectSignal(ReadResultSignalType), + Extensions: exts, + } +}