Rework of graph.go and resource.go for state

graph-rework
noah metz 2023-06-23 10:10:25 -06:00
parent 5cd741b42e
commit 2de5276ecc
6 changed files with 735 additions and 2568 deletions

@ -1,607 +0,0 @@
package graphvent
import (
"fmt"
"time"
"errors"
"reflect"
"sort"
"sync"
badger "github.com/dgraph-io/badger/v3"
)
// Update the events listeners, and notify the parent to do the same
func (event * BaseEvent) PropagateUpdate(signal GraphSignal) {
event.state_lock.RLock()
defer event.state_lock.RUnlock()
state := event.state.(*EventState)
if signal.Downwards() == false {
// Child->Parent
if state.parent != nil {
SendUpdate(state.parent, signal)
}
for _, resource := range(state.resources) {
SendUpdate(resource, signal)
}
} else {
// Parent->Child
for _, child := range(state.children) {
SendUpdate(child, signal)
}
}
event.signal <- signal
}
type EventInfo interface {
}
type BaseEventInfo interface {
EventInfo
}
type EventQueueInfo struct {
EventInfo
priority int
state string
}
func NewEventQueueInfo(priority int) * EventQueueInfo {
info := &EventQueueInfo{
priority: priority,
state: "queued",
}
return info
}
// Event is the interface that event tree nodes must implement
type Event interface {
GraphNode
Children() []Event
LockChildren()
UnlockChildren()
InfoType() reflect.Type
LockInfo()
UnlockInfo()
ChildInfo(event Event) EventInfo
Parent() Event
LockParent()
UnlockParent()
Action(action string) (func()(string, error), bool)
Handler(signal_type string) (func(GraphSignal) (string, error), bool)
Resources() []Resource
Resource(id string) Resource
AddResource(Resource) error
DoneResource() Resource
SetTimeout(end_time time.Time, action string)
ClearTimeout()
Timeout() <-chan time.Time
TimeoutAction() string
Signal() chan GraphSignal
finish() error
addChild(child Event, info EventInfo)
setParent(parent Event)
}
func (event * BaseEvent) AddResource(resource Resource) error {
event.state_lock.Lock()
defer event.state_lock.Unlock()
state := event.state.(*EventState)
_, exists := state.resources[resource.ID()]
if exists == true {
return fmt.Errorf("%s is already required for %s, cannot add again", resource.Name(), state.name)
}
state.resources[resource.ID()] = resource
return nil
}
func (event * BaseEvent) Signal() chan GraphSignal {
return event.signal
}
func (event * BaseEvent) TimeoutAction() string {
return event.timeout_action
}
func (event * BaseEvent) Timeout() <-chan time.Time {
return event.timeout
}
func (event * BaseEvent) ClearTimeout() {
event.timeout_action = ""
event.timeout = nil
}
func (event * BaseEvent) SetTimeout(end_time time.Time, action string) {
event.timeout_action = action
event.timeout = time.After(time.Until(end_time))
}
func (event * BaseEvent) Handler(signal_type string) (func(GraphSignal)(string, error), bool) {
handler, exists := event.Handlers[signal_type]
return handler, exists
}
func FindChild(event Event, id string) Event {
if id == event.ID() {
return event
}
for _, child := range event.Children() {
result := FindChild(child, id)
if result != nil {
return result
}
}
return nil
}
func CheckInfoType(event Event, info EventInfo) bool {
if event.InfoType() == nil || info == nil {
if event.InfoType() == nil && info == nil {
return true
} else {
return false
}
}
return event.InfoType() == reflect.TypeOf(info)
}
func LinkEvent(event Event, child Event, info EventInfo) error {
if CheckInfoType(event, info) == false {
return errors.New("LinkEvents got wrong type")
}
event.LockParent()
child.LockParent()
if child.Parent() != nil {
child.UnlockParent()
event.UnlockParent()
return errors.New(fmt.Sprintf("Parent already registered: %s->%s already %s", child.Name(), event.Name(), child.Parent().Name()))
}
event.LockChildren()
for _, c := range(event.Children()) {
if c.ID() == child.ID() {
event.UnlockChildren()
child.UnlockParent()
event.UnlockParent()
return errors.New("Child already in event")
}
}
// After all the checks are done, update the state of child + parent, then unlock and update
child.setParent(event)
event.addChild(child, info)
event.UnlockChildren()
child.UnlockParent()
event.UnlockParent()
SendUpdate(event, NewSignal(event, "child_added"))
return nil
}
func RunEvent(event Event) error {
Log.Logf("event", "EVENT_RUN: %s", event.Name())
for _, resource := range(event.Resources()) {
if resource.Owner() == nil {
return fmt.Errorf("EVENT_RUN_RESOURCE_NOT_LOCKED: %s, %s", event.Name(), resource.Name())
} else if resource.Owner().ID() != event.ID() {
return fmt.Errorf("EVENT_RUN_RESOURCE_ALREADY_LOCKED: %s, %s, %s", event.Name(), resource.Name(), resource.Owner().Name())
}
}
SendUpdate(event, NewSignal(event, "event_start"))
next_action := "start"
var err error = nil
for next_action != "" {
action, exists := event.Action(next_action)
if exists == false {
error_str := fmt.Sprintf("%s is not a valid action", next_action)
return errors.New(error_str)
}
Log.Logf("event", "EVENT_ACTION: %s - %s", event.Name(), next_action)
next_action, err = action()
if err != nil {
return err
}
}
err = FinishEvent(event)
if err != nil {
Log.Logf("event", "EVENT_RUN_FINISH_ERR: %s", err)
return err
}
Log.Logf("event", "EVENT_RUN_DONE: %s", event.Name())
return nil
}
func EventAbort(event Event) func(signal GraphSignal) (string, error) {
return func(signal GraphSignal) (string, error) {
return "", errors.New(fmt.Sprintf("%s aborted by signal", event.ID()))
}
}
func EventCancel(event Event) func(signal GraphSignal) (string, error) {
return func(signal GraphSignal) (string, error) {
return "", nil
}
}
func LockResources(event Event) error {
Log.Logf("event", "RESOURCE_LOCKING for %s - %+v", event.Name(), event.Resources())
locked_resources := []Resource{}
var lock_err error = nil
for _, resource := range(event.Resources()) {
err := LockResource(resource, event)
if err != nil {
lock_err = err
break
}
locked_resources = append(locked_resources, resource)
}
if lock_err != nil {
for _, resource := range(locked_resources) {
UnlockResource(resource, event)
}
Log.Logf("event", "RESOURCE_LOCK_FAIL for %s: %s", event.Name(), lock_err)
return lock_err
}
Log.Logf("event", "RESOURCE_LOCK_SUCCESS for %s", event.Name())
signal := NewDownSignal(event, "locked")
SendUpdate(event, signal)
return nil
}
func FinishEvent(event Event) error {
Log.Logf("event", "EVENT_FINISH: %s", event.Name())
for _, resource := range(event.Resources()) {
err := UnlockResource(resource, event)
if err != nil {
panic(err)
}
}
err := UnlockResource(event.DoneResource(), event)
if err != nil {
return err
}
SendUpdate(event, NewDownSignal(event, "unlocked"))
SendUpdate(event.DoneResource(), NewDownSignal(event, "unlocked"))
err = event.finish()
if err != nil {
return err
}
SendUpdate(event, NewSignal(event, "event_done"))
return nil
}
// BaseEvent is the most basic event that can exist in the event tree.
// On start it automatically transitions to completion.
// It can optionally require events, which will all need to be locked to start it
// It can optionally create resources, which will be locked by default and unlocked on completion
// This node by itself doesn't implement any special behaviours for children, so they will be ignored.
// When starter, this event automatically transitions to completion and unlocks all it's resources(including created)
type BaseEvent struct {
BaseNode
resources_lock sync.Mutex
children_lock sync.Mutex
info_lock sync.Mutex
parent_lock sync.Mutex
Actions map[string]func() (string, error)
Handlers map[string]func(GraphSignal) (string, error)
timeout <-chan time.Time
timeout_action string
}
type EventState struct {
BaseNodeState
children []Event
child_info map[string]EventInfo
resources map[string]Resource
parent Event
}
func (event * BaseEvent) LockInfo() {
event.info_lock.Lock()
}
func (event * BaseEvent) UnlockInfo() {
event.info_lock.Unlock()
}
func (event * BaseEvent) Action(action string) (func() (string, error), bool) {
action_fn, exists := event.Actions[action]
return action_fn, exists
}
func EventWait(event Event) (func() (string, error)) {
return func() (string, error) {
Log.Logf("event", "EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout())
select {
case signal := <- event.Signal():
Log.Logf("event", "EVENT_SIGNAL: %s %+v", event.Name(), signal)
signal_fn, exists := event.Handler(signal.Type())
if exists == true {
Log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type())
return signal_fn(signal)
}
return "wait", nil
case <- event.Timeout():
Log.Logf("event", "EVENT_TIMEOUT %s - NEXT_STATE: %s", event.Name(), event.TimeoutAction())
return event.TimeoutAction(), nil
}
}
}
func NewBaseEvent(name string, description string) (BaseEvent) {
event := BaseEvent{
BaseNode: NewBaseNode(randid()),
Actions: map[string]func()(string, error){},
Handlers: map[string]func(GraphSignal)(string, error){},
timeout: nil,
timeout_action: "",
}
return event
}
func AddResources(event Event, resources []Resource) error {
for _, r := range(resources) {
err := event.AddResource(r)
if err != nil {
return err
}
}
return nil
}
func NewEventState(name string, description string) *EventState{
return &EventState{
BaseNodeState: BaseNodeState{
name: name,
description: description,
delegation_map: map[string]GraphNode{},
},
children: []Event{},
child_info: map[string]EventInfo{},
resources: map[string]Resource{},
parent: nil,
}
}
func NewEvent(db *badger.DB, name string, description string, resources []Resource) (* BaseEvent, error) {
event := NewBaseEvent(name, description)
event_ptr := &event
event_ptr.state = NewEventState(name, description)
err := AddResources(event_ptr, resources)
if err != nil {
return nil, err
}
event_ptr.Actions["wait"] = EventWait(event_ptr)
event_ptr.Handlers["abort"] = EventAbort(event_ptr)
event_ptr.Handlers["cancel"] = EventCancel(event_ptr)
event_ptr.Actions["start"] = func() (string, error) {
return "", nil
}
return event_ptr, nil
}
func (event * BaseEvent) finish() error {
return nil
}
func (event * BaseEvent) InfoType() reflect.Type {
return nil
}
// EventQueue is a basic event that can have children.
// On start, it attempts to start it's children from the highest 'priority'
type EventQueue struct {
BaseEvent
listened_resources map[string]Resource
queue_lock sync.Mutex
}
func (queue * EventQueue) finish() error {
for _, resource := range(queue.listened_resources) {
resource.UnregisterChannel(queue.signal)
}
return nil
}
func (queue * EventQueue) InfoType() reflect.Type {
return reflect.TypeOf((*EventQueueInfo)(nil))
}
func NewEventQueue(name string, description string, resources []Resource) (* EventQueue, error) {
queue := &EventQueue{
BaseEvent: NewBaseEvent(name, description),
listened_resources: map[string]Resource{},
}
queue.state = NewEventState(name, description)
AddResources(queue, resources)
queue.Actions["wait"] = EventWait(queue)
queue.Handlers["abort"] = EventAbort(queue)
queue.Handlers["cancel"] = EventCancel(queue)
queue.Actions["start"] = func() (string, error) {
return "queue_event", nil
}
queue.Actions["queue_event"] = func() (string, error) {
// Copy the events to sort the list
queue.LockChildren()
copied_events := make([]Event, len(queue.Children()))
copy(copied_events, queue.Children())
less := func(i int, j int) bool {
info_i := queue.ChildInfo(copied_events[i]).(*EventQueueInfo)
info_j := queue.ChildInfo(copied_events[j]).(*EventQueueInfo)
return info_i.priority < info_j.priority
}
sort.SliceStable(copied_events, less)
needed_resources := map[string]Resource{}
for _, event := range(copied_events) {
// make sure all the required resources are registered to update the event
for _, resource := range(event.Resources()) {
needed_resources[resource.ID()] = resource
}
info := queue.ChildInfo(event).(*EventQueueInfo)
event.LockInfo()
defer event.UnlockInfo()
if info.state == "queued" {
err := LockResources(event)
// start in new goroutine
if err != nil {
} else {
info.state = "running"
Log.Logf("event", "EVENT_START: %s", event.Name())
go func(event Event, info * EventQueueInfo, queue Event) {
Log.Logf("event", "EVENT_GOROUTINE: %s", event.Name())
err := RunEvent(event)
if err != nil {
Log.Logf("event", "EVENT_ERROR: %s", err)
}
event.LockInfo()
defer event.UnlockInfo()
info.state = "done"
}(event, info, queue)
}
}
}
for _, resource := range(needed_resources) {
_, exists := queue.listened_resources[resource.ID()]
if exists == false {
Log.Logf("event", "REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name())
queue.listened_resources[resource.ID()] = resource
resource.RegisterChannel(queue.signal)
}
}
queue.UnlockChildren()
return "wait", nil
}
queue.Handlers["resource_connected"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil
}
queue.Handlers["child_added"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil
}
queue.Handlers["lock_changed"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil
}
queue.Handlers["event_done"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil
}
return queue, nil
}
func (event * BaseEvent) Allowed() []GraphNode {
event.state_lock.RLock()
defer event.state_lock.RUnlock()
state := event.state.(*EventState)
ret := make([]GraphNode, len(state.children))
for i, v := range(state.children) {
ret[i] = v
}
return ret
}
func (event * BaseEvent) Resources() []Resource {
resources := []Resource{}
for _, val := range(event.resources) {
resources = append(resources, val)
}
return resources
}
func (event * BaseEvent) Resource(id string) Resource {
resource, _ := event.resources[id]
return resource
}
func (event * BaseEvent) DoneResource() Resource {
return event.done_resource
}
func (event * BaseEvent) Children() []Event {
return event.children
}
func (event * BaseEvent) ChildInfo(idx Event) EventInfo {
val, ok := event.child_info[idx.ID()]
if ok == false {
return nil
}
return val
}
func (event * BaseEvent) LockChildren() {
event.children_lock.Lock()
}
func (event * BaseEvent) UnlockChildren() {
event.children_lock.Unlock()
}
func (event * BaseEvent) LockParent() {
event.parent_lock.Lock()
}
func (event * BaseEvent) UnlockParent() {
event.parent_lock.Unlock()
}
func (event * BaseEvent) setParent(parent Event) {
event.parent = parent
}
func (event * BaseEvent) addChild(child Event, info EventInfo) {
event.children = append(event.children, child)
event.child_info[child.ID()] = info
}
type GQLEvent struct {
BaseEvent
abort chan error
}

1150
gql.go

File diff suppressed because it is too large Load Diff

@ -2,31 +2,53 @@ package graphvent
import ( import (
"sync" "sync"
"reflect"
"github.com/google/uuid" "github.com/google/uuid"
"time"
"os" "os"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"fmt" "fmt"
"encoding/json" badger "github.com/dgraph-io/badger/v3"
) )
type GraphContext struct {
DB * badger.DB
Log Logger
}
func NewGraphContext(db * badger.DB, log Logger) * GraphContext {
return &GraphContext{DB: db, Log: log}
}
// A Logger is passed around to record events happening to components enabled by SetComponents
type Logger interface { type Logger interface {
Init() error SetComponents(components []string) error
// Log a formatted string
Logf(component string, format string, items ... interface{}) Logf(component string, format string, items ... interface{})
// Log a map of attributes and a format string
Logm(component string, fields map[string]interface{}, format string, items ... interface{}) Logm(component string, fields map[string]interface{}, format string, items ... interface{})
} }
type DefaultLogger struct { func NewConsoleLogger(components []string) *ConsoleLogger {
init_lock sync.Mutex logger := &ConsoleLogger{
Loggers map[string]zerolog.Logger loggers: map[string]zerolog.Logger{},
Components []string components: []string{},
}
logger.SetComponents(components)
return logger
} }
var Log DefaultLogger = DefaultLogger{Loggers: map[string]zerolog.Logger{}, Components: []string{}} // A ConsoleLogger logs to stdout
type ConsoleLogger struct {
loggers map[string]zerolog.Logger
components_lock sync.Mutex
components []string
}
func (logger * DefaultLogger) Init(components []string) error { func (logger * ConsoleLogger) SetComponents(components []string) error {
logger.init_lock.Lock() logger.components_lock.Lock()
defer logger.init_lock.Unlock() defer logger.components_lock.Unlock()
component_enabled := func (component string) bool { component_enabled := func (component string) bool {
for _, c := range(components) { for _, c := range(components) {
@ -37,23 +59,23 @@ func (logger * DefaultLogger) Init(components []string) error {
return false return false
} }
for c, _ := range(logger.Loggers) { for c, _ := range(logger.loggers) {
if component_enabled(c) == false { if component_enabled(c) == false {
delete(logger.Loggers, c) delete(logger.loggers, c)
} }
} }
for _, c := range(components) { for _, c := range(components) {
_, exists := logger.Loggers[c] _, exists := logger.loggers[c]
if component_enabled(c) == true && exists == false { if component_enabled(c) == true && exists == false {
logger.Loggers[c] = zerolog.New(os.Stdout).With().Timestamp().Str("component", c).Logger() logger.loggers[c] = zerolog.New(os.Stdout).With().Timestamp().Str("component", c).Logger()
} }
} }
return nil return nil
} }
func (logger * DefaultLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) { func (logger * ConsoleLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) {
l, exists := logger.Loggers[component] l, exists := logger.loggers[component]
if exists == true { if exists == true {
log := l.Log() log := l.Log()
for key, value := range(fields) { for key, value := range(fields) {
@ -63,37 +85,48 @@ func (logger * DefaultLogger) Logm(component string, fields map[string]interface
} }
} }
func (logger * DefaultLogger) Logf(component string, format string, items ... interface{}) { func (logger * ConsoleLogger) Logf(component string, format string, items ... interface{}) {
l, exists := logger.Loggers[component] l, exists := logger.loggers[component]
if exists == true { if exists == true {
l.Log().Msg(fmt.Sprintf(format, items...)) l.Log().Msg(fmt.Sprintf(format, items...))
} }
} }
// Generate a random graphql id type NodeID string
func randid() string{ // Generate a random id
func RandID() NodeID {
uuid_str := uuid.New().String() uuid_str := uuid.New().String()
return uuid_str return NodeID(uuid_str)
} }
type SignalDirection int
const (
Up SignalDirection = iota
Down
Direct
)
// GraphSignals are passed around the event tree/resource DAG and cast by Type()
type GraphSignal interface { type GraphSignal interface {
Downwards() bool // How to propogate the signal
Source() string Direction() SignalDirection
Source() NodeID
Type() string Type() string
String() string String() string
} }
// BaseSignal is the most basic type of signal, it has no additional data
type BaseSignal struct { type BaseSignal struct {
downwards bool direction SignalDirection
source string source NodeID
_type string _type string
} }
func (signal BaseSignal) Downwards() bool { func (signal BaseSignal) Direction() SignalDirection {
return signal.downwards return signal.direction
} }
func (signal BaseSignal) Source() string { func (signal BaseSignal) Source() NodeID {
return signal.source return signal.source
} }
@ -102,22 +135,17 @@ func (signal BaseSignal) Type() string {
} }
func (signal BaseSignal) String() string { func (signal BaseSignal) String() string {
return fmt.Sprintf("{downwards: %t, source: %s, type: %s}", signal.downwards, signal.source, signal._type) return fmt.Sprintf("{direction: %d, source: %s, type: %s}", signal.direction, signal.source, signal._type)
}
type TimeSignal struct {
BaseSignal
time time.Time
} }
func NewBaseSignal(source GraphNode, _type string, downwards bool) BaseSignal { func NewBaseSignal(source GraphNode, _type string, direction SignalDirection) BaseSignal {
source_id := "" var source_id NodeID = ""
if source != nil { if source != nil {
source_id = source.ID() source_id = source.ID()
} }
signal := BaseSignal{ signal := BaseSignal{
downwards: downwards, direction: direction,
source: source_id, source: source_id,
_type: _type, _type: _type,
} }
@ -125,127 +153,198 @@ func NewBaseSignal(source GraphNode, _type string, downwards bool) BaseSignal {
} }
func NewDownSignal(source GraphNode, _type string) BaseSignal { func NewDownSignal(source GraphNode, _type string) BaseSignal {
return NewBaseSignal(source, _type, true) return NewBaseSignal(source, _type, Down)
} }
func NewSignal(source GraphNode, _type string) BaseSignal { func NewSignal(source GraphNode, _type string) BaseSignal {
return NewBaseSignal(source, _type, false) return NewBaseSignal(source, _type, Up)
}
type NodeState interface {
Name() string
Description() string
DelegationMap() map[string]GraphNode
} }
type BaseNodeState struct { func NewDirectSignal(source GraphNode, _type string) BaseSignal {
name string return NewBaseSignal(source, _type, Direct)
description string
delegation_map map[string]GraphNode
} }
func (state * BaseNodeState) Name() string { func NewAbortSignal(source GraphNode) BaseSignal {
return state.name return NewBaseSignal(source, "abort", Down)
} }
func (state * BaseNodeState) Description() string { func NewCancelSignal(source GraphNode) BaseSignal {
return state.description return NewBaseSignal(source, "cancel", Down)
} }
func (state * BaseNodeState) DelegationMap() map[string]GraphNode { type NodeState interface {
return state.delegation_map Serialize() []byte
OriginalLockHolder(id NodeID) GraphNode
AllowedToTakeLock(id NodeID) bool
RecordLockHolder(id NodeID, lock_holder GraphNode) NodeState
} }
// GraphNode is the interface common to both DAG nodes and Event tree nodes // GraphNode is the interface common to both DAG nodes and Event tree nodes
// They have a NodeState interface which is saved to the database every update
type GraphNode interface { type GraphNode interface {
ID() NodeID
State() NodeState State() NodeState
ID() string StateLock() *sync.RWMutex
UpdateListeners(update GraphSignal)
PropagateUpdate(update GraphSignal) SetState(new_state NodeState)
DeserializeState([]byte) NodeState
// Signal propagation function for listener channels
UpdateListeners(ctx * GraphContext, update GraphSignal)
// Signal propagation function for connected nodes(defined in state)
PropagateUpdate(ctx * GraphContext, update GraphSignal)
// Register and unregister a channel to propogate updates to
RegisterChannel(listener chan GraphSignal) RegisterChannel(listener chan GraphSignal)
UnregisterChannel(listener chan GraphSignal) UnregisterChannel(listener chan GraphSignal)
// Get a handle to the nodes internal signal channel
SignalChannel() chan GraphSignal SignalChannel() chan GraphSignal
} }
func (node * BaseNode) StateLock() *sync.Mutex { // Create a new base node with the given ID
return &node.state_lock func NewNode(ctx * GraphContext, id NodeID, state NodeState) BaseNode {
}
func NewBaseNode(id string) BaseNode {
node := BaseNode{ node := BaseNode{
id: id, id: id,
signal: make(chan GraphSignal, 512), signal: make(chan GraphSignal, 512),
listeners: map[chan GraphSignal]chan GraphSignal{}, listeners: map[chan GraphSignal]chan GraphSignal{},
state: state,
}
err := WriteDBState(ctx, id, state)
if err != nil {
panic(fmt.Sprintf("DB_NEW_WRITE_ERROR: %s", err))
} }
Log.Logf("graph", "NEW_NODE: %s", node.ID())
ctx.Log.Logf("graph", "NEW_NODE: %s - %+v", id, state)
return node return node
} }
// BaseNode is the most basic implementation of the GraphNode interface // BaseNode is the minimum set of fields needed to implement a GraphNode,
// It is used to implement functions common to Events and Resources // and provides a template for more complicated Nodes
type BaseNode struct { type BaseNode struct {
id string id NodeID
state NodeState state NodeState
state_lock sync.RWMutex state_lock sync.RWMutex
signal chan GraphSignal signal chan GraphSignal
listeners_lock sync.Mutex listeners_lock sync.Mutex
listeners map[chan GraphSignal]chan GraphSignal listeners map[chan GraphSignal]chan GraphSignal
} }
func (node * BaseNode) SignalChannel() chan GraphSignal { func (node * BaseNode) ID() NodeID {
return node.signal return node.id
} }
func (node * BaseNode) State() NodeState { func (node * BaseNode) State() NodeState {
return node.state return node.state
} }
func (node * BaseNode) ID() string { func (node * BaseNode) StateLock() * sync.RWMutex {
return node.id return &node.state_lock
} }
const listener_buffer = 100 func (node * BaseNode) DeserializeState([]byte) NodeState {
func GetUpdateChannel(node * BaseNode) chan GraphSignal { return nil
new_listener := make(chan GraphSignal, listener_buffer)
node.RegisterChannel(new_listener)
return new_listener
} }
func (node * BaseNode) RegisterChannel(listener chan GraphSignal) { func WriteDBState(ctx * GraphContext, id NodeID, state NodeState) error {
node.listeners_lock.Lock() ctx.Log.Logf("db", "DB_WRITE: %s - %+v", id, state)
_, exists := node.listeners[listener]
if exists == false { var serialized_state []byte = nil
node.listeners[listener] = listener if state != nil {
serialized_state = state.Serialize()
} else {
serialized_state = []byte{}
} }
node.listeners_lock.Unlock()
err := ctx.DB.Update(func(txn *badger.Txn) error {
err := txn.Set([]byte(id), serialized_state)
return err
})
return err
} }
func (node * BaseNode) UnregisterChannel(listener chan GraphSignal) { func (node * BaseNode) SetState(new_state NodeState) {
node.listeners_lock.Lock() node.state = new_state
_, exists := node.listeners[listener] }
if exists == false {
panic("Attempting to unregister non-registered listener") func UseStates(ctx * GraphContext, nodes []GraphNode, states_fn func(states []NodeState)(interface{}, error)) (interface{}, error) {
} else { for _, node := range(nodes) {
delete(node.listeners, listener) node.StateLock().RLock()
} }
node.listeners_lock.Unlock()
states := make([]NodeState, len(nodes))
for i, node := range(nodes) {
states[i] = node.State()
}
val, err := states_fn(states)
for _, node := range(nodes) {
node.StateLock().RUnlock()
}
return val, err
} }
func (node * BaseNode) PropagateUpdate(update GraphSignal) { func UpdateStates(ctx * GraphContext, nodes []GraphNode, states_fn func(states []NodeState)([]NodeState, interface{}, error)) (interface{}, error) {
for _, node := range(nodes) {
node.StateLock().Lock()
}
states := make([]NodeState, len(nodes))
for i, node := range(nodes) {
states[i] = node.State()
}
new_states, val, err := states_fn(states)
if new_states != nil {
if len(new_states) != len(nodes) {
panic(fmt.Sprintf("NODE_NEW_STATE_LEN_MISMATCH: %d/%d", len(new_states), len(nodes)))
}
for i, new_state := range(new_states) {
if new_state != nil {
old_state_type := reflect.TypeOf(states[i])
new_state_type := reflect.TypeOf(new_state)
if old_state_type != new_state_type {
panic(fmt.Sprintf("NODE_STATE_MISMATCH: old - %+v, new - %+v", old_state_type, new_state_type))
}
err := WriteDBState(ctx, nodes[i].ID(), new_state)
if err != nil {
panic(fmt.Sprintf("DB_WRITE_ERROR: %s", err))
}
nodes[i].SetState(new_state)
}
}
}
for _, node := range(nodes) {
node.StateLock().Unlock()
}
return val, err
} }
func (node * BaseNode) UpdateListeners(update GraphSignal) { func (node * BaseNode) UpdateListeners(ctx * GraphContext, update GraphSignal) {
node.ListenersLock.Lock() node.listeners_lock.Lock()
defer node.ListenersLock.Unlock() defer node.listeners_lock.Unlock()
closed := []chan GraphSignal closed := []chan GraphSignal{}
for _, listener := range node.Listeners() { for _, listener := range node.listeners {
Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.ID(), listener) ctx.Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.ID(), listener)
select { select {
case listener <- signal: case listener <- update:
default: default:
Log.Logf("listeners", "CLOSED_LISTENER %s: %p", node.ID(), listener) ctx.Log.Logf("listeners", "CLOSED_LISTENER %s: %p", node.ID(), listener)
go func(node GraphNode, listener chan GraphSignal) { go func(node GraphNode, listener chan GraphSignal) {
listener <- NewSignal(node, "listener_closed") listener <- NewSignal(node, "listener_closed")
close(listener) close(listener)
@ -259,34 +358,49 @@ func (node * BaseNode) UpdateListeners(update GraphSignal) {
} }
} }
func SendUpdate(node GraphNode, signal GraphSignal) { func (node * BaseNode) PropagateUpdate(ctx * GraphContext, update GraphSignal) {
node_name := "nil" }
if node != nil {
node_name = node.Name()
}
Log.Logf("update", "UPDATE %s <- %s: %+v", node_name, signal.Source(), signal)
node.ListenersLock.Lock()
defer node.ListenersLock.Unlock()
closed := []chan GraphSignal
for _, listener := range node.Listeners() { func (node * BaseNode) RegisterChannel(listener chan GraphSignal) {
Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.ID(), listener) node.listeners_lock.Lock()
select { _, exists := node.listeners[listener]
case listener <- signal: if exists == false {
default: node.listeners[listener] = listener
Log.Logf("listeners", "CLOSED_LISTENER %s: %p", node.ID(), listener)
go func(node GraphNode, listener chan GraphSignal) {
listener <- NewSignal(node, "listener_closed")
close(listener)
}(node, listener)
closed = append(closed, listener)
}
} }
node.listeners_lock.Unlock()
}
for _, listener := range(closed) { func (node * BaseNode) UnregisterChannel(listener chan GraphSignal) {
node.listeners_lock.Lock()
_, exists := node.listeners[listener]
if exists == false {
panic("Attempting to unregister non-registered listener")
} else {
delete(node.listeners, listener) delete(node.listeners, listener)
} }
node.listeners_lock.Unlock()
}
func (node * BaseNode) SignalChannel() chan GraphSignal {
return node.signal
}
// Create a new GraphSinal channel with a buffer of size buffer and register it to a node
func GetUpdateChannel(node * BaseNode, buffer int) chan GraphSignal {
new_listener := make(chan GraphSignal, buffer)
node.RegisterChannel(new_listener)
return new_listener
}
// Propogate a signal starting at a node
func SendUpdate(ctx * GraphContext, node GraphNode, signal GraphSignal) {
if node == nil {
panic("Cannot start an update from no node")
}
ctx.Log.Logf("update", "UPDATE %s <- %s: %+v", node.ID(), signal.Source(), signal)
node.PropagateUpdate(signal) node.UpdateListeners(ctx, signal)
node.PropagateUpdate(ctx, signal)
} }

@ -2,23 +2,23 @@ package graphvent
import ( import (
"testing" "testing"
"time"
"fmt" "fmt"
"os" "time"
"runtime/pprof" "runtime/pprof"
"os"
badger "github.com/dgraph-io/badger/v3" badger "github.com/dgraph-io/badger/v3"
) )
type GraphTester testing.T type GraphTester testing.T
const listner_timeout = 50 * time.Millisecond const listner_timeout = 50 * time.Millisecond
func (t * GraphTester) WaitForValue(listener chan GraphSignal, signal_type string, source GraphNode, timeout time.Duration, str string) GraphSignal { func (t * GraphTester) WaitForValue(ctx * GraphContext, listener chan GraphSignal, signal_type string, source GraphNode, timeout time.Duration, str string) GraphSignal {
timeout_channel := time.After(timeout) timeout_channel := time.After(timeout)
for true { for true {
select { select {
case signal := <- listener: case signal := <- listener:
if signal.Type() == signal_type { if signal.Type() == signal_type {
Log.Logf("test", "SIGNAL_TYPE_FOUND: %s - %s %+v\n", signal.Type(), signal.Source(), listener) ctx.Log.Logf("test", "SIGNAL_TYPE_FOUND: %s - %s %+v\n", signal.Type(), signal.Source(), listener)
if signal.Source() == source.ID() { if signal.Source() == source.ID() {
return signal return signal
} }
@ -32,18 +32,6 @@ func (t * GraphTester) WaitForValue(listener chan GraphSignal, signal_type strin
return nil return nil
} }
func (t * GraphTester) CheckForValue(listener chan GraphSignal, str string) GraphSignal {
timeout := time.After(listner_timeout)
select {
case signal := <- listener:
return signal
case <-timeout:
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Fatal(str)
return nil
}
}
func (t * GraphTester) CheckForNone(listener chan GraphSignal, str string) { func (t * GraphTester) CheckForNone(listener chan GraphSignal, str string) {
timeout := time.After(listner_timeout) timeout := time.After(listner_timeout)
select { select {
@ -54,462 +42,26 @@ func (t * GraphTester) CheckForNone(listener chan GraphSignal, str string) {
} }
} }
func TestNewEventWithResource(t *testing.T) { func logTestContext(t * testing.T, components []string) * GraphContext {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger1")) db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
if err != nil {
t.Fatal(err)
}
name := "Test Resource"
description := "A resource for testing"
children := []Resource{}
test_resource, _ := NewResource(name, description, children)
root_event, err := NewEvent(db, "root_event", "", []Resource{test_resource})
if err != nil {
t.Fatal(err)
}
res := FindResource(root_event, test_resource.ID())
if res == nil {
t.Fatal("Failed to find Resource in EventManager after adding")
}
if res.Name() != name || res.Description() != description {
t.Fatal("Name/description of returned resource did not match added resource")
}
}
func TestDoubleResourceAdd(t * testing.T) {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger2"))
if err != nil {
t.Fatal(err)
}
test_resource, _ := NewResource("", "", []Resource{})
_, err = NewEvent(db, "", "", []Resource{test_resource, test_resource})
if err == nil {
t.Fatal("NewEvent didn't return an error")
}
}
func TestTieredResource(t * testing.T) {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger3"))
if err != nil {
t.Fatal(err)
}
r1, _ := NewResource("r1", "", []Resource{})
r2, err := NewResource("r2", "", []Resource{r1})
if err != nil {
t.Fatal(err)
}
_, err = NewEvent(db, "", "", []Resource{r2})
if err != nil {
t.Fatal("Failed to create event with tiered resources")
}
}
func TestResourceUpdate(t * testing.T) {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger4"))
if err != nil {
t.Fatal(err)
}
r1, err := NewResource("r1", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r2, err := NewResource("r2", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r3, err := NewResource("r3", "", []Resource{r1, r2})
if err != nil {
t.Fatal(err)
}
r4, err := NewResource("r4", "", []Resource{r3})
if err != nil {
t.Fatal(err)
}
_, err = NewEvent(db, "", "", []Resource{r3, r4})
if err != nil {
t.Fatal("Failed to add initial tiered resources for test")
}
r1_l := r1.UpdateChannel()
r2_l := r2.UpdateChannel()
r3_l := r3.UpdateChannel()
r4_l := r4.UpdateChannel()
// Calling Update() on the parent with no other parents should only notify node listeners
SendUpdate(r3, NewSignal(nil, "test"))
(*GraphTester)(t).CheckForNone(r1_l, "Update on r1 after updating r3")
(*GraphTester)(t).CheckForNone(r2_l, "Update on r2 after updating r3")
(*GraphTester)(t).CheckForValue(r3_l, "No update on r3 after updating r3")
(*GraphTester)(t).CheckForValue(r4_l, "No update on r4 after updating r3")
// Calling Update() on a child should notify listeners of the parent and child, but not siblings
SendUpdate(r2, NewSignal(nil, "test"))
(*GraphTester)(t).CheckForNone(r1_l, "Update on r1 after updating r2")
(*GraphTester)(t).CheckForValue(r2_l, "No update on r2 after updating r2")
(*GraphTester)(t).CheckForValue(r3_l, "No update on r3 after updating r2")
(*GraphTester)(t).CheckForValue(r4_l, "No update on r4 after updating r2")
// Calling Update() on a child should notify listeners of the parent and child, but not siblings
SendUpdate(r1, NewSignal(nil, "test"))
(*GraphTester)(t).CheckForValue(r1_l, "No update on r1 after updating r1")
(*GraphTester)(t).CheckForNone(r2_l, "Update on r2 after updating r1")
(*GraphTester)(t).CheckForValue(r3_l, "No update on r3 after updating r1")
(*GraphTester)(t).CheckForValue(r4_l, "No update on r4 after updating r1")
}
func TestAddEvent(t * testing.T) {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger5"))
if err != nil {
t.Fatal(err)
}
r1, _ := NewResource("r1", "", []Resource{})
r2, _ := NewResource("r2", "", []Resource{r1})
root_event, _ := NewEvent(db, "", "", []Resource{r2})
name := "Test Event"
description := "A test event"
resources := []Resource{r2}
new_event, _ := NewEvent(db, name, description, resources)
err = LinkEvent(root_event, new_event, nil)
if err != nil {
t.Fatalf("Failed to add new_event to root_event: %s", err)
}
res := FindChild(root_event, new_event.ID())
if res == nil {
t.Fatalf("Failed to find new_event in event_manager: %s", err)
}
if res.Name() != name || res.Description() != description {
t.Fatal("Event found in event_manager didn't match added")
}
res_required := res.Resources()
if len(res_required) < 1 {
t.Fatal("Event found in event_manager didn't match added")
} else if res_required[0].ID() != r2.ID() {
t.Fatal("Event found in event_manager didn't match added")
}
}
func TestLockResource(t * testing.T) {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger6"))
if err != nil {
t.Fatal(err)
}
r1, err := NewResource("r1", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r2, err := NewResource("r2", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r3, err := NewResource("r3", "", []Resource{r1, r2})
if err != nil {
t.Fatal(err)
}
r4, err := NewResource("r4", "", []Resource{r1, r2})
if err != nil {
t.Fatal(err)
}
root_event, err := NewEvent(db, "", "", []Resource{})
if err != nil {
t.Fatal(err)
}
test_event, err := NewEvent(db, "", "", []Resource{})
if err != nil {
t.Fatal(err)
}
r1_l := r1.UpdateChannel()
rel := root_event.UpdateChannel()
err = LockResource(r3, root_event)
if err != nil {
t.Fatal("Failed to lock r3")
}
SendUpdate(r3, NewDownSignal(r3, "locked"))
(*GraphTester)(t).WaitForValue(r1_l, "locked", r3, time.Second, "Wasn't notified of r1 lock on r1 after r3 lock")
(*GraphTester)(t).WaitForValue(rel, "locked", r3, time.Second, "Wasn't notified of r1 lock on rel after r3 lock")
err = LockResource(r3, root_event)
if err == nil {
t.Fatal("Locked r3 after locking r3")
}
err = LockResource(r4, root_event)
if err == nil {
t.Fatal("Locked r4 after locking r3")
}
err = LockResource(r1, root_event)
if err == nil {
t.Fatal("Locked r1 after locking r3")
}
err = UnlockResource(r3, test_event)
if err == nil {
t.Fatal("Unlocked r3 with event that didn't lock it")
}
err = UnlockResource(r3, root_event)
if err != nil {
t.Fatal("Failed to unlock r3")
}
SendUpdate(r3, NewDownSignal(r3, "unlocked"))
(*GraphTester)(t).WaitForValue(r1_l, "unlocked", r3, time.Second * 2, "Wasn't notified of r1 unlock on r1 after r3 unlock")
err = LockResource(r4, root_event)
if err != nil {
t.Fatal("Failed to lock r4 after unlocking r3")
}
SendUpdate(r4, NewDownSignal(r4, "locked"))
(*GraphTester)(t).WaitForValue(r1_l, "locked", r4, time.Second * 2, "Wasn't notified of r1 lock on r1 after r4 lock")
(*GraphTester)(t).WaitForValue(rel, "locked", r4, time.Second * 2, "Wasn't notified of r1 lock on r1 after r4 lock")
err = UnlockResource(r4, root_event)
if err != nil {
t.Fatal("Failed to unlock r4")
}
SendUpdate(r4, NewDownSignal(r4, "unlocked"))
(*GraphTester)(t).WaitForValue(r1_l, "unlocked", r4, time.Second * 2, "Wasn't notified of r1 unlock on r1 after r4 lock")
}
func TestAddToEventQueue(t * testing.T) {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger7"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
queue, _ := NewEventQueue("q", "", []Resource{})
event_1, _ := NewEvent(db, "1", "", []Resource{})
event_2, _ := NewEvent(db, "2", "", []Resource{})
err = LinkEvent(queue, event_1, nil)
if err == nil {
t.Fatal("suceeded in added nil info to queue")
}
err = LinkEvent(queue, event_1, &EventQueueInfo{priority: 0})
if err != nil {
t.Fatal("failed to add valid event + info to queue")
}
err = LinkEvent(queue, event_2, &EventQueueInfo{priority: 1}) return NewGraphContext(db, NewConsoleLogger(components))
if err != nil {
t.Fatal("failed to add valid event + info to queue")
}
} }
func TestStartBaseEvent(t * testing.T) { func testContext(t * testing.T) * GraphContext {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger8")) db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
if err != nil {
t.Fatal(err)
}
event_1, _ := NewEvent(db, "TestStartBaseEvent event_1", "", []Resource{})
r := event_1.DoneResource()
e_l := event_1.UpdateChannel()
r_l := r.UpdateChannel()
(*GraphTester)(t).CheckForNone(e_l, "Update on event_1 before starting")
(*GraphTester)(t).CheckForNone(r_l, "Update on r_1 before starting")
if r.Owner().ID() != event_1.ID() {
t.Fatal("r is not owned by event_1")
}
err = LockResources(event_1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = RunEvent(event_1) return NewGraphContext(db, NewConsoleLogger([]string{}))
if err != nil {
t.Fatal(err)
}
// Check that the update channels for the event and resource have updates
(*GraphTester)(t).WaitForValue(e_l, "event_start", event_1, 1*time.Second, "No event_start on e_l")
(*GraphTester)(t).WaitForValue(e_l, "event_done", event_1, 1*time.Second, "No event_start on e_l")
(*GraphTester)(t).WaitForValue(r_l, "unlocked", event_1, 1*time.Second, "No unlocked on r_l")
if r.Owner() != nil {
t.Fatal("r still owned after event completed")
}
} }
func TestAbortEventQueue(t * testing.T) { func fatalErr(t * testing.T, err error) {
r1, _ := NewResource("r1", "", []Resource{})
root_event, _ := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource()
LockResource(r1, root_event)
// Now that the event is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed
go func() {
time.Sleep(100 * time.Millisecond)
abort_signal := NewDownSignal(nil, "abort")
SendUpdate(root_event, abort_signal)
}()
err := LockResources(root_event)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = RunEvent(root_event)
if err == nil {
t.Fatal("root_event completed without error")
}
if r.Owner() == nil {
t.Fatal("root event was finished after starting")
}
} }
func TestDelegateLock(t * testing.T) {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger"))
if err != nil {
t.Fatal(err)
}
test_resource, _ := NewResource("test_resource", "", []Resource{})
root_event, _ := NewEventQueue("root_event", "", []Resource{test_resource})
test_event, _ := NewEvent(db, "test_event", "", []Resource{test_resource})
err = LinkEvent(root_event, test_event, NewEventQueueInfo(1))
if err != nil {
t.Fatal(err)
}
err = LockResources(root_event)
if err != nil {
t.Fatal(err)
}
test_listener := test_event.UpdateChannel()
go func() {
(*GraphTester)(t).WaitForValue(test_listener, "event_done", test_event, 250 * time.Millisecond, "No event_done for test_event")
if test_resource.Owner().ID() != root_event.ID() {
t.Fatal("Lock failed to pass back to root_event")
}
abort_signal := NewDownSignal(nil, "cancel")
SendUpdate(root_event, abort_signal)
}()
err = RunEvent(root_event)
if err != nil {
t.Fatal(err)
}
}
func TestStartWithoutLocking(t * testing.T) {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger9"))
if err != nil {
t.Fatal(err)
}
test_resource, _ := NewResource("test_resource", "", []Resource{})
root_event, _ := NewEvent(db, "root_event", "", []Resource{test_resource})
err = RunEvent(root_event)
if err == nil {
t.Fatal("Event ran without error without locking resources")
}
}
func TestStartEventQueue(t * testing.T) {
db, err := badger.Open(badger.DefaultOptions("/tmp/badger10"))
if err != nil {
t.Fatal(err)
}
root_event, _ := NewEventQueue("root_event", "", []Resource{})
r := root_event.DoneResource()
rel := root_event.UpdateChannel();
res_1, _ := NewResource("test_resource_1", "", []Resource{})
res_2, _ := NewResource("test_resource_2", "", []Resource{})
e1, _ := NewEvent(db, "e1", "", []Resource{res_1, res_2})
e1_l := e1.UpdateChannel()
e1_r := e1.DoneResource()
e1_info := NewEventQueueInfo(1)
err = LinkEvent(root_event, e1, e1_info)
if err != nil {
t.Fatal("Failed to add e1 to root_event")
}
(*GraphTester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e1")
e2, _ := NewEvent(db, "e2", "", []Resource{res_1})
e2_l := e2.UpdateChannel()
e2_r := e2.DoneResource()
e2_info := NewEventQueueInfo(2)
err = LinkEvent(root_event, e2, e2_info)
if err != nil {
t.Fatal("Failed to add e2 to root_event")
}
(*GraphTester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e2")
e3, _ := NewEvent(db, "e3", "", []Resource{res_2})
e3_l := e3.UpdateChannel()
e3_r := e3.DoneResource()
e3_info := NewEventQueueInfo(3)
err = LinkEvent(root_event, e3, e3_info)
if err != nil {
t.Fatal("Failed to add e3 to root_event")
}
(*GraphTester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e3")
// Abort the event after 5 seconds just in case
go func() {
time.Sleep(5 * time.Second)
if r.Owner() != nil {
abort_signal := NewDownSignal(nil, "abort")
SendUpdate(root_event, abort_signal)
}
}()
// Now that a root_event is constructed with a queue and 3 basic events
// start the queue and check that all the events are executed
go func() {
(*GraphTester)(t).WaitForValue(e1_l, "event_done", e1, time.Second, "No event_done for e3")
(*GraphTester)(t).WaitForValue(e2_l, "event_done", e2, time.Second, "No event_done for e3")
(*GraphTester)(t).WaitForValue(e3_l, "event_done", e3, time.Second, "No event_done for e3")
signal := NewDownSignal(nil, "cancel")
SendUpdate(root_event, signal)
}()
err = LockResources(root_event)
if err != nil {
t.Fatal(err)
}
err = RunEvent(root_event)
if err != nil {
t.Fatal(err)
}
if r.Owner() != nil {
t.Fatal("root event was not finished after starting")
}
if e1_r.Owner() != nil {
t.Fatal(fmt.Sprintf("e1 was not completed: %s", e1_r.Owner()))
}
if e2_r.Owner() != nil {
t.Fatal(fmt.Sprintf("e2 was not completed"))
}
if e3_r.Owner() != nil {
t.Fatal("e3 was not completed")
}
}

@ -2,101 +2,168 @@ package graphvent
import ( import (
"fmt" "fmt"
"sync"
"errors"
) )
// Resources propagate update up to multiple parents, and not downwards // Link a resource with a child
// (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team) func LinkResource(ctx * GraphContext, resource Resource, child Resource) error {
func (resource * BaseResource) PropagateUpdate(signal GraphSignal) { if resource == nil || child == nil {
return fmt.Errorf("Will not connect nil to DAG")
}
_, err := UpdateStates(ctx, []GraphNode{resource, child}, func(states []NodeState) ([]NodeState, interface{}, error) {
resource_state := states[0].(ResourceState)
child_state := states[1].(ResourceState)
if signal.Downwards() == false { if checkIfChild(ctx, resource_state, resource.ID(), child_state, child.ID()) == true {
// Child->Parent, resource updates parent resources return nil, nil, fmt.Errorf("RESOURCE_LINK_ERR: %s is a parent of %s so cannot link as child", child.ID(), resource.ID())
resource.connection_lock.Lock()
defer resource.connection_lock.Unlock()
for _, parent := range resource.Parents() {
SendUpdate(parent, signal)
} }
} else {
// Parent->Child, resource updates lock holder resource_state.children = append(resource_state.children, child)
resource.lock_holder_lock.Lock() child_state.parents = append(child_state.parents, resource)
defer resource.lock_holder_lock.Unlock() return []NodeState{resource_state, child_state}, nil, nil
if resource.lock_holder != nil { })
SendUpdate(resource.lock_holder, signal) return err
}
// Link multiple children to a resource
func LinkResources(ctx * GraphContext, resource Resource, children []Resource) error {
if resource == nil || children == nil {
return fmt.Errorf("Invalid input")
} }
resource.connection_lock.Lock() found := map[NodeID]bool{}
defer resource.connection_lock.Unlock() child_nodes := make([]GraphNode, len(children))
for _, child := range(resource.children) { for i, child := range(children) {
SendUpdate(child, signal) if child == nil {
return fmt.Errorf("Will not connect nil to DAG")
} }
_, exists := found[child.ID()]
if exists == true {
return fmt.Errorf("Will not connect the same child twice")
}
found[child.ID()] = true
child_nodes[i] = child
} }
}
// Resource is the interface that DAG nodes are made from _, err := UpdateStates(ctx, append([]GraphNode{resource}, child_nodes...), func(states []NodeState) ([]NodeState, interface{}, error) {
// A resource needs to be able to represent Logical entities and connections to physical entities. resource_state := states[0].(ResourceState)
// A resource lock could be aborted at any time if this connection is broken, if that happens the event locking it must be aborted
// The device connection should be maintained as much as possible(requiring some reconnection behaviour in the background) new_states := make([]ResourceState, len(states))
type Resource interface { for i, state := range(states) {
GraphNode new_states[i] = state.(ResourceState)
Owner() GraphNode }
Children() []Resource
Parents() []Resource for i, state := range(states[1:]) {
child_state := state.(ResourceState)
AddParent(parent Resource) if checkIfChild(ctx, resource_state, resource.ID(), child_state, children[i].ID()) == true {
AddChild(child Resource) return nil, nil, fmt.Errorf("RESOURCES_LINK_ERR: %s is a parent of %s so cannot link as child", children[i].ID() , resource.ID())
LockConnections() }
UnlockConnections()
SetOwner(owner GraphNode) new_states[0].children = append(new_states[0].children, children[i])
LockState() new_states[i+1].parents = append(new_states[i+1].parents, resource)
UnlockState() }
ret_states := make([]NodeState, len(states))
for i, state := range(new_states) {
ret_states[i] = state
}
return ret_states, nil, nil
})
String() string return err
}
lock(node GraphNode) error type ResourceState struct {
unlock(node GraphNode) error name string
owner GraphNode
children []Resource
parents []Resource
} }
func (resource * BaseResource) String() string { func (state ResourceState) Serialize() []byte {
return resource.Name() return []byte(state.name)
} }
// Recurse up cur's parents to ensure r is not present // Locks cannot be passed between resources, so the answer to
func checkIfParent(r Resource, cur Resource) bool { // "who used to own this lock held by a resource" is always "nobody"
if r == nil || cur == nil { func (state ResourceState) OriginalLockHolder(id NodeID) GraphNode {
panic("Cannot recurse DAG with nil") return nil
} }
if r.ID() == cur.ID() { // Nothing can take a lock from a resource
return true func (state ResourceState) AllowedToTakeLock(id NodeID) bool {
} return false
}
cur.LockConnections() func (state ResourceState) RecordLockHolder(id NodeID, lock_holder GraphNode) NodeState {
defer cur.UnlockConnections() if lock_holder != nil {
for _, p := range(cur.Parents()) { panic("Attempted to delegate a lock to a resource")
if checkIfParent(r, p) == true {
return true
} }
return state
}
func NewResourceState(name string) ResourceState {
return ResourceState{
name: name,
owner: nil,
children: []Resource{},
parents: []Resource{},
} }
}
return false // Resource represents a Node which can be locked by another node,
// and needs to own all it's childrens locks before being locked.
// Resource connections form a directed acyclic graph
// Resources do not allow any other nodes to take locks from them
type Resource interface {
GraphNode
// Called when locking the node to allow for custom lock behaviour
Lock(node GraphNode, state NodeState) (NodeState, error)
// Called when unlocking the node to allow for custom lock behaviour
Unlock(node GraphNode, state NodeState) (NodeState, error)
} }
// Recurse doen cur's children to ensure r is not present // Resources propagate update up to multiple parents, and not downwards
func checkIfChild(r Resource, cur Resource) bool { // (subscriber to team won't get update to alliance, but subscriber to alliance will get update to team)
if r == nil || cur == nil { func (resource * BaseResource) PropagateUpdate(ctx * GraphContext, signal GraphSignal) {
panic("Cannot recurse DAG with nil") UseStates(ctx, []GraphNode{resource}, func(states []NodeState) (interface{}, error){
resource_state := states[0].(ResourceState)
if signal.Direction() == Up {
// Child->Parent, resource updates parent resources
for _, parent := range resource_state.parents {
SendUpdate(ctx, parent, signal)
}
} else if signal.Direction() == Down {
// Parent->Child, resource updates lock holder
if resource_state.owner != nil {
SendUpdate(ctx, resource_state.owner, signal)
}
for _, child := range(resource_state.children) {
SendUpdate(ctx, child, signal)
}
} else if signal.Direction() == Direct {
} else {
panic(fmt.Sprintf("Invalid signal direction: %d", signal.Direction()))
} }
return nil, nil
})
}
if r.ID() == cur.ID() { func checkIfChild(ctx * GraphContext, r ResourceState, r_id NodeID, cur ResourceState, cur_id NodeID) bool {
if r_id == cur_id {
return true return true
} }
cur.LockConnections() for _, c := range(cur.children) {
defer cur.UnlockConnections() val, _ := UseStates(ctx, []GraphNode{c}, func(states []NodeState) (interface{}, error) {
for _, c := range(cur.Children()) { child_state := states[0].(ResourceState)
if checkIfChild(r, c) == true { return checkIfChild(ctx, cur, cur_id, child_state, c.ID()), nil
})
is_child := val.(bool)
if is_child {
return true return true
} }
} }
@ -104,21 +171,31 @@ func checkIfChild(r Resource, cur Resource) bool {
return false return false
} }
func UnlockResource(resource Resource, node GraphNode) error { func UnlockResource(ctx * GraphContext, resource Resource, node GraphNode, node_state NodeState) (NodeState, error) {
var err error = nil if node == nil || resource == nil{
resource.LockState() panic("Cannot unlock without a specified node and resource")
defer resource.UnlockState() }
if resource.Owner() == nil { _, err := UpdateStates(ctx, []GraphNode{resource}, func(states []NodeState) ([]NodeState, interface{}, error) {
return errors.New("Resource already unlocked") if resource.ID() == node.ID() {
if node_state != nil {
panic("node_state must be nil if unlocking resource from itself")
}
node_state = states[0]
} }
resource_state := states[0].(ResourceState)
if resource.Owner().ID() != node.ID() { if resource_state.owner == nil {
return errors.New("Resource not locked by parent, unlock failed") return nil, nil, fmt.Errorf("Resource already unlocked")
}
if resource_state.owner.ID() != node.ID() {
return nil, nil, fmt.Errorf("Resource %s not locked by %s", resource.ID(), node.ID())
} }
var lock_err error = nil var lock_err error = nil
for _, child := range resource.Children() { for _, child := range(resource_state.children) {
err := UnlockResource(child, node) var err error = nil
node_state, err = UnlockResource(ctx, child, node, node_state)
if err != nil { if err != nil {
lock_err = err lock_err = err
break break
@ -126,49 +203,76 @@ func UnlockResource(resource Resource, node GraphNode) error {
} }
if lock_err != nil { if lock_err != nil {
return fmt.Errorf("Resource failed to unlock: %s", lock_err) return nil, nil, fmt.Errorf("Resource %s failed to unlock: %e", resource.ID(), lock_err)
} }
resource.SetOwner(node.Delegator(resource.ID())) resource_state.owner = node_state.OriginalLockHolder(resource.ID())
unlock_state, err := resource.Unlock(node, resource_state)
resource_state = unlock_state.(ResourceState)
if err != nil {
return nil, nil, fmt.Errorf("Resource %s failed custom Unlock: %e", resource.ID(), err)
}
if resource_state.owner == nil {
ctx.Log.Logf("resource", "RESOURCE_UNLOCK: %s unlocked %s", node.ID(), resource.ID())
} else {
ctx.Log.Logf("resource", "RESOURCE_UNLOCK: %s passed lock of %s back to %s", node.ID(), resource.ID(), resource_state.owner.ID())
}
return []NodeState{resource_state}, nil, nil
})
err = resource.unlock(node)
if err != nil { if err != nil {
return errors.New("Failed to unlock resource") return nil, err
} }
return nil return node_state, nil
} }
func isAllowedToTakeLock(node GraphNode, current_owner GraphNode) bool { // TODO: State
for _, allowed := range(current_owner.Allowed()) { func LockResource(ctx * GraphContext, resource Resource, node GraphNode, node_state NodeState) (NodeState, error) {
if allowed.ID() == node.ID() { if node == nil || resource == nil {
return true panic("Cannot lock without a specified node and resource")
} }
_, err := UpdateStates(ctx, []GraphNode{resource}, func(states []NodeState) ([]NodeState, interface{}, error) {
if resource.ID() == node.ID() {
if node_state != nil {
panic("node_state must be nil if locking resource from itself")
}
node_state = states[0]
}
resource_state := states[0].(ResourceState)
if resource_state.owner != nil {
var lock_pass_allowed bool = false
if resource_state.owner.ID() == resource.ID() {
lock_pass_allowed = resource_state.AllowedToTakeLock(node.ID())
} else {
tmp, _ := UseStates(ctx, []GraphNode{resource_state.owner}, func(states []NodeState)(interface{}, error){
return states[0].AllowedToTakeLock(node.ID()), nil
})
lock_pass_allowed = tmp.(bool)
} }
return false
}
func LockResource(resource Resource, node GraphNode) error {
resource.LockState()
defer resource.UnlockState()
if resource.Owner() != nil { if lock_pass_allowed == false {
// Check if node is allowed to take a lock from resource.Owner() return nil, nil, fmt.Errorf("%s is not allowed to take a lock from %s", node.ID(), resource_state.owner.ID())
if isAllowedToTakeLock(node, resource.Owner()) == false {
return fmt.Errorf("%s is not allowed to take a lock from %s, allowed: %+v", node.Name(), resource.Owner().Name(), resource.Owner().Allowed())
} }
} }
err := resource.lock(node) lock_state, err := resource.Lock(node, resource_state)
if err != nil { if err != nil {
return fmt.Errorf("Failed to lock resource: %s", err) return nil, nil, fmt.Errorf("Failed to lock resource: %e", err)
} }
resource_state = lock_state.(ResourceState)
var lock_err error = nil var lock_err error = nil
locked_resources := []Resource{} locked_resources := []Resource{}
for _, child := range resource.Children() { for _, child := range(resource_state.children) {
err := LockResource(child, node) node_state, err = LockResource(ctx, child, node, node_state)
if err != nil{ if err != nil {
lock_err = err lock_err = err
break break
} }
@ -176,90 +280,49 @@ func LockResource(resource Resource, node GraphNode) error {
} }
if lock_err != nil { if lock_err != nil {
return fmt.Errorf("Resource failed to lock: %s", lock_err) for _, locked_resource := range(locked_resources) {
node_state, err = UnlockResource(ctx, locked_resource, node, node_state)
if err != nil {
panic(err)
}
}
return nil, nil, fmt.Errorf("Resource failed to lock: %e", lock_err)
} }
Log.Logf("resource", "Locked %s", resource.Name()) old_owner := resource_state.owner
node.TakeLock(resource) resource_state.owner = node
resource.SetOwner(node) node_state = node_state.RecordLockHolder(node.ID(), old_owner)
return nil
}
// BaseResource is the most basic resource that can exist in the DAG
// It holds a single state variable, which contains a pointer to the event that is locking it
type BaseResource struct {
BaseNode
parents []Resource
children []Resource
connection_lock sync.Mutex
lock_holder GraphNode
lock_holder_lock sync.Mutex
state_lock sync.Mutex
}
func (resource * BaseResource) SetOwner(owner GraphNode) { if old_owner == nil {
resource.lock_holder_lock.Lock() ctx.Log.Logf("resource", "RESOURCE_LOCK: %s locked %s", node.ID(), resource.ID())
resource.lock_holder = owner } else {
resource.lock_holder_lock.Unlock() ctx.Log.Logf("resource", "RESOURCE_LOCK: %s took lock of %s from %s", node.ID(), resource.ID(), old_owner.ID())
} }
func (resource * BaseResource) LockState() { return []NodeState{resource_state}, nil, nil
resource.state_lock.Lock() })
} if err != nil {
return nil, err
}
func (resource * BaseResource) UnlockState() { return node_state, nil
resource.state_lock.Unlock()
} }
func (resource * BaseResource) Owner() GraphNode { // BaseResources represent simple resources in the DAG that can be used to create a hierarchy of locks that store names
return resource.lock_holder type BaseResource struct {
BaseNode
} }
//BaseResources don't check anything special when locking/unlocking //BaseResources don't check anything special when locking/unlocking
func (resource * BaseResource) lock(node GraphNode) error { func (resource * BaseResource) Lock(node GraphNode, state NodeState) (NodeState, error) {
return nil return state, nil
}
func (resource * BaseResource) unlock(node GraphNode) error {
return nil
} }
func (resource * BaseResource) Children() []Resource { func (resource * BaseResource) Unlock(node GraphNode, state NodeState) (NodeState, error) {
return resource.children return state, nil
} }
func (resource * BaseResource) Parents() []Resource { /*func FindResource(root Event, id string) Resource {
return resource.parents
}
func (resource * BaseResource) LockConnections() {
resource.connection_lock.Lock()
}
func (resource * BaseResource) UnlockConnections() {
resource.connection_lock.Unlock()
}
func (resource * BaseResource) AddParent(parent Resource) {
resource.parents = append(resource.parents, parent)
}
func (resource * BaseResource) AddChild(child Resource) {
resource.children = append(resource.children, child)
}
func NewBaseResource(name string, description string) BaseResource {
resource := BaseResource{
BaseNode: NewBaseNode(name, description, randid()),
parents: []Resource{},
children: []Resource{},
}
return resource
}
func FindResource(root Event, id string) Resource {
if root == nil || id == ""{ if root == nil || id == ""{
panic("invalid input") panic("invalid input")
} }
@ -276,48 +339,17 @@ func FindResource(root Event, id string) Resource {
} }
} }
return nil return nil
} }*/
func LinkResource(resource Resource, child Resource) error {
if child == nil || resource == nil {
return fmt.Errorf("Will not connect nil to resource DAG")
} else if child.ID() == resource.ID() {
return fmt.Errorf("Will not connect resource to itself")
}
if checkIfChild(resource, child) {
return fmt.Errorf("%s is a child of %s, cannot add as parent", resource.Name(), child.Name())
}
for _, p := range(resource.Parents()) { func NewResource(ctx * GraphContext, name string, children []Resource) (* BaseResource, error) {
if checkIfParent(child, p) { resource := &BaseResource{
return fmt.Errorf("Will not add %s as a parent of itself", child.Name()) BaseNode: NewNode(ctx, RandID(), NewResourceState(name)),
} }
}
child.AddParent(resource)
resource.AddChild(child)
return nil
}
func LinkResources(resource Resource, children []Resource) error {
for _, c := range(children) {
err := LinkResource(resource, c)
if err != nil {
return err
}
}
return nil
}
func NewResource(name string, description string, children []Resource) (* BaseResource, error) {
resource := NewBaseResource(name, description)
resource_ptr := &resource
err := LinkResources(resource_ptr, children) err := LinkResources(ctx, resource, children)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return resource_ptr, nil return resource, nil
} }

@ -0,0 +1,226 @@
package graphvent
import (
"testing"
"fmt"
)
func TestNewResource(t * testing.T) {
ctx := testContext(t)
r1, err := NewResource(ctx, "Test resource 1", []Resource{})
fatalErr(t, err)
_, err = NewResource(ctx, "Test resource 2", []Resource{r1})
fatalErr(t, err)
}
func TestRepeatedChildResource(t * testing.T) {
ctx := testContext(t)
r1, err := NewResource(ctx, "Test resource 1", []Resource{})
fatalErr(t, err)
_, err = NewResource(ctx, "Test resource 2", []Resource{r1, r1})
if err == nil {
t.Fatal("Added the same resource as a child twice to the same resource")
}
}
func TestResourceSelfLock(t * testing.T) {
ctx := testContext(t)
r1, err := NewResource(ctx, "Test resource 1", []Resource{})
fatalErr(t, err)
_, err = LockResource(ctx, r1, r1, nil)
fatalErr(t, err)
_, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) {
owner_id := states[0].(ResourceState).owner.ID()
if owner_id != r1.ID() {
return nil, fmt.Errorf("r1 is owned by %s instead of self", owner_id)
}
return nil, nil
})
fatalErr(t, err)
_, err = UnlockResource(ctx, r1, r1, nil)
fatalErr(t, err)
_, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) {
owner := states[0].(ResourceState).owner
if owner != nil {
return nil, fmt.Errorf("r1 is not unowned after unlock: %s", owner.ID())
}
return nil, nil
})
fatalErr(t, err)
}
func TestResourceSelfLockTiered(t * testing.T) {
ctx := testContext(t)
r1, err := NewResource(ctx, "Test resource 1", []Resource{})
fatalErr(t, err)
r2, err := NewResource(ctx, "Test resource 1", []Resource{})
fatalErr(t, err)
r3, err := NewResource(ctx, "Test resource 3", []Resource{r1, r2})
fatalErr(t, err)
_, err = LockResource(ctx, r3, r3, nil)
fatalErr(t, err)
_, err = UseStates(ctx, []GraphNode{r1, r2}, func(states []NodeState) (interface{}, error) {
owner_1_id := states[0].(ResourceState).owner.ID()
if owner_1_id != r3.ID() {
return nil, fmt.Errorf("r1 is owned by %s instead of r3", owner_1_id)
}
owner_2_id := states[1].(ResourceState).owner.ID()
if owner_2_id != r3.ID() {
return nil, fmt.Errorf("r2 is owned by %s instead of r3", owner_2_id)
}
return nil, nil
})
fatalErr(t, err)
_, err = UnlockResource(ctx, r3, r3, nil)
fatalErr(t, err)
_, err = UseStates(ctx, []GraphNode{r1, r2, r3}, func(states []NodeState) (interface{}, error) {
owner_1 := states[0].(ResourceState).owner
if owner_1 != nil {
return nil, fmt.Errorf("r1 is not unowned after unlocking: %s", owner_1.ID())
}
owner_2 := states[1].(ResourceState).owner
if owner_2 != nil {
return nil, fmt.Errorf("r2 is not unowned after unlocking: %s", owner_2.ID())
}
owner_3 := states[2].(ResourceState).owner
if owner_3 != nil {
return nil, fmt.Errorf("r3 is not unowned after unlocking: %s", owner_3.ID())
}
return nil, nil
})
fatalErr(t, err)
}
func TestResourceLockOther(t * testing.T) {
ctx := testContext(t)
r1, err := NewResource(ctx, "Test resource 1", []Resource{})
fatalErr(t, err)
r2, err := NewResource(ctx, "Test resource 2", []Resource{})
fatalErr(t, err)
_, err = UpdateStates(ctx, []GraphNode{r2}, func(states []NodeState) ([]NodeState, interface{}, error) {
new_state, err := LockResource(ctx, r1, r2, states[0])
fatalErr(t, err)
return []NodeState{new_state}, nil, nil
})
fatalErr(t, err)
_, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) {
owner_id := states[0].(ResourceState).owner.ID()
if owner_id != r2.ID() {
return nil, fmt.Errorf("r1 is owned by %s instead of r2", owner_id)
}
return nil, nil
})
fatalErr(t, err)
_, err = UpdateStates(ctx, []GraphNode{r2}, func(states []NodeState) ([]NodeState, interface{}, error) {
new_state, err := UnlockResource(ctx, r1, r2, states[0])
fatalErr(t, err)
return []NodeState{new_state}, nil, nil
})
fatalErr(t, err)
_, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) {
owner := states[0].(ResourceState).owner
if owner != nil {
return nil, fmt.Errorf("r1 is owned by %s instead of r2", owner.ID())
}
return nil, nil
})
fatalErr(t, err)
}
func TestResourceLockSimpleConflict(t * testing.T) {
ctx := testContext(t)
r1, err := NewResource(ctx, "Test resource 1", []Resource{})
fatalErr(t, err)
r2, err := NewResource(ctx, "Test resource 2", []Resource{})
fatalErr(t, err)
_, err = LockResource(ctx, r1, r1, nil)
fatalErr(t, err)
_, err = UpdateStates(ctx, []GraphNode{r2}, func(states []NodeState) ([]NodeState, interface{}, error) {
new_state, err := LockResource(ctx, r1, r2, states[0])
if err == nil {
t.Fatal("r2 took r1's lock from itself")
}
return []NodeState{new_state}, nil, nil
})
fatalErr(t, err)
_, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) {
owner_id := states[0].(ResourceState).owner.ID()
if owner_id != r1.ID() {
return nil, fmt.Errorf("r1 is owned by %s instead of r1", owner_id)
}
return nil, nil
})
fatalErr(t, err)
_, err = UnlockResource(ctx, r1, r1, nil)
fatalErr(t, err)
_, err = UseStates(ctx, []GraphNode{r1}, func(states []NodeState) (interface{}, error) {
owner := states[0].(ResourceState).owner
if owner != nil {
return nil, fmt.Errorf("r1 is owned by %s instead of r1", owner.ID())
}
return nil, nil
})
fatalErr(t, err)
}
func TestResourceLockTieredConflict(t * testing.T) {
ctx := testContext(t)
r1, err := NewResource(ctx, "Test resource 1", []Resource{})
fatalErr(t, err)
r2, err := NewResource(ctx, "Test resource 2", []Resource{r1})
fatalErr(t, err)
r3, err := NewResource(ctx, "Test resource 3", []Resource{r1})
fatalErr(t, err)
_, err = LockResource(ctx, r2, r2, nil)
fatalErr(t, err)
_, err = LockResource(ctx, r3, r3, nil)
if err == nil {
t.Fatal("Locked r3 which depends on r1 while r2 which depends on r1 is already locked")
}
}