Started conversion to sql

noah metz 2023-12-08 14:20:35 -07:00
parent 61565fa18c
commit e06a3be580
5 changed files with 199 additions and 68 deletions

@ -10,7 +10,8 @@ import (
"time"
"github.com/google/uuid"
badger "github.com/dgraph-io/badger/v3"
_ "modernc.org/sqlite"
"database/sql"
)
var (
@ -21,6 +22,7 @@ var (
type ExtensionInfo struct {
Type reflect.Type
Data interface{}
FieldMap map[string][]int
}
type NodeInfo struct {
@ -49,7 +51,7 @@ type KindInfo struct {
// A Context stores all the data to run a graphvent process
type Context struct {
// DB is the database connection used to load and write nodes
DB * badger.DB
DB * sql.DB
// Logging interface
Log Logger
// Map between database extension hashes and the registered info
@ -149,6 +151,17 @@ func (ctx *Context)RegisterSignal(reflect_type reflect.Type, signal_type SignalT
return nil
}
func GetExtFieldMap(ctx *Context, reflect_type reflect.Type) map[string][]int {
field_map := map[string][]int{}
for _, field := range(reflect.VisibleFields(reflect_type)) {
gv_tag, has_tag := field.Tag.Lookup("gv")
if has_tag {
field_map[gv_tag] = field.Index
}
}
return field_map
}
func (ctx *Context)RegisterExtension(reflect_type reflect.Type, ext_type ExtType, data interface{}) error {
_, exists := ctx.Extensions[ext_type]
if exists == true {
@ -161,6 +174,11 @@ func (ctx *Context)RegisterExtension(reflect_type reflect.Type, ext_type ExtType
return err
}
field_map := GetExtFieldMap(ctx, reflect_type)
if err != nil {
return err
}
err = ctx.RegisterType(elem_type, SerializedType(ext_type), nil, SerializeStruct(elem_info), nil, DeserializeStruct(elem_info))
if err != nil {
return err
@ -171,6 +189,7 @@ func (ctx *Context)RegisterExtension(reflect_type reflect.Type, ext_type ExtType
ctx.Extensions[ext_type] = ExtensionInfo{
Type: reflect_type,
Data: data,
FieldMap: field_map,
}
ctx.ExtensionTypes[reflect_type] = ext_type
@ -341,7 +360,7 @@ func (ctx *Context) Send(messages Messages) error {
}
// Create a new Context with the base library content added
func NewContext(db * badger.DB, log Logger) (*Context, error) {
func NewContext(db * sql.DB, log Logger) (*Context, error) {
ctx := &Context{
DB: db,
Log: log,

@ -8,7 +8,7 @@ require (
github.com/google/uuid v1.3.0
github.com/graphql-go/graphql v0.8.1
github.com/rs/zerolog v1.29.1
golang.org/x/net v0.7.0
golang.org/x/net v0.16.0
)
require (
@ -16,7 +16,7 @@ require (
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
@ -25,12 +25,26 @@ require (
github.com/golang/protobuf v1.3.1 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/compress v1.12.3 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/stretchr/testify v1.8.2 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
lukechampine.com/uint128 v1.2.0 // indirect
modernc.org/cc/v3 v3.40.0 // indirect
modernc.org/ccgo/v3 v3.16.13 // indirect
modernc.org/libc v1.29.0 // indirect
modernc.org/mathutil v1.6.0 // indirect
modernc.org/memory v1.7.2 // indirect
modernc.org/opt v0.1.3 // indirect
modernc.org/sqlite v1.27.0 // indirect
modernc.org/strutil v1.1.3 // indirect
modernc.org/token v1.0.1 // indirect
)

@ -27,6 +27,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
@ -57,6 +59,8 @@ github.com/graphql-go/graphql v0.8.1 h1:p7/Ou/WpmulocJeEx7wjQy611rtXGQaAcXGqanuM
github.com/graphql-go/graphql v0.8.1/go.mod h1:nKiHzRM0qopJEwCITUuIsxk9PlVlwIiiI8pnJEhordQ=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU=
@ -69,6 +73,8 @@ github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZb
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@ -76,6 +82,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
@ -116,6 +124,8 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -126,6 +136,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -141,6 +152,7 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -155,6 +167,8 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc=
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -172,3 +186,23 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI=
lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
modernc.org/cc/v3 v3.40.0 h1:P3g79IUS/93SYhtoeaHW+kRCIrYaxJ27MFPv+7kaTOw=
modernc.org/cc/v3 v3.40.0/go.mod h1:/bTg4dnWkSXowUO6ssQKnOV0yMVxDYNIsIrzqTFDGH0=
modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw=
modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY=
modernc.org/libc v1.29.0 h1:tTFRFq69YKCF2QyGNuRUQxKBm1uZZLubf6Cjh/pVHXs=
modernc.org/libc v1.29.0/go.mod h1:DaG/4Q3LRRdqpiLyP0C2m1B8ZMGkQ+cCgOIjEtQlYhQ=
modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo=
modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E=
modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E=
modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/sqlite v1.27.0 h1:MpKAHoyYB7xqcwnUwkuD+npwEa0fojF0B5QRbN+auJ8=
modernc.org/sqlite v1.27.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0=
modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY=
modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
modernc.org/token v1.0.1 h1:A3qvTqOwexpfZZeyI0FeGPDlSWX5pjZu9hF4lU+EKWg=
modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

@ -19,6 +19,47 @@ type Logger interface {
Logj(component string, s interface{}, format string, items ... interface{})
}
type DebugConsoleLogger struct {
logger zerolog.Logger
}
func NewDebugConsoleLogger() *DebugConsoleLogger {
return &DebugConsoleLogger{
logger: zerolog.New(os.Stdout).With().Timestamp().Logger(),
}
}
func (logger * DebugConsoleLogger) SetComponents(components []string) error {
return nil
}
func (logger * DebugConsoleLogger) Logf(component string, format string, items ... interface{}) {
log := logger.logger.Log()
log.Str("component", component).Msg(fmt.Sprintf(format, items...))
}
func (logger * DebugConsoleLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) {
log := logger.logger.Log().Str("component", component)
for key, value := range(fields) {
log = log.Str(key, fmt.Sprintf("%+v", value))
}
log.Msg(fmt.Sprintf(format, items...))
}
func (logger * DebugConsoleLogger) Logj(component string, s interface{}, format string, items ... interface{}) {
m := map[string]interface{}{}
ser, err := json.Marshal(s)
if err != nil {
panic("LOG_MARSHAL_ERR")
}
err = json.Unmarshal(ser, &m)
if err != nil {
panic("LOG_UNMARSHAL_ERR")
}
logger.Logm(component, m, format, items...)
}
func NewConsoleLogger(components []string) *ConsoleLogger {
logger := &ConsoleLogger{
loggers: map[string]zerolog.Logger{},
@ -50,7 +91,7 @@ func (logger * ConsoleLogger) SetComponents(components []string) error {
return false
}
for c, _ := range(logger.loggers) {
for c := range(logger.loggers) {
if component_enabled(c) == false {
delete(logger.loggers, c)
}

@ -6,11 +6,14 @@ import (
"crypto/sha512"
"encoding/binary"
"fmt"
"strings"
"reflect"
"sync/atomic"
"time"
"context"
badger "github.com/dgraph-io/badger/v3"
_ "modernc.org/sqlite"
"database/sql"
"github.com/google/uuid"
)
@ -94,28 +97,31 @@ type PendingACLSignal struct {
// Default message channel size for nodes
// Nodes represent a group of extensions that can be collectively addressed
type Node struct {
// Set at creation time, cannot be changed
Key ed25519.PrivateKey `gv:"key"`
ID NodeID
Type NodeType `gv:"type"`
// TODO: move each extension to it's own db key, and extend changes to notify which extension was changed
Extensions map[ExtType]Extension
Policies []Policy `gv:"policies"`
// Keys set at time of creation, cannot be changed
Extensions map[ExtType]Extension
// Not serialized
PendingACLs map[uuid.UUID]PendingACL `gv:"pending_acls"`
PendingACLSignals map[uuid.UUID]PendingACLSignal `gv:"pending_signal"`
// Channel for this node to receive messages from the Context
MsgChan chan *Message
// Size of MsgChan
// Size of MsgChan, serialized
BufferSize uint32 `gv:"buffer_size"`
// Channel for this node to process delayed signals
TimeoutChan <-chan time.Time
Active atomic.Bool
// TODO: enhance WriteNode to write SignalQueue to a different key, and use writeSignalQueue to decide whether or not to update it
writeSignalQueue bool
// Written to a different table
AddedQueuedSignals []QueuedSignal
RemovedQueuedSignals []uuid.UUID
SignalQueue []QueuedSignal
NextSignal *QueuedSignal
}
@ -203,29 +209,11 @@ func (node *Node) QueueTimeout(reason WaitReason, dest NodeID, signal Signal, ti
}
func (node *Node) QueueSignal(time time.Time, signal Signal) {
node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time})
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
node.writeSignalQueue = true
node.AddedQueuedSignals = append(node.AddedQueuedSignals, QueuedSignal{signal, time})
}
func (node *Node) DequeueSignal(id uuid.UUID) error {
idx := -1
for i, q := range(node.SignalQueue) {
if q.Signal.ID() == id {
idx = i
break
}
}
if idx == -1 {
return fmt.Errorf("%s is not in SignalQueue", id)
}
node.SignalQueue[idx] = node.SignalQueue[len(node.SignalQueue)-1]
node.SignalQueue = node.SignalQueue[:len(node.SignalQueue)-1]
node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue)
node.writeSignalQueue = true
return nil
func (node *Node) DequeueSignal(id uuid.UUID) {
node.RemovedQueuedSignals = append(node.RemovedQueuedSignals, id)
}
func SoonestSignal(signals []QueuedSignal) (*QueuedSignal, <-chan time.Time) {
@ -735,58 +723,93 @@ func WriteNodeExtList(ctx *Context, node *Node) error {
})
}
func WriteNodeInitial(ctx *Context, node *Node) error {
ctx.Log.Logf("db", "Writing node initial data %s - %+v", node.ID, node)
return nil
}
func GetExtField(ctx *Context, node *Node, ext_type ExtType, field string) (interface{}, error) {
ext_info, exists := ctx.Extensions[ext_type]
if exists == false {
return nil, fmt.Errorf("0x%x is not a know extension type", ext_type)
}
ext, has_ext := node.Extensions[ext_type]
if has_ext == false {
return nil, fmt.Errorf("0x%x is not an extension of %s", ext_type, node.ID)
}
field_idx, has_field := ext_info.FieldMap[field]
if has_field == false {
return nil, fmt.Errorf("%s is not a field in %+v", field, ext_info.Type)
}
return reflect.ValueOf(ext).FieldByIndex(field_idx).Interface(), nil
}
func WriteNodeChanges(ctx *Context, node *Node, changes Changes) error {
ctx.Log.Logf("db", "Writing changes for %s - %+v", node.ID, changes)
ext_serialized := map[ExtType]SerializedValue{}
for ext_type := range(changes) {
ext, ext_exists := node.Extensions[ext_type]
if ext_exists == false {
ctx.Log.Logf("db", "extension 0x%x does not exist for %s", ext_type, node.ID)
} else {
serialized_ext, err := SerializeAny(ctx, ext)
tx, err := ctx.DB.BeginTx(context.TODO(), &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return err
}
ext_serialized[ext_type] = serialized_ext
}
}
var sq_serialized *SerializedValue = nil
if node.writeSignalQueue == true {
node.writeSignalQueue = false
ser, err := SerializeAny(ctx, node.SignalQueue)
// Remove removed signals from DB, and from in memory list
new_signal_queue := []QueuedSignal{}
for _, signal_id := range(node.RemovedQueuedSignals) {
_, err = tx.Exec("DELETE FROM queued_signals WHERE node = ? AND signal_ID = ?", node.ID, signal_id)
if err != nil {
return err
}
sq_serialized = &ser
idx := -1
for i, signal := range(node.SignalQueue) {
}
}
node_serialized, err := SerializeAny(ctx, node)
node.RemovedQueuedSignals = nil
for _, signal := range(node.AddedQueuedSignals) {
signal_serialized, err := SerializeAny(ctx, signal.Signal)
if err != nil {
return err
}
id_bytes, err := node.ID.MarshalBinary()
return ctx.DB.Update(func(txn *badger.Txn) error {
err := txn.Set(id_bytes, node_serialized.Data)
_, err = tx.Exec("UPDATE queued_signals ADD node = ?, signal_id = ?, signal_data = ?, time = ?", node.ID, signal.ID(), signal_serialized.Data, signal.Time)
if err != nil {
return err
}
if sq_serialized != nil {
err := txn.Set(append(id_bytes, signal_queue_suffix...), sq_serialized.Data)
}
node.SignalQueue = append(node.SignalQueue, node.AddedQueuedSignals...)
node.AddedQueuedSignals = nil
for ext_type, changes := range(changes) {
change_strings := make([]string, len(changes))
change_values := make([]interface{}, len(changes))
for i, changed_value := range(changes) {
change_strings[i] = fmt.Sprintf("%s = ?", changed_value)
var err error
change_values[i], err = GetExtField(ctx, node, ext_type, changed_value)
if err != nil {
return err
}
}
for ext_type, data := range(ext_serialized) {
err := txn.Set(append(id_bytes, ExtTypeSuffix(ext_type)...), data.Data)
set_string := strings.Join(change_strings, ", ")
_, err = tx.Exec(fmt.Sprintf("UPDATE extension_%x SET %s WHERE node = ?", ext_type, set_string), append(change_values, node.ID))
if err != nil {
return err
}
}
return nil
})
return tx.Commit()
}
func LoadNode(ctx *Context, id NodeID) (*Node, error) {