Fixed json parsing, not sure if it broke or just wasn't running before

gql_cataclysm v0.2.4
noah metz 2023-07-28 00:04:18 -06:00
parent d40e561728
commit fb7e6d02f4
6 changed files with 116 additions and 128 deletions

@ -9,21 +9,24 @@ import (
"encoding/binary" "encoding/binary"
) )
// A Type can be Hashed by Hash
type Type interface { type Type interface {
String() string String() string
Prefix() string Prefix() string
} }
// Hashed a Type to a uint64
func Hash(t Type) uint64 { func Hash(t Type) uint64 {
hash := sha512.Sum512([]byte(fmt.Sprintf("%s%s", t.Prefix(), t.String()))) hash := sha512.Sum512([]byte(fmt.Sprintf("%s%s", t.Prefix(), t.String())))
return binary.BigEndian.Uint64(hash[(len(hash)-9):(len(hash)-1)]) return binary.BigEndian.Uint64(hash[(len(hash)-9):(len(hash)-1)])
} }
// NodeType identifies the 'class' of a node
type NodeType string type NodeType string
func (node NodeType) Prefix() string { return "NODE: " } func (node NodeType) Prefix() string { return "NODE: " }
func (node NodeType) String() string { return string(node) } func (node NodeType) String() string { return string(node) }
// ExtType identifies an extension on a node
type ExtType string type ExtType string
func (ext ExtType) Prefix() string { return "EXTENSION: " } func (ext ExtType) Prefix() string { return "EXTENSION: " }
func (ext ExtType) String() string { return string(ext) } func (ext ExtType) String() string { return string(ext) }
@ -31,6 +34,7 @@ func (ext ExtType) String() string { return string(ext) }
//Function to load an extension from bytes //Function to load an extension from bytes
type ExtensionLoadFunc func(*Context, []byte) (Extension, error) type ExtensionLoadFunc func(*Context, []byte) (Extension, error)
// ExtType and NodeType constants
const ( const (
ACLExtType = ExtType("ACL") ACLExtType = ExtType("ACL")
ListenerExtType = ExtType("LISTENER") ListenerExtType = ExtType("LISTENER")
@ -42,16 +46,23 @@ const (
GQLNodeType = NodeType("GQL") GQLNodeType = NodeType("GQL")
) )
var (
NodeNotFoundError = errors.New("Node not found in DB")
)
// Information about a registered extension // Information about a registered extension
type ExtensionInfo struct { type ExtensionInfo struct {
// Function used to load extensions of this type from the database
Load ExtensionLoadFunc Load ExtensionLoadFunc
Type ExtType Type ExtType
// Extra context data shared between nodes of this class
Data interface{} Data interface{}
} }
// Information about a registered node type // Information about a registered node type
type NodeInfo struct { type NodeInfo struct {
Type NodeType Type NodeType
// Required extensions to be a valid node of this class
Extensions []ExtType Extensions []ExtType
} }
@ -119,8 +130,7 @@ func (ctx *Context) RegisterExtension(ext_type ExtType, load_fn ExtensionLoadFun
return nil return nil
} }
var NodeNotFoundError = errors.New("Node not found in DB") // Get a node from the context, or load from the database if not loaded
func (ctx *Context) GetNode(id NodeID) (*Node, error) { func (ctx *Context) GetNode(id NodeID) (*Node, error) {
target, exists := ctx.Nodes[id] target, exists := ctx.Nodes[id]
if exists == false { if exists == false {

@ -30,7 +30,6 @@ import (
"encoding/pem" "encoding/pem"
) )
type AuthReqJSON struct { type AuthReqJSON struct {
Time time.Time `json:"time"` Time time.Time `json:"time"`
Pubkey []byte `json:"pubkey"` Pubkey []byte `json:"pubkey"`

@ -5,11 +5,13 @@ import (
"fmt" "fmt"
) )
// A Listener extension provides a channel that can receive signals on a different thread
type ListenerExt struct { type ListenerExt struct {
Buffer int Buffer int
Chan chan Signal Chan chan Signal
} }
// Create a new listener extension with a given buffer size
func NewListenerExt(buffer int) *ListenerExt { func NewListenerExt(buffer int) *ListenerExt {
return &ListenerExt{ return &ListenerExt{
Buffer: buffer, Buffer: buffer,
@ -17,6 +19,7 @@ func NewListenerExt(buffer int) *ListenerExt {
} }
} }
// Simple load function, unmarshal the buffer int from json
func LoadListenerExt(ctx *Context, data []byte) (Extension, error) { func LoadListenerExt(ctx *Context, data []byte) (Extension, error) {
var j int var j int
err := json.Unmarshal(data, &j) err := json.Unmarshal(data, &j)
@ -31,6 +34,7 @@ func (listener *ListenerExt) Type() ExtType {
return ListenerExtType return ListenerExtType
} }
// Send the signal to the channel, logging an overflow if it occurs
func (ext *ListenerExt) Process(ctx *Context, princ_id NodeID, node *Node, signal Signal) { func (ext *ListenerExt) Process(ctx *Context, princ_id NodeID, node *Node, signal Signal) {
ctx.Log.Logf("signal", "LISTENER_PROCESS: %s - %+v", node.ID, signal) ctx.Log.Logf("signal", "LISTENER_PROCESS: %s - %+v", node.ID, signal)
select { select {
@ -41,16 +45,51 @@ func (ext *ListenerExt) Process(ctx *Context, princ_id NodeID, node *Node, signa
return return
} }
// ReqState holds the multiple states of a requirement
type ReqState struct {
Link string `json:"link"`
Lock string `json:"lock"`
}
// A LockableExt allows a node to be linked to other nodes(via LinkSignal) and locked/unlocked(via LockSignal)
type LockableExt struct {
Owner *NodeID
PendingOwner *NodeID
Requirements map[NodeID]ReqState
Dependencies map[NodeID]string
}
type LockableExtJSON struct {
Owner *NodeID `json:"owner"`
PendingOwner *NodeID `json:"pending_owner"`
Requirements map[string]ReqState `json:"requirements"`
Dependencies map[string]string `json:"dependencies"`
}
// Simple json load function: TODO: make these a generic function as before
func LoadLockableExt(ctx *Context, data []byte) (Extension, error) { func LoadLockableExt(ctx *Context, data []byte) (Extension, error) {
var ext LockableExt var j LockableExtJSON
err := json.Unmarshal(data, &ext) err := json.Unmarshal(data, &j)
if err != nil {
return nil, err
}
requirements, err := LoadIDMap(j.Requirements)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ctx.Log.Logf("db", "DB_LOADING_LOCKABLE_EXT_JSON: %+v", ext) dependencies, err := LoadIDMap(j.Dependencies)
if err != nil {
return nil, err
}
return &ext, nil return &LockableExt{
Owner: j.Owner,
PendingOwner: j.PendingOwner,
Requirements: requirements,
Dependencies: dependencies,
}, nil
} }
func (ext *ListenerExt) Serialize() ([]byte, error) { func (ext *ListenerExt) Serialize() ([]byte, error) {
@ -62,7 +101,12 @@ func (ext *LockableExt) Type() ExtType {
} }
func (ext *LockableExt) Serialize() ([]byte, error) { func (ext *LockableExt) Serialize() ([]byte, error) {
return json.MarshalIndent(ext, "", " ") return json.MarshalIndent(&LockableExtJSON{
Owner: ext.Owner,
PendingOwner: ext.PendingOwner,
Requirements: IDMap(ext.Requirements),
Dependencies: IDMap(ext.Dependencies),
}, "", " ")
} }
func NewLockableExt() *LockableExt { func NewLockableExt() *LockableExt {
@ -74,26 +118,17 @@ func NewLockableExt() *LockableExt {
} }
} }
type ReqState struct { // Send the signal to unlock a node from itself
Link string `json:"link"`
Lock string `json:"lock"`
}
type LockableExt struct {
Owner *NodeID `json:"owner"`
PendingOwner *NodeID `json:"pending_owner"`
Requirements map[NodeID]ReqState `json:"requirements"`
Dependencies map[NodeID]string `json:"dependencies"`
}
func UnlockLockable(ctx *Context, node *Node) error { func UnlockLockable(ctx *Context, node *Node) error {
return ctx.Send(node.ID, node.ID, NewLockSignal("unlock")) return ctx.Send(node.ID, node.ID, NewLockSignal("unlock"))
} }
// Send the signal to lock a node from itself
func LockLockable(ctx *Context, node *Node) error { func LockLockable(ctx *Context, node *Node) error {
return ctx.Send(node.ID, node.ID, NewLockSignal("lock")) return ctx.Send(node.ID, node.ID, NewLockSignal("lock"))
} }
// Setup a node to send the initial requirement link signal, then send the signal
func LinkRequirement(ctx *Context, dependency *Node, requirement NodeID) error { func LinkRequirement(ctx *Context, dependency *Node, requirement NodeID) error {
dep_ext, err := GetExt[*LockableExt](dependency) dep_ext, err := GetExt[*LockableExt](dependency)
if err != nil { if err != nil {
@ -114,6 +149,7 @@ func LinkRequirement(ctx *Context, dependency *Node, requirement NodeID) error {
return ctx.Send(dependency.ID, requirement, NewLinkSignal("link_as_req")) return ctx.Send(dependency.ID, requirement, NewLinkSignal("link_as_req"))
} }
// Handle a LockSignal and update the extensions owner/requirement states
func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) { func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) {
ctx.Log.Logf("lockable", "LOCK_SIGNAL: %s->%s %+v", source, node.ID, signal) ctx.Log.Logf("lockable", "LOCK_SIGNAL: %s->%s %+v", source, node.ID, signal)
state := signal.State state := signal.State
@ -269,8 +305,8 @@ func (ext *LockableExt) HandleLockSignal(ctx *Context, source NodeID, node *Node
} }
} }
// TODO: don't allow changes to requirements or dependencies while being locked or locked // Handle LinkSignal, updating the extensions requirements and dependencies as necessary
// TODO: add unlink // TODO: Add unlink
func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) { func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node, signal StateSignal) {
ctx.Log.Logf("lockable", "LINK_SIGNAL: %s->%s %+v", source, node.ID, signal) ctx.Log.Logf("lockable", "LINK_SIGNAL: %s->%s %+v", source, node.ID, signal)
state := signal.State state := signal.State
@ -339,6 +375,8 @@ func (ext *LockableExt) HandleLinkSignal(ctx *Context, source NodeID, node *Node
} }
} }
// LockableExts process Up/Down signals by forwarding them to owner, dependency, and requirement nodes
// LockSignal and LinkSignal Direct signals are processed to update the requirement/dependency/lock state
func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal Signal) { func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal Signal) {
ctx.Log.Logf("signal", "LOCKABLE_PROCESS: %s", node.ID) ctx.Log.Logf("signal", "LOCKABLE_PROCESS: %s", node.ID)
@ -389,88 +427,3 @@ func (ext *LockableExt) Process(ctx *Context, source NodeID, node *Node, signal
} }
} }
func SaveNode(node *Node) string {
str := ""
if node != nil {
str = node.ID.String()
}
return str
}
func RestoreNode(ctx *Context, id_str string) (*Node, error) {
if id_str == "" {
return nil, nil
}
id, err := ParseID(id_str)
if err != nil {
return nil, err
}
return LoadNode(ctx, id)
}
func SaveNodeMap(nodes NodeMap) map[string]string {
m := map[string]string{}
for id, node := range(nodes) {
m[id.String()] = SaveNode(node)
}
return m
}
func RestoreNodeMap(ctx *Context, ids map[string]string) (NodeMap, error) {
nodes := NodeMap{}
for id_str_1, id_str_2 := range(ids) {
id_1, err := ParseID(id_str_1)
if err != nil {
return nil, err
}
node_1, err := LoadNode(ctx, id_1)
if err != nil {
return nil, err
}
var node_2 *Node = nil
if id_str_2 != "" {
id_2, err := ParseID(id_str_2)
if err != nil {
return nil, err
}
node_2, err = LoadNode(ctx, id_2)
if err != nil {
return nil, err
}
}
nodes[node_1.ID] = node_2
}
return nodes, nil
}
func SaveNodeList(nodes NodeMap) []string {
ids := make([]string, len(nodes))
i := 0
for id, _ := range(nodes) {
ids[i] = id.String()
i += 1
}
return ids
}
func RestoreNodeList(ctx *Context, ids []string) (NodeMap, error) {
nodes := NodeMap{}
for _, id_str := range(ids) {
node, err := RestoreNode(ctx, id_str)
if err != nil {
return nil, err
}
nodes[node.ID] = node
}
return nodes, nil
}

@ -299,7 +299,10 @@ func NewNode(ctx *Context, id NodeID, node_type NodeType, queued_signals []Queue
NextSignal: next_signal, NextSignal: next_signal,
} }
ctx.Nodes[id] = node ctx.Nodes[id] = node
WriteNode(ctx, node) err := WriteNode(ctx, node)
if err != nil {
panic(err)
}
go runNode(ctx, node) go runNode(ctx, node)
@ -451,6 +454,7 @@ func WriteNode(ctx *Context, node *Node) error {
} }
id_bytes := node.ID.Serialize() id_bytes := node.ID.Serialize()
ctx.Log.Logf("db", "DB_WRITE_ID: %+v", id_bytes)
return ctx.DB.Update(func(txn *badger.Txn) error { return ctx.DB.Update(func(txn *badger.Txn) error {
return txn.Set(id_bytes, bytes) return txn.Set(id_bytes, bytes)
@ -461,7 +465,9 @@ func LoadNode(ctx * Context, id NodeID) (*Node, error) {
ctx.Log.Logf("db", "LOADING_NODE: %s", id) ctx.Log.Logf("db", "LOADING_NODE: %s", id)
var bytes []byte var bytes []byte
err := ctx.DB.View(func(txn *badger.Txn) error { err := ctx.DB.View(func(txn *badger.Txn) error {
item, err := txn.Get(id.Serialize()) id_bytes := id.Serialize()
ctx.Log.Logf("db", "DB_READ_ID: %+v", id_bytes)
item, err := txn.Get(id_bytes)
if err != nil { if err != nil {
return err return err
} }
@ -683,3 +689,23 @@ func del[K comparable](list []K, val K) []K {
list[idx] = list[len(list)-1] list[idx] = list[len(list)-1]
return list[:len(list)-1] return list[:len(list)-1]
} }
func IDMap[S any, T map[NodeID]S](m T)map[string]S {
ret := map[string]S{}
for id, val := range(m) {
ret[id.String()] = val
}
return ret
}
func LoadIDMap[S any, T map[string]S](m T)(map[NodeID]S, error) {
ret := map[NodeID]S{}
for str, val := range(m) {
id, err := ParseID(str)
if err != nil {
return nil, err
}
ret[id] = val
}
return ret, nil
}

@ -5,7 +5,7 @@ import (
) )
func TestNodeDB(t *testing.T) { func TestNodeDB(t *testing.T) {
ctx := logTestContext(t, []string{}) ctx := logTestContext(t, []string{"db"})
node_type := NodeType("test") node_type := NodeType("test")
err := ctx.RegisterNodeType(node_type, []ExtType{GroupExtType}) err := ctx.RegisterNodeType(node_type, []ExtType{GroupExtType})
fatalErr(t, err) fatalErr(t, err)
@ -13,6 +13,6 @@ func TestNodeDB(t *testing.T) {
node := NewNode(ctx, RandID(), node_type, nil, NewGroupExt(nil)) node := NewNode(ctx, RandID(), node_type, nil, NewGroupExt(nil))
ctx.Nodes = NodeMap{} ctx.Nodes = NodeMap{}
_, err = LoadNode(ctx, node.ID) _, err = ctx.GetNode(node.ID)
fatalErr(t, err) fatalErr(t, err)
} }

@ -71,7 +71,11 @@ func LoadECDHExt(ctx *Context, data []byte) (Extension, error) {
} }
type GroupExt struct { type GroupExt struct {
Members NodeMap Members map[NodeID]string
}
type GroupExtJSON struct {
Members map[string]string `json:"members"`
} }
func (ext *GroupExt) Type() ExtType { func (ext *GroupExt) Type() ExtType {
@ -79,40 +83,36 @@ func (ext *GroupExt) Type() ExtType {
} }
func (ext *GroupExt) Serialize() ([]byte, error) { func (ext *GroupExt) Serialize() ([]byte, error) {
return json.MarshalIndent(&struct{ return json.MarshalIndent(&GroupExtJSON{
Members []string `json:"members"` Members: IDMap(ext.Members),
}{
Members: SaveNodeList(ext.Members),
}, "", " ") }, "", " ")
} }
func NewGroupExt(members NodeMap) *GroupExt { func NewGroupExt(members map[NodeID]string) *GroupExt {
if members == nil { if members == nil {
members = NodeMap{} members = map[NodeID]string{}
} }
return &GroupExt{ return &GroupExt{
Members: members, Members: members,
} }
} }
func LoadGroupExt(ctx *Context, data []byte) (Extension, error) { func LoadGroupExt(ctx *Context, data []byte) (Extension, error) {
var j struct { var j GroupExtJSON
Members []string `json:"members"`
}
err := json.Unmarshal(data, &j) err := json.Unmarshal(data, &j)
if err != nil {
return nil, err
}
members, err := RestoreNodeList(ctx, j.Members) members, err := LoadIDMap(j.Members)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewGroupExt(members), nil return &GroupExt{
Members: members,
}, nil
} }
func (ext *GroupExt) Process(ctx *Context, princ_id NodeID, node *Node, signal Signal) { func (ext *GroupExt) Process(ctx *Context, princ_id NodeID, node *Node, signal Signal) {
return return
} }