From 5cd741b42ee4cb2653790e343b18909d93cc429f Mon Sep 17 00:00:00 2001 From: Noah Metz Date: Thu, 22 Jun 2023 15:50:42 -0600 Subject: [PATCH] intermediary --- event.go | 111 +++++++++++++++++++++++++----------------- go.mod | 16 ++++++ go.sum | 127 ++++++++++++++++++++++++++++++++++++++++++++++++ gql.go | 5 +- graph.go | 131 +++++++++++++++++++++++++++----------------------- graph_test.go | 92 ++++++++++++++++++++++++++--------- 6 files changed, 355 insertions(+), 127 deletions(-) diff --git a/event.go b/event.go index 7b7bbfb..83c9f5d 100644 --- a/event.go +++ b/event.go @@ -7,28 +7,27 @@ import ( "reflect" "sort" "sync" + badger "github.com/dgraph-io/badger/v3" ) // Update the events listeners, and notify the parent to do the same func (event * BaseEvent) PropagateUpdate(signal GraphSignal) { + event.state_lock.RLock() + defer event.state_lock.RUnlock() + state := event.state.(*EventState) + if signal.Downwards() == false { // Child->Parent - event.parent_lock.Lock() - defer event.parent_lock.Unlock() - if event.parent != nil { - SendUpdate(event.parent, signal) + if state.parent != nil { + SendUpdate(state.parent, signal) } - event.rr_lock.Lock() - defer event.rr_lock.Unlock() - for _, resource := range(event.resources) { + for _, resource := range(state.resources) { SendUpdate(resource, signal) } } else { // Parent->Child - event.children_lock.Lock() - defer event.children_lock.Unlock() - for _, child := range(event.children) { + for _, child := range(state.children) { SendUpdate(child, signal) } } @@ -64,6 +63,8 @@ type Event interface { LockChildren() UnlockChildren() InfoType() reflect.Type + LockInfo() + UnlockInfo() ChildInfo(event Event) EventInfo Parent() Event LockParent() @@ -87,15 +88,16 @@ type Event interface { } func (event * BaseEvent) AddResource(resource Resource) error { - event.resources_lock.Lock() - defer event.resources_lock.Unlock() + event.state_lock.Lock() + defer event.state_lock.Unlock() + state := event.state.(*EventState) - _, exists := event.resources[resource.ID()] + _, exists := state.resources[resource.ID()] if exists == true { - return fmt.Errorf("%s is already required for %s, cannot add again", resource.Name(), event.Name()) + return fmt.Errorf("%s is already required for %s, cannot add again", resource.Name(), state.name) } - event.resources[resource.ID()] = resource + state.resources[resource.ID()] = resource return nil } @@ -302,23 +304,35 @@ func FinishEvent(event Event) error { // When starter, this event automatically transitions to completion and unlocks all it's resources(including created) type BaseEvent struct { BaseNode - done_resource Resource - rr_lock sync.Mutex - resources map[string]Resource + resources_lock sync.Mutex - children []Event children_lock sync.Mutex - child_info map[string]EventInfo - child_info_lock sync.Mutex + info_lock sync.Mutex + parent_lock sync.Mutex + Actions map[string]func() (string, error) Handlers map[string]func(GraphSignal) (string, error) - parent Event - parent_lock sync.Mutex - abort chan string + timeout <-chan time.Time timeout_action string } +type EventState struct { + BaseNodeState + children []Event + child_info map[string]EventInfo + resources map[string]Resource + parent Event +} + +func (event * BaseEvent) LockInfo() { + event.info_lock.Lock() +} + +func (event * BaseEvent) UnlockInfo() { + event.info_lock.Unlock() +} + func (event * BaseEvent) Action(action string) (func() (string, error), bool) { action_fn, exists := event.Actions[action] return action_fn, exists @@ -344,23 +358,13 @@ func EventWait(event Event) (func() (string, error)) { } func NewBaseEvent(name string, description string) (BaseEvent) { - done_resource, _ := NewResource("event_done", "signal that event is done", []Resource{}) event := BaseEvent{ - BaseNode: NewBaseNode(name, description, randid()), - parent: nil, - children: []Event{}, - child_info: map[string]EventInfo{}, - done_resource: done_resource, - resources: map[string]Resource{}, + BaseNode: NewBaseNode(randid()), Actions: map[string]func()(string, error){}, Handlers: map[string]func(GraphSignal)(string, error){}, - abort: make(chan string, 1), timeout: nil, timeout_action: "", } - - LockResource(event.done_resource, &event) - return event } @@ -374,9 +378,24 @@ func AddResources(event Event, resources []Resource) error { return nil } -func NewEvent(name string, description string, resources []Resource) (* BaseEvent, error) { +func NewEventState(name string, description string) *EventState{ + return &EventState{ + BaseNodeState: BaseNodeState{ + name: name, + description: description, + delegation_map: map[string]GraphNode{}, + }, + children: []Event{}, + child_info: map[string]EventInfo{}, + resources: map[string]Resource{}, + parent: nil, + } +} + +func NewEvent(db *badger.DB, name string, description string, resources []Resource) (* BaseEvent, error) { event := NewBaseEvent(name, description) event_ptr := &event + event_ptr.state = NewEventState(name, description) err := AddResources(event_ptr, resources) if err != nil { @@ -427,6 +446,8 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve listened_resources: map[string]Resource{}, } + queue.state = NewEventState(name, description) + AddResources(queue, resources) queue.Actions["wait"] = EventWait(queue) @@ -457,12 +478,12 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve } info := queue.ChildInfo(event).(*EventQueueInfo) + event.LockInfo() + defer event.UnlockInfo() if info.state == "queued" { - // Try to lock it err := LockResources(event) // start in new goroutine if err != nil { - //Log.Logf("event", "Failed to lock %s: %s", event.Name(), err) } else { info.state = "running" Log.Logf("event", "EVENT_START: %s", event.Name()) @@ -472,6 +493,8 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve if err != nil { Log.Logf("event", "EVENT_ERROR: %s", err) } + event.LockInfo() + defer event.UnlockInfo() info.state = "done" }(event, info, queue) } @@ -513,17 +536,17 @@ func NewEventQueue(name string, description string, resources []Resource) (* Eve } func (event * BaseEvent) Allowed() []GraphNode { - ret := make([]GraphNode, len(event.children)) - for i, v := range(event.children) { + event.state_lock.RLock() + defer event.state_lock.RUnlock() + state := event.state.(*EventState) + + ret := make([]GraphNode, len(state.children)) + for i, v := range(state.children) { ret[i] = v } return ret } -func (event * BaseEvent) Parent() Event { - return event.parent -} - func (event * BaseEvent) Resources() []Resource { resources := []Resource{} for _, val := range(event.resources) { diff --git a/go.mod b/go.mod index e79643f..b3b3162 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,30 @@ module github.com/mekkanized/graphvent go 1.20 require ( + github.com/cespare/xxhash v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/dgraph-io/badger/v3 v3.2103.5 // indirect + github.com/dgraph-io/badger/v4 v4.1.0 // indirect + github.com/dgraph-io/ristretto v0.1.1 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect github.com/gobwas/ws v1.2.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect + github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect + github.com/golang/protobuf v1.3.1 // indirect + github.com/golang/snappy v0.0.3 // indirect + github.com/google/flatbuffers v1.12.1 // indirect github.com/google/uuid v1.3.0 // indirect github.com/graphql-go/graphql v0.8.1 // indirect github.com/graphql-go/handler v0.2.3 // indirect + github.com/klauspost/compress v1.12.3 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/rs/zerolog v1.29.1 // indirect + go.opencensus.io v0.22.5 // indirect + golang.org/x/net v0.7.0 // indirect golang.org/x/sys v0.6.0 // indirect ) diff --git a/go.sum b/go.sum index a6aa4b7..07f7de7 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,30 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= +github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw= +github.com/dgraph-io/badger/v4 v4.1.0 h1:E38jc0f+RATYrycSUf9LMv/t47XAy+3CApyYSq4APOQ= +github.com/dgraph-io/badger/v4 v4.1.0/go.mod h1:P50u28d39ibBRmIJuQC/NSdBOg46HnHw7al2SW5QRHg= +github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= +github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= @@ -10,6 +35,22 @@ github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6Wezm github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk= github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= +github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -19,28 +60,114 @@ github.com/graphql-go/graphql v0.8.1 h1:p7/Ou/WpmulocJeEx7wjQy611rtXGQaAcXGqanuM github.com/graphql-go/graphql v0.8.1/go.mod h1:nKiHzRM0qopJEwCITUuIsxk9PlVlwIiiI8pnJEhordQ= github.com/graphql-go/handler v0.2.3 h1:CANh8WPnl5M9uA25c2GBhPqJhE53Fg0Iue/fRNla71E= github.com/graphql-go/handler v0.2.3/go.mod h1:leLF6RpV5uZMN1CdImAxuiayrYYhOk33bZciaUGaXeU= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= +github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU= github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= +go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opentelemetry.io/otel v1.6.3/go.mod h1:7BgNga5fNlF/iZjG06hM3yofffp0ofKCDwSXx1GC4dI= go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5fjhKeJGZPGFs= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 h1:foEbQz/B0Oz6YIqu/69kfXPYeFQAuuMYFkjaqXzl5Wo= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/gql.go b/gql.go index 902b188..1cc31f4 100644 --- a/gql.go +++ b/gql.go @@ -945,7 +945,10 @@ type GQLServer struct { extended_mutations map[string]*graphql.Field } -func NewGQLServer(listen string, extended_types map[reflect.Type]*graphql.Object, extended_queries map[string]*graphql.Field, extended_mutations map[string]*graphql.Field, extended_subscriptions map[string]*graphql.Field) * GQLServer { +type ObjTypeMap map[reflect.Type]*graphql.Object +type FieldMap map[string]*graphql.Field + +func NewGQLServer(listen string, extended_types ObjTypeMap, extended_queries FieldMap, extended_mutations FieldMap, extended_subscriptions FieldMap) * GQLServer { server := &GQLServer{ BaseResource: NewBaseResource("GQL Server", "graphql server for event signals"), listen: listen, diff --git a/graph.go b/graph.go index 5bef206..9c25bf9 100644 --- a/graph.go +++ b/graph.go @@ -7,6 +7,7 @@ import ( "os" "github.com/rs/zerolog" "fmt" + "encoding/json" ) type Logger interface { @@ -131,89 +132,81 @@ func NewSignal(source GraphNode, _type string) BaseSignal { return NewBaseSignal(source, _type, false) } -// GraphNode is the interface common to both DAG nodes and Event tree nodes -type GraphNode interface { +type NodeState interface { Name() string Description() string + DelegationMap() map[string]GraphNode +} + +type BaseNodeState struct { + name string + description string + delegation_map map[string]GraphNode +} + +func (state * BaseNodeState) Name() string { + return state.name +} + +func (state * BaseNodeState) Description() string { + return state.description +} + +func (state * BaseNodeState) DelegationMap() map[string]GraphNode { + return state.delegation_map +} + + +// GraphNode is the interface common to both DAG nodes and Event tree nodes +type GraphNode interface { + State() NodeState ID() string - Allowed() []GraphNode - Delegator(id string) GraphNode - TakeLock(resource Resource) UpdateListeners(update GraphSignal) PropagateUpdate(update GraphSignal) RegisterChannel(listener chan GraphSignal) UnregisterChannel(listener chan GraphSignal) - UpdateChannel() chan GraphSignal SignalChannel() chan GraphSignal } -func NewBaseNode(name string, description string, id string) BaseNode { +func (node * BaseNode) StateLock() *sync.Mutex { + return &node.state_lock +} + +func NewBaseNode(id string) BaseNode { node := BaseNode{ - name: name, - description: description, id: id, signal: make(chan GraphSignal, 512), listeners: map[chan GraphSignal]chan GraphSignal{}, - delegation_map: map[string]GraphNode{}, } - Log.Logf("graph", "NEW_NODE: %s - %s", node.ID(), node.Name()) + Log.Logf("graph", "NEW_NODE: %s", node.ID()) return node } // BaseNode is the most basic implementation of the GraphNode interface // It is used to implement functions common to Events and Resources type BaseNode struct { - name string - description string id string + state NodeState + state_lock sync.RWMutex signal chan GraphSignal listeners_lock sync.Mutex listeners map[chan GraphSignal]chan GraphSignal - delegation_map map[string]GraphNode -} - -func (node * BaseNode) TakeLock(resource Resource) { - _, exists := node.delegation_map[resource.ID()] - if exists == true { - panic("Trying to take a lock we already have") - } - - node.delegation_map[resource.ID()] = resource.Owner() -} - -func (node * BaseNode) Allowed() []GraphNode { - return []GraphNode{} -} - -func (node * BaseNode) Delegator(id string) GraphNode { - last_owner, exists := node.delegation_map[id] - if exists == false { - panic("Trying to delegate a lock we don't own") - } - - delete(node.delegation_map, id) - return last_owner } func (node * BaseNode) SignalChannel() chan GraphSignal { return node.signal } -func (node * BaseNode) Name() string { - return node.name -} - -func (node * BaseNode) Description() string { - return node.description +func (node * BaseNode) State() NodeState { + return node.state } func (node * BaseNode) ID() string { return node.id } -// Create a new listener channel for the node, add it to the nodes listener list, and return the new channel -const listener_buffer = 1000 -func (node * BaseNode) UpdateChannel() chan GraphSignal { +const listener_buffer = 100 +func GetUpdateChannel(node * BaseNode) chan GraphSignal { new_listener := make(chan GraphSignal, listener_buffer) node.RegisterChannel(new_listener) return new_listener @@ -239,18 +232,20 @@ func (node * BaseNode) UnregisterChannel(listener chan GraphSignal) { node.listeners_lock.Unlock() } -// Send the update to listener channels -func (node * BaseNode) UpdateListeners(update GraphSignal) { - node.listeners_lock.Lock() +func (node * BaseNode) PropagateUpdate(update GraphSignal) { +} - closed := []chan GraphSignal{} +func (node * BaseNode) UpdateListeners(update GraphSignal) { + node.ListenersLock.Lock() + defer node.ListenersLock.Unlock() + closed := []chan GraphSignal - for _, listener := range node.listeners { - Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.Name(), listener) + for _, listener := range node.Listeners() { + Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.ID(), listener) select { - case listener <- update: + case listener <- signal: default: - Log.Logf("listeners", "CLOSED_LISTENER: %s: %p", node.Name(), listener) + Log.Logf("listeners", "CLOSED_LISTENER %s: %p", node.ID(), listener) go func(node GraphNode, listener chan GraphSignal) { listener <- NewSignal(node, "listener_closed") close(listener) @@ -262,11 +257,6 @@ func (node * BaseNode) UpdateListeners(update GraphSignal) { for _, listener := range(closed) { delete(node.listeners, listener) } - - node.listeners_lock.Unlock() -} - -func (node * BaseNode) PropagateUpdate(signal GraphSignal) { } func SendUpdate(node GraphNode, signal GraphSignal) { @@ -275,7 +265,28 @@ func SendUpdate(node GraphNode, signal GraphSignal) { node_name = node.Name() } Log.Logf("update", "UPDATE %s <- %s: %+v", node_name, signal.Source(), signal) - node.UpdateListeners(signal) + node.ListenersLock.Lock() + defer node.ListenersLock.Unlock() + closed := []chan GraphSignal + + for _, listener := range node.Listeners() { + Log.Logf("listeners", "UPDATE_LISTENER %s: %p", node.ID(), listener) + select { + case listener <- signal: + default: + Log.Logf("listeners", "CLOSED_LISTENER %s: %p", node.ID(), listener) + go func(node GraphNode, listener chan GraphSignal) { + listener <- NewSignal(node, "listener_closed") + close(listener) + }(node, listener) + closed = append(closed, listener) + } + } + + for _, listener := range(closed) { + delete(node.listeners, listener) + } + node.PropagateUpdate(signal) } diff --git a/graph_test.go b/graph_test.go index 2bf65f9..35bfc2c 100644 --- a/graph_test.go +++ b/graph_test.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "runtime/pprof" + badger "github.com/dgraph-io/badger/v3" ) type GraphTester testing.T @@ -54,12 +55,16 @@ func (t * GraphTester) CheckForNone(listener chan GraphSignal, str string) { } func TestNewEventWithResource(t *testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger1")) + if err != nil { + t.Fatal(err) + } name := "Test Resource" description := "A resource for testing" children := []Resource{} test_resource, _ := NewResource(name, description, children) - root_event, err := NewEvent("root_event", "", []Resource{test_resource}) + root_event, err := NewEvent(db, "root_event", "", []Resource{test_resource}) if err != nil { t.Fatal(err) } @@ -75,8 +80,12 @@ func TestNewEventWithResource(t *testing.T) { } func TestDoubleResourceAdd(t * testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger2")) + if err != nil { + t.Fatal(err) + } test_resource, _ := NewResource("", "", []Resource{}) - _, err := NewEvent("", "", []Resource{test_resource, test_resource}) + _, err = NewEvent(db, "", "", []Resource{test_resource, test_resource}) if err == nil { t.Fatal("NewEvent didn't return an error") @@ -84,12 +93,17 @@ func TestDoubleResourceAdd(t * testing.T) { } func TestTieredResource(t * testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger3")) + if err != nil { + t.Fatal(err) + } + r1, _ := NewResource("r1", "", []Resource{}) r2, err := NewResource("r2", "", []Resource{r1}) if err != nil { t.Fatal(err) } - _, err = NewEvent("", "", []Resource{r2}) + _, err = NewEvent(db, "", "", []Resource{r2}) if err != nil { t.Fatal("Failed to create event with tiered resources") @@ -97,6 +111,11 @@ func TestTieredResource(t * testing.T) { } func TestResourceUpdate(t * testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger4")) + if err != nil { + t.Fatal(err) + } + r1, err := NewResource("r1", "", []Resource{}) if err != nil { t.Fatal(err) @@ -114,7 +133,7 @@ func TestResourceUpdate(t * testing.T) { t.Fatal(err) } - _, err = NewEvent("", "", []Resource{r3, r4}) + _, err = NewEvent(db, "", "", []Resource{r3, r4}) if err != nil { t.Fatal("Failed to add initial tiered resources for test") } @@ -147,16 +166,20 @@ func TestResourceUpdate(t * testing.T) { } func TestAddEvent(t * testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger5")) + if err != nil { + t.Fatal(err) + } r1, _ := NewResource("r1", "", []Resource{}) r2, _ := NewResource("r2", "", []Resource{r1}) - root_event, _ := NewEvent("", "", []Resource{r2}) + root_event, _ := NewEvent(db, "", "", []Resource{r2}) name := "Test Event" description := "A test event" resources := []Resource{r2} - new_event, _ := NewEvent(name, description, resources) + new_event, _ := NewEvent(db, name, description, resources) - err := LinkEvent(root_event, new_event, nil) + err = LinkEvent(root_event, new_event, nil) if err != nil { t.Fatalf("Failed to add new_event to root_event: %s", err) } @@ -179,6 +202,10 @@ func TestAddEvent(t * testing.T) { } func TestLockResource(t * testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger6")) + if err != nil { + t.Fatal(err) + } r1, err := NewResource("r1", "", []Resource{}) if err != nil { t.Fatal(err) @@ -195,11 +222,11 @@ func TestLockResource(t * testing.T) { if err != nil { t.Fatal(err) } - root_event, err := NewEvent("", "", []Resource{}) + root_event, err := NewEvent(db, "", "", []Resource{}) if err != nil { t.Fatal(err) } - test_event, err := NewEvent("", "", []Resource{}) + test_event, err := NewEvent(db, "", "", []Resource{}) if err != nil { t.Fatal(err) } @@ -260,11 +287,15 @@ func TestLockResource(t * testing.T) { } func TestAddToEventQueue(t * testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger7")) + if err != nil { + t.Fatal(err) + } queue, _ := NewEventQueue("q", "", []Resource{}) - event_1, _ := NewEvent("1", "", []Resource{}) - event_2, _ := NewEvent("2", "", []Resource{}) + event_1, _ := NewEvent(db, "1", "", []Resource{}) + event_2, _ := NewEvent(db, "2", "", []Resource{}) - err := LinkEvent(queue, event_1, nil) + err = LinkEvent(queue, event_1, nil) if err == nil { t.Fatal("suceeded in added nil info to queue") } @@ -281,7 +312,11 @@ func TestAddToEventQueue(t * testing.T) { } func TestStartBaseEvent(t * testing.T) { - event_1, _ := NewEvent("TestStartBaseEvent event_1", "", []Resource{}) + db, err := badger.Open(badger.DefaultOptions("/tmp/badger8")) + if err != nil { + t.Fatal(err) + } + event_1, _ := NewEvent(db, "TestStartBaseEvent event_1", "", []Resource{}) r := event_1.DoneResource() e_l := event_1.UpdateChannel() @@ -293,7 +328,7 @@ func TestStartBaseEvent(t * testing.T) { t.Fatal("r is not owned by event_1") } - err := LockResources(event_1) + err = LockResources(event_1) if err != nil { t.Fatal(err) } @@ -342,10 +377,14 @@ func TestAbortEventQueue(t * testing.T) { } func TestDelegateLock(t * testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger")) + if err != nil { + t.Fatal(err) + } test_resource, _ := NewResource("test_resource", "", []Resource{}) root_event, _ := NewEventQueue("root_event", "", []Resource{test_resource}) - test_event, _ := NewEvent("test_event", "", []Resource{test_resource}) - err := LinkEvent(root_event, test_event, NewEventQueueInfo(1)) + test_event, _ := NewEvent(db, "test_event", "", []Resource{test_resource}) + err = LinkEvent(root_event, test_event, NewEventQueueInfo(1)) if err != nil { t.Fatal(err) } @@ -373,16 +412,24 @@ func TestDelegateLock(t * testing.T) { } func TestStartWithoutLocking(t * testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger9")) + if err != nil { + t.Fatal(err) + } test_resource, _ := NewResource("test_resource", "", []Resource{}) - root_event, _ := NewEvent("root_event", "", []Resource{test_resource}) + root_event, _ := NewEvent(db, "root_event", "", []Resource{test_resource}) - err := RunEvent(root_event) + err = RunEvent(root_event) if err == nil { t.Fatal("Event ran without error without locking resources") } } func TestStartEventQueue(t * testing.T) { + db, err := badger.Open(badger.DefaultOptions("/tmp/badger10")) + if err != nil { + t.Fatal(err) + } root_event, _ := NewEventQueue("root_event", "", []Resource{}) r := root_event.DoneResource() rel := root_event.UpdateChannel(); @@ -390,17 +437,17 @@ func TestStartEventQueue(t * testing.T) { res_2, _ := NewResource("test_resource_2", "", []Resource{}) - e1, _ := NewEvent("e1", "", []Resource{res_1, res_2}) + e1, _ := NewEvent(db, "e1", "", []Resource{res_1, res_2}) e1_l := e1.UpdateChannel() e1_r := e1.DoneResource() e1_info := NewEventQueueInfo(1) - err := LinkEvent(root_event, e1, e1_info) + err = LinkEvent(root_event, e1, e1_info) if err != nil { t.Fatal("Failed to add e1 to root_event") } (*GraphTester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e1") - e2, _ := NewEvent("e2", "", []Resource{res_1}) + e2, _ := NewEvent(db, "e2", "", []Resource{res_1}) e2_l := e2.UpdateChannel() e2_r := e2.DoneResource() e2_info := NewEventQueueInfo(2) @@ -410,7 +457,7 @@ func TestStartEventQueue(t * testing.T) { } (*GraphTester)(t).WaitForValue(rel, "child_added", root_event, time.Second, "No update on root_event after adding e2") - e3, _ := NewEvent("e3", "", []Resource{res_2}) + e3, _ := NewEvent(db, "e3", "", []Resource{res_2}) e3_l := e3.UpdateChannel() e3_r := e3.DoneResource() e3_info := NewEventQueueInfo(3) @@ -465,3 +512,4 @@ func TestStartEventQueue(t * testing.T) { t.Fatal("e3 was not completed") } } +