|
|
|
@ -41,25 +41,6 @@ func (thread * BaseThread) PropagateUpdate(ctx * GraphContext, signal GraphSigna
|
|
|
|
|
thread.signal <- signal
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*func FindLockable(root Thread, id string) Lockable {
|
|
|
|
|
if root == nil || id == ""{
|
|
|
|
|
panic("invalid input")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, resource := range(root.Lockables()) {
|
|
|
|
|
if resource.ID() == id {
|
|
|
|
|
return resource
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for _, child := range(root.Children()) {
|
|
|
|
|
resource := FindLockable(child, id)
|
|
|
|
|
if resource != nil {
|
|
|
|
|
return resource
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}*/
|
|
|
|
|
|
|
|
|
|
type ThreadInfo interface {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -423,142 +404,3 @@ func NewThread(ctx * GraphContext, name string, requirements []Lockable, actions
|
|
|
|
|
return thread_ptr, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
// ThreadQueue is a basic thread that can have children.
|
|
|
|
|
// On start, it attempts to start it's children from the highest 'priority'
|
|
|
|
|
type ThreadQueueInfo struct {
|
|
|
|
|
priority int
|
|
|
|
|
state string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (info * BaseThreadQueueInfo) MarshalJSON() ([]byte, error) {
|
|
|
|
|
return json.Marshal(&struct{
|
|
|
|
|
Priority int `json:"priority"`
|
|
|
|
|
State string `json:"state"`
|
|
|
|
|
}{
|
|
|
|
|
Priority: info.priority,
|
|
|
|
|
State: info.state,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewThreadQueueInfo(priority int) * BaseThreadQueueInfo {
|
|
|
|
|
info := &ThreadQueueInfo{
|
|
|
|
|
priority: priority,
|
|
|
|
|
state: "queued",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return info
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ThreadQueue struct {
|
|
|
|
|
Thread
|
|
|
|
|
listened_resources map[string]Lockable
|
|
|
|
|
queue_lock sync.Mutex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (queue * BaseThreadQueue) Unlock() error {
|
|
|
|
|
for _, resource := range(queue.listened_resources) {
|
|
|
|
|
resource.UnregisterChannel(queue.signal)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (queue * BaseThreadQueue) InfoType() reflect.Type {
|
|
|
|
|
return reflect.TypeOf((*ThreadQueueInfo)(nil))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewThreadQueue(name string, description string, resources []Lockable) (* BaseThreadQueue, error) {
|
|
|
|
|
queue := &ThreadQueue{
|
|
|
|
|
Thread: NewThread(name, description),
|
|
|
|
|
listened_resources: map[string]Lockable{},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue.state = NewBaseThreadState(name, description)
|
|
|
|
|
|
|
|
|
|
AddLockables(queue, resources)
|
|
|
|
|
|
|
|
|
|
queue.Actions["wait"] = ThreadWait(queue)
|
|
|
|
|
queue.Handlers["abort"] = ThreadAbort(queue)
|
|
|
|
|
queue.Handlers["cancel"] = ThreadCancel(queue)
|
|
|
|
|
|
|
|
|
|
queue.Actions["start"] = func() (string, error) {
|
|
|
|
|
return "queue_thread", nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue.Actions["queue_thread"] = func() (string, error) {
|
|
|
|
|
// Copy the threads to sort the list
|
|
|
|
|
queue.LockChildren()
|
|
|
|
|
copied_threads := make([]Thread, len(queue.Children()))
|
|
|
|
|
copy(copied_threads, queue.Children())
|
|
|
|
|
less := func(i int, j int) bool {
|
|
|
|
|
info_i := queue.ChildInfo(copied_threads[i]).(*ThreadQueueInfo)
|
|
|
|
|
info_j := queue.ChildInfo(copied_threads[j]).(*ThreadQueueInfo)
|
|
|
|
|
return info_i.priority < info_j.priority
|
|
|
|
|
}
|
|
|
|
|
sort.SliceStable(copied_threads, less)
|
|
|
|
|
|
|
|
|
|
needed_resources := map[string]Lockable{}
|
|
|
|
|
for _, thread := range(copied_threads) {
|
|
|
|
|
// make sure all the required resources are registered to update the thread
|
|
|
|
|
for _, resource := range(thread.Lockables()) {
|
|
|
|
|
needed_resources[resource.ID()] = resource
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
info := queue.ChildInfo(thread).(*ThreadQueueInfo)
|
|
|
|
|
thread.LockInfo()
|
|
|
|
|
defer thread.UnlockInfo()
|
|
|
|
|
if info.state == "queued" {
|
|
|
|
|
err := LockLockable(thread)
|
|
|
|
|
// start in new goroutine
|
|
|
|
|
if err != nil {
|
|
|
|
|
} else {
|
|
|
|
|
info.state = "running"
|
|
|
|
|
Log.Logf("thread", "EVENT_START: %s", thread.Name())
|
|
|
|
|
go func(thread Thread, info * BaseThreadQueueInfo, queue Thread) {
|
|
|
|
|
Log.Logf("thread", "EVENT_GOROUTINE: %s", thread.Name())
|
|
|
|
|
err := RunThread(thread)
|
|
|
|
|
if err != nil {
|
|
|
|
|
Log.Logf("thread", "EVENT_ERROR: %s", err)
|
|
|
|
|
}
|
|
|
|
|
thread.LockInfo()
|
|
|
|
|
defer thread.UnlockInfo()
|
|
|
|
|
info.state = "done"
|
|
|
|
|
}(thread, info, queue)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for _, resource := range(needed_resources) {
|
|
|
|
|
_, exists := queue.listened_resources[resource.ID()]
|
|
|
|
|
if exists == false {
|
|
|
|
|
Log.Logf("thread", "REGISTER_RESOURCE: %s - %s", queue.Name(), resource.Name())
|
|
|
|
|
queue.listened_resources[resource.ID()] = resource
|
|
|
|
|
resource.RegisterChannel(queue.signal)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue.UnlockChildren()
|
|
|
|
|
|
|
|
|
|
return "wait", nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue.Handlers["resource_connected"] = func(signal GraphSignal) (string, error) {
|
|
|
|
|
return "queue_thread", nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue.Handlers["child_added"] = func(signal GraphSignal) (string, error) {
|
|
|
|
|
return "queue_thread", nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue.Handlers["lock_changed"] = func(signal GraphSignal) (string, error) {
|
|
|
|
|
return "queue_thread", nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue.Handlers["thread_done"] = func(signal GraphSignal) (string, error) {
|
|
|
|
|
return "queue_thread", nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return queue, nil
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|