graphvent/event.go

514 lines
13 KiB
Go

package main
import (
"fmt"
"time"
"errors"
"reflect"
2023-05-29 23:54:52 -06:00
"sort"
"sync"
)
// Update the events listeners, and notify the parent to do the same
func (event * BaseEvent) update(signal GraphSignal) {
event.signal <- signal
new_signal := signal.Trace(event.ID())
if event.parent != nil {
SendUpdate(event.parent, new_signal)
}
for _, resource := range(event.RequiredResources()) {
source_id := ""
if signal.Source() != nil {
source_id = signal.Source().ID()
}
if source_id != resource.ID() {
SendUpdate(resource, new_signal)
}
}
}
type EventInfo interface {
}
type BaseEventInfo interface {
EventInfo
}
type EventQueueInfo struct {
EventInfo
priority int
state string
}
func NewEventQueueInfo(priority int) * EventQueueInfo {
info := &EventQueueInfo{
priority: priority,
state: "queued",
}
return info
}
// Event is the interface that event tree nodes must implement
type Event interface {
GraphNode
Children() []Event
LockChildren()
UnlockChildren()
InfoType() reflect.Type
ChildInfo(event Event) EventInfo
Parent() Event
LockParent()
UnlockParent()
Action(action string) (func()(string, error), bool)
2023-06-03 01:38:35 -06:00
Handler(signal_type string) (func(GraphSignal) (string, error), bool)
RequiredResources() []Resource
2023-05-29 23:54:52 -06:00
DoneResource() Resource
SetTimeout(end_time time.Time, action string)
Timeout() <-chan time.Time
TimeoutAction() string
Signal() chan GraphSignal
finish() error
addChild(child Event, info EventInfo)
setParent(parent Event)
}
func (event * BaseEvent) Signal() chan GraphSignal {
return event.signal
}
func (event * BaseEvent) TimeoutAction() string {
return event.timeout_action
}
func (event * BaseEvent) Timeout() <-chan time.Time {
return event.timeout
}
func (event * BaseEvent) SetTimeout(end_time time.Time, action string) {
event.timeout_action = action
event.timeout = time.After(time.Until(end_time))
}
2023-06-03 01:38:35 -06:00
func (event * BaseEvent) Handler(signal_type string) (func(GraphSignal)(string, error), bool) {
handler, exists := event.handlers[signal_type]
return handler, exists
}
func FindChild(event Event, id string) Event {
if id == event.ID() {
return event
}
for _, child := range event.Children() {
result := FindChild(child, id)
if result != nil {
return result
}
}
return nil
}
func CheckInfoType(event Event, info EventInfo) bool {
if event.InfoType() == nil || info == nil {
if event.InfoType() == nil && info == nil {
return true
} else {
return false
}
}
return event.InfoType() == reflect.TypeOf(info)
2023-05-29 23:54:52 -06:00
}
func AddChild(event Event, child Event, info EventInfo) error {
if CheckInfoType(event, info) == false {
return errors.New("AddChild got wrong type")
}
event.LockParent()
child.LockParent()
if child.Parent() != nil {
child.UnlockParent()
event.UnlockParent()
return errors.New(fmt.Sprintf("Parent already registered: %s->%s already %s", child.Name(), event.Name(), event.Parent().Name()))
}
event.LockChildren()
for _, c := range(event.Children()) {
if c.ID() == child.ID() {
event.UnlockChildren()
child.UnlockParent()
event.UnlockParent()
return errors.New("Child already in event")
}
}
// After all the checks are done, update the state of child + parent, then unlock and update
child.setParent(event)
event.addChild(child, info)
event.UnlockChildren()
child.UnlockParent()
event.UnlockParent()
2023-06-03 01:38:35 -06:00
update := NewSignal(event, "child_added")
update.description = child.Name()
SendUpdate(event, NewSignal(event, "child_added"))
2023-05-29 23:54:52 -06:00
return nil
}
func RunEvent(event Event) error {
log.Logf("event", "EVENT_RUN: %s", event.Name())
2023-06-03 01:38:35 -06:00
go SendUpdate(event, NewSignal(event, "event_start"))
next_action := "start"
var err error = nil
for next_action != "" {
action, exists := event.Action(next_action)
if exists == false {
error_str := fmt.Sprintf("%s is not a valid action", next_action)
return errors.New(error_str)
}
log.Logf("event", "EVENT_ACTION: %s - %s", event.Name(), next_action)
next_action, err = action()
if err != nil {
return err
}
}
log.Logf("event", "EVENT_RUN_DONE: %s", event.Name())
return nil
}
func AbortEvent(event Event) error {
signal := NewSignal(event, "abort")
SendUpdate(event, signal)
event.LockChildren()
for _, child := range(event.Children()) {
AbortEvent(child)
}
event.UnlockChildren()
return nil
}
func LockResources(event Event) error {
2023-05-29 23:54:52 -06:00
locked_resources := []Resource{}
var lock_err error = nil
2023-05-29 23:54:52 -06:00
for _, resource := range(event.RequiredResources()) {
err := LockResource(resource, event)
2023-05-29 23:54:52 -06:00
if err != nil {
lock_err = err
break
2023-05-29 23:54:52 -06:00
}
locked_resources = append(locked_resources, resource)
2023-05-29 23:54:52 -06:00
}
if lock_err != nil {
2023-05-29 23:54:52 -06:00
for _, resource := range(locked_resources) {
UnlockResource(resource, event)
2023-05-29 23:54:52 -06:00
}
return lock_err
}
for _, resource := range(locked_resources) {
NotifyResourceLocked(resource)
2023-05-29 23:54:52 -06:00
}
2023-05-29 23:54:52 -06:00
return nil
}
func FinishEvent(event Event) error {
// TODO make more 'safe' like LockResources, or make UnlockResource not return errors
log.Logf("event", "EVENT_FINISH: %s", event.Name())
for _, resource := range(event.RequiredResources()) {
err := UnlockResource(resource, event)
if err != nil {
panic(err)
}
NotifyResourceUnlocked(resource)
}
err := UnlockResource(event.DoneResource(), event)
if err != nil {
return err
}
NotifyResourceUnlocked(event.DoneResource())
err = event.finish()
if err != nil {
return err
}
SendUpdate(event, NewSignal(event, "event_done"))
return nil
2023-05-29 23:54:52 -06:00
}
// BaseEvent is the most basic event that can exist in the event tree.
// On start it automatically transitions to completion.
// It can optionally require events, which will all need to be locked to start it
// It can optionally create resources, which will be locked by default and unlocked on completion
// This node by itself doesn't implement any special behaviours for children, so they will be ignored.
// When starter, this event automatically transitions to completion and unlocks all it's resources(including created)
type BaseEvent struct {
BaseNode
done_resource Resource
required_resources []Resource
children []Event
child_info map[string]EventInfo
child_lock sync.Mutex
actions map[string]func() (string, error)
2023-06-03 01:38:35 -06:00
handlers map[string]func(GraphSignal) (string, error)
parent Event
parent_lock sync.Mutex
abort chan string
timeout <-chan time.Time
timeout_action string
}
func (event * BaseEvent) Action(action string) (func() (string, error), bool) {
action_fn, exists := event.actions[action]
return action_fn, exists
}
func EventWait(event Event) (func() (string, error)) {
return func() (string, error) {
log.Logf("event", "EVENT_WAIT: %s TIMEOUT: %+v", event.Name(), event.Timeout())
select {
case signal := <- event.Signal():
if signal.Source() != nil {
log.Logf("event", "EVENT_SIGNAL: %s %s %s -> %+v", event.Name(), signal.Last(), signal.Source().Name(), signal)
} else {
log.Logf("event", "EVENT_SIGNAL: %s %s nil -> %+v", event.Name(), signal.Last(), signal)
}
if signal.Type() == "abort" {
return "", errors.New("State machine aborted by signal")
} else {
signal_fn, exists := event.Handler(signal.Type())
if exists == true {
log.Logf("event", "EVENT_HANDLER: %s - %s", event.Name(), signal.Type())
return signal_fn(signal)
}
}
return "wait", nil
case <- event.Timeout():
log.Logf("event", "EVENT_TIMEOUT %s - NEXT_STATE: %s", event.Name(), event.TimeoutAction())
return event.TimeoutAction(), nil
}
}
}
func NewBaseEvent(name string, description string, required_resources []Resource) (BaseEvent) {
done_resource := NewResource("event_done", "signal that event is done", []Resource{})
event := BaseEvent{
2023-06-03 18:56:14 -06:00
BaseNode: NewBaseNode(name, description, randid()),
parent: nil,
children: []Event{},
child_info: map[string]EventInfo{},
2023-05-29 23:54:52 -06:00
done_resource: done_resource,
required_resources: required_resources,
actions: map[string]func()(string, error){},
2023-06-03 01:38:35 -06:00
handlers: map[string]func(GraphSignal)(string, error){},
abort: make(chan string, 1),
timeout: nil,
timeout_action: "",
}
LockResource(event.done_resource, &event)
return event
}
func NewEvent(name string, description string, required_resources []Resource) (* BaseEvent) {
event := NewBaseEvent(name, description, required_resources)
event_ptr := &event
event_ptr.actions["wait"] = EventWait(event_ptr)
event_ptr.actions["start"] = func() (string, error) {
2023-05-29 23:54:52 -06:00
return "", nil
}
return event_ptr
}
func (event * BaseEvent) finish() error {
return nil
}
func (event * BaseEvent) InfoType() reflect.Type {
return nil
}
// EventQueue is a basic event that can have children.
// On start, it attempts to start it's children from the highest 'priority'
type EventQueue struct {
BaseEvent
listened_resources map[string]Resource
queue_lock sync.Mutex
}
func (queue * EventQueue) finish() error {
for _, resource := range(queue.listened_resources) {
resource.UnregisterChannel(queue.signal)
}
return nil
}
func (queue * EventQueue) InfoType() reflect.Type {
return reflect.TypeOf((*EventQueueInfo)(nil))
}
2023-05-29 23:54:52 -06:00
func NewEventQueue(name string, description string, required_resources []Resource) (* EventQueue) {
queue := &EventQueue{
BaseEvent: NewBaseEvent(name, description, []Resource{}),
listened_resources: map[string]Resource{},
}
queue.actions["wait"] = EventWait(queue)
queue.actions["start"] = func() (string, error) {
2023-05-29 23:54:52 -06:00
return "queue_event", nil
}
queue.actions["queue_event"] = func() (string, error) {
// Copy the events to sort the list
queue.LockChildren()
2023-05-29 23:54:52 -06:00
copied_events := make([]Event, len(queue.Children()))
copy(copied_events, queue.Children())
less := func(i int, j int) bool {
info_i := queue.ChildInfo(copied_events[i]).(*EventQueueInfo)
info_j := queue.ChildInfo(copied_events[j]).(*EventQueueInfo)
return info_i.priority < info_j.priority
}
sort.SliceStable(copied_events, less)
wait := false
needed_resources := map[string]Resource{}
2023-05-29 23:54:52 -06:00
for _, event := range(copied_events) {
// make sure all the required resources are registered to update the event
for _, resource := range(event.RequiredResources()) {
needed_resources[resource.ID()] = resource
}
2023-05-29 23:54:52 -06:00
info := queue.ChildInfo(event).(*EventQueueInfo)
if info.state == "queued" {
wait = true
// Try to lock it
err := LockResources(event)
2023-05-29 23:54:52 -06:00
// start in new goroutine
if err != nil {
//log.Logf("event", "Failed to lock %s: %s", event.Name(), err)
2023-05-29 23:54:52 -06:00
} else {
info.state = "running"
log.Logf("event", "EVENT_START: %s", event.Name())
go func(event Event, info * EventQueueInfo, queue Event) {
log.Logf("event", "EVENT_GOROUTINE: %s", event.Name())
err := RunEvent(event)
if err != nil {
log.Logf("event", "EVENT_ERROR: %s", err)
}
2023-05-29 23:54:52 -06:00
info.state = "done"
FinishEvent(event)
2023-05-29 23:54:52 -06:00
}(event, info, queue)
}
} else if info.state == "running" {
wait = true
}
}
for _, resource := range(needed_resources) {
2023-06-03 01:38:35 -06:00
_, exists := queue.listened_resources[resource.ID()]
if exists == false {
log.Logf("event", "REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name())
2023-06-03 01:38:35 -06:00
queue.listened_resources[resource.ID()] = resource
resource.RegisterChannel(queue.signal)
}
}
queue.UnlockChildren()
2023-05-29 23:54:52 -06:00
if wait == true {
return "wait", nil
} else {
return "", nil
}
}
queue.handlers["resource_connected"] = func(signal GraphSignal) (string, error) {
2023-06-03 01:38:35 -06:00
return "queue_event", nil
}
queue.handlers["child_added"] = func(signal GraphSignal) (string, error) {
2023-05-29 23:54:52 -06:00
return "queue_event", nil
}
2023-06-03 01:38:35 -06:00
queue.handlers["lock_changed"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil
}
2023-06-03 01:38:35 -06:00
queue.handlers["event_done"] = func(signal GraphSignal) (string, error) {
return "queue_event", nil
}
2023-05-29 23:54:52 -06:00
return queue
}
func (event * BaseEvent) Parent() Event {
return event.parent
}
func (event * BaseEvent) RequiredResources() []Resource {
return event.required_resources
}
2023-05-29 23:54:52 -06:00
func (event * BaseEvent) DoneResource() Resource {
return event.done_resource
}
func (event * BaseEvent) Children() []Event {
return event.children
}
func (event * BaseEvent) ChildInfo(idx Event) EventInfo {
val, ok := event.child_info[idx.ID()]
if ok == false {
return nil
}
return val
}
func (event * BaseEvent) LockChildren() {
event.child_lock.Lock()
}
func (event * BaseEvent) UnlockChildren() {
event.child_lock.Unlock()
}
func (event * BaseEvent) LockParent() {
event.parent_lock.Lock()
}
func (event * BaseEvent) UnlockParent() {
event.parent_lock.Unlock()
}
func (event * BaseEvent) setParent(parent Event) {
event.parent = parent
}
func (event * BaseEvent) addChild(child Event, info EventInfo) {
event.children = append(event.children, child)
event.child_info[child.ID()] = info
}