Moved signal function to context

gql_cataclysm
noah metz 2023-07-27 15:49:21 -06:00
parent d729698523
commit 7965f8fbe6
3 changed files with 20 additions and 16 deletions

@ -3,6 +3,7 @@ package graphvent
import ( import (
badger "github.com/dgraph-io/badger/v3" badger "github.com/dgraph-io/badger/v3"
"fmt" "fmt"
"runtime"
) )
//Function to load an extension from bytes //Function to load an extension from bytes
@ -93,6 +94,22 @@ func (ctx *Context) RegisterExtension(ext_type ExtType, load_fn ExtensionLoadFun
return nil return nil
} }
func (ctx *Context) Send(source NodeID, dest NodeID, signal Signal) error {
target, exists := ctx.Nodes[dest]
if exists == false {
return fmt.Errorf("%s does not exist, cannot signal it", dest)
}
select {
case target.MsgChan <- Msg{source, signal}:
default:
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
stack_str := string(buf[:n])
return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", dest, stack_str)
}
return nil
}
// Create a new Context with all the library content added // Create a new Context with all the library content added
func NewContext(db * badger.DB, log Logger) (*Context, error) { func NewContext(db * badger.DB, log Logger) (*Context, error) {
ctx := &Context{ ctx := &Context{

@ -196,6 +196,7 @@ func AuthHandler(ctx *Context, server *Node, gql_ext *GQLExt) func(http.Response
ctx.Log.Logf("gql", "GQL_AUTH_RESP_BAD_LENGTH: %d/%d", wrote, len(ser)) ctx.Log.Logf("gql", "GQL_AUTH_RESP_BAD_LENGTH: %d/%d", wrote, len(ser))
return return
} }
}
} }
func GraphiQLHandler() func(http.ResponseWriter, *http.Request) { func GraphiQLHandler() func(http.ResponseWriter, *http.Request) {

@ -6,7 +6,6 @@ import (
"reflect" "reflect"
"github.com/google/uuid" "github.com/google/uuid"
badger "github.com/dgraph-io/badger/v3" badger "github.com/dgraph-io/badger/v3"
"runtime"
"fmt" "fmt"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
@ -188,19 +187,7 @@ func (node *Node) Process(ctx *Context, source NodeID, signal Signal) {
} }
func (node *Node) Signal(ctx *Context, dest NodeID, signal Signal) error { func (node *Node) Signal(ctx *Context, dest NodeID, signal Signal) error {
target, exists := ctx.Nodes[dest] return ctx.Send(node.ID, dest, signal)
if exists == false {
return fmt.Errorf("%s does not exist, cannot signal it", dest)
}
select {
case target.MsgChan <- Msg{node.ID, signal}:
default:
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
stack_str := string(buf[:n])
return fmt.Errorf("SIGNAL_OVERFLOW: %s - %s", dest, stack_str)
}
return nil
} }
func GetCtx[T Extension, C any](ctx *Context) (C, error) { func GetCtx[T Extension, C any](ctx *Context) (C, error) {
@ -454,7 +441,7 @@ type ExtensionDB struct {
Data []byte Data []byte
} }
// Write multiple nodes to the database in a single transaction // Write a node to the database
func WriteNode(ctx *Context, node *Node) error { func WriteNode(ctx *Context, node *Node) error {
ctx.Log.Logf("db", "DB_WRITE: %s", node.ID) ctx.Log.Logf("db", "DB_WRITE: %s", node.ID)
@ -470,7 +457,6 @@ func WriteNode(ctx *Context, node *Node) error {
}) })
} }
// Recursively load a node from the database.
func LoadNode(ctx * Context, id NodeID) (*Node, error) { func LoadNode(ctx * Context, id NodeID) (*Node, error) {
ctx.Log.Logf("db", "LOOKING_FOR_NODE: %s", id) ctx.Log.Logf("db", "LOOKING_FOR_NODE: %s", id)
node, exists := ctx.Nodes[id] node, exists := ctx.Nodes[id]