Moved WaitInfo to node.go, added *Node methods for handling WaitMap's

gql_cataclysm
noah metz 2023-11-03 21:41:06 -06:00
parent 57156251cd
commit f5a08bbc48
4 changed files with 52 additions and 3 deletions

@ -312,7 +312,7 @@ func (ctx *Context) getNode(id NodeID) (*Node, error) {
return target, nil
}
// Route a Signal to dest. Currently only local context routing is supported
// Route Messages to dest. Currently only local context routing is supported
func (ctx *Context) Send(messages Messages) error {
for _, msg := range(messages) {
if msg.Dest == ZeroID {
@ -434,6 +434,16 @@ func NewContext(db * badger.DB, log Logger) (*Context, error) {
return nil, err
}
wait_info_type := reflect.TypeOf(WaitInfo{})
wait_info_info, err := GetStructInfo(ctx, wait_info_type)
if err != nil {
return nil, err
}
err = ctx.RegisterType(wait_info_type, WaitInfoType, nil, SerializeStruct(wait_info_info), nil, DeserializeStruct(wait_info_info))
if err != nil {
return nil, err
}
err = ctx.RegisterType(reflect.TypeOf(time.Duration(0)), DurationType, nil, nil, nil, DeserializeUint64[time.Duration])
if err != nil {
return nil, err

@ -69,6 +69,13 @@ func (signal EventControlSignal) Permission() Tree {
}
}
func (ext *EventExt) UpdateState(node *Node, state string) {
if ext.State != state {
ext.State = state
node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, ext.State, time.Now()))
}
}
func (ext *EventExt) Process(ctx *Context, node *Node, source NodeID, signal Signal) (Messages, Changes) {
var messages Messages = nil
var changes Changes = nil
@ -125,9 +132,8 @@ func (ext *TestEventExt) Process(ctx *Context, node *Node, source NodeID, signal
if exists == true {
if event_ext.State == info.from_state {
ctx.Log.Logf("event", "%s %s->%s", node.ID, info.from_state, info.to_state)
event_ext.State = info.to_state
messages = messages.Add(ctx, source, node, nil, NewSuccessSignal(sig.Id))
node.QueueSignal(time.Now(), NewEventStateSignal(node.ID, event_ext.State, time.Now()))
event_ext.UpdateState(node, info.to_state)
if event_ext.State == "running" {
node.QueueSignal(time.Now().Add(ext.Length), NewEventControlSignal("finish"))
}

@ -146,6 +146,38 @@ func (node *Node) Allows(ctx *Context, principal_id NodeID, action Tree)(map[uui
return nil, Deny
}
type WaitInfo struct {
NodeID NodeID `gv:"node"`
Timeout uuid.UUID `gv:"timeout"`
}
type WaitMap map[uuid.UUID]WaitInfo
// Removes a signal from the wait_map and dequeue the associated timeout signal
// Returns the data, and whether or not the ID was found in the wait_map
func (node *Node) ProcessResponse(wait_map WaitMap, response ResponseSignal) (WaitInfo, bool) {
wait_info, is_processed := wait_map[response.ResponseID()]
if is_processed == true {
delete(wait_map, response.ResponseID())
if response.ID() != wait_info.Timeout {
node.DequeueSignal(wait_info.Timeout)
}
return wait_info, true
}
return WaitInfo{}, false
}
// Creates a timeout signal for signal, queues it for the node at the timeout, and adds the info to the wait map
func (node *Node) QueueTimeout(dest NodeID, signal Signal, timeout time.Duration, wait_map WaitMap) {
timeout_signal := NewTimeoutSignal(signal.ID())
node.QueueSignal(time.Now().Add(timeout), timeout_signal)
wait_map[signal.ID()] = WaitInfo{
NodeID: dest,
Timeout: timeout_signal.Id,
}
}
func (node *Node) QueueSignal(time time.Time, signal Signal) {
node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time})
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)

@ -257,6 +257,7 @@ var (
ReqStateType = NewSerializedType("REQ_STATE")
ReqInfoType = NewSerializedType("REQ_INFO")
WaitInfoType = NewSerializedType("WAIT_INFO")
SignalDirectionType = NewSerializedType("SIGNAL_DIRECTION")
NodeStructType = NewSerializedType("NODE_STRUCT")
QueuedSignalType = NewSerializedType("QUEUED_SIGNAL")