diff --git a/context.go b/context.go index 91d0118..d894a04 100644 --- a/context.go +++ b/context.go @@ -10,7 +10,8 @@ import ( "time" "github.com/google/uuid" - badger "github.com/dgraph-io/badger/v3" + _ "modernc.org/sqlite" + "database/sql" ) var ( @@ -21,6 +22,7 @@ var ( type ExtensionInfo struct { Type reflect.Type Data interface{} + FieldMap map[string][]int } type NodeInfo struct { @@ -49,7 +51,7 @@ type KindInfo struct { // A Context stores all the data to run a graphvent process type Context struct { // DB is the database connection used to load and write nodes - DB * badger.DB + DB * sql.DB // Logging interface Log Logger // Map between database extension hashes and the registered info @@ -149,6 +151,17 @@ func (ctx *Context)RegisterSignal(reflect_type reflect.Type, signal_type SignalT return nil } +func GetExtFieldMap(ctx *Context, reflect_type reflect.Type) map[string][]int { + field_map := map[string][]int{} + for _, field := range(reflect.VisibleFields(reflect_type)) { + gv_tag, has_tag := field.Tag.Lookup("gv") + if has_tag { + field_map[gv_tag] = field.Index + } + } + return field_map +} + func (ctx *Context)RegisterExtension(reflect_type reflect.Type, ext_type ExtType, data interface{}) error { _, exists := ctx.Extensions[ext_type] if exists == true { @@ -161,6 +174,11 @@ func (ctx *Context)RegisterExtension(reflect_type reflect.Type, ext_type ExtType return err } + field_map := GetExtFieldMap(ctx, reflect_type) + if err != nil { + return err + } + err = ctx.RegisterType(elem_type, SerializedType(ext_type), nil, SerializeStruct(elem_info), nil, DeserializeStruct(elem_info)) if err != nil { return err @@ -171,6 +189,7 @@ func (ctx *Context)RegisterExtension(reflect_type reflect.Type, ext_type ExtType ctx.Extensions[ext_type] = ExtensionInfo{ Type: reflect_type, Data: data, + FieldMap: field_map, } ctx.ExtensionTypes[reflect_type] = ext_type @@ -341,7 +360,7 @@ func (ctx *Context) Send(messages Messages) error { } // Create a new Context with the base library content added -func NewContext(db * badger.DB, log Logger) (*Context, error) { +func NewContext(db * sql.DB, log Logger) (*Context, error) { ctx := &Context{ DB: db, Log: log, diff --git a/go.mod b/go.mod index 65ed97f..e74539f 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/google/uuid v1.3.0 github.com/graphql-go/graphql v0.8.1 github.com/rs/zerolog v1.29.1 - golang.org/x/net v0.7.0 + golang.org/x/net v0.16.0 ) require ( @@ -16,7 +16,7 @@ require ( github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -25,12 +25,26 @@ require ( github.com/golang/protobuf v1.3.1 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/google/flatbuffers v1.12.1 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/klauspost/compress v1.12.3 // indirect github.com/mattn/go-colorable v0.1.12 // indirect - github.com/mattn/go-isatty v0.0.14 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/stretchr/testify v1.8.2 // indirect go.opencensus.io v0.22.5 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect + golang.org/x/mod v0.13.0 // indirect golang.org/x/sys v0.13.0 // indirect + golang.org/x/tools v0.14.0 // indirect + lukechampine.com/uint128 v1.2.0 // indirect + modernc.org/cc/v3 v3.40.0 // indirect + modernc.org/ccgo/v3 v3.16.13 // indirect + modernc.org/libc v1.29.0 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/opt v0.1.3 // indirect + modernc.org/sqlite v1.27.0 // indirect + modernc.org/strutil v1.1.3 // indirect + modernc.org/token v1.0.1 // indirect ) diff --git a/go.sum b/go.sum index 623b3f1..de14fb8 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= @@ -57,6 +59,8 @@ github.com/graphql-go/graphql v0.8.1 h1:p7/Ou/WpmulocJeEx7wjQy611rtXGQaAcXGqanuM github.com/graphql-go/graphql v0.8.1/go.mod h1:nKiHzRM0qopJEwCITUuIsxk9PlVlwIiiI8pnJEhordQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= @@ -69,6 +73,8 @@ github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZb github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= @@ -76,6 +82,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU= @@ -116,6 +124,8 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= +golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -126,6 +136,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -141,6 +152,7 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -155,6 +167,8 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= +golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -172,3 +186,23 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= +lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= +modernc.org/cc/v3 v3.40.0 h1:P3g79IUS/93SYhtoeaHW+kRCIrYaxJ27MFPv+7kaTOw= +modernc.org/cc/v3 v3.40.0/go.mod h1:/bTg4dnWkSXowUO6ssQKnOV0yMVxDYNIsIrzqTFDGH0= +modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw= +modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY= +modernc.org/libc v1.29.0 h1:tTFRFq69YKCF2QyGNuRUQxKBm1uZZLubf6Cjh/pVHXs= +modernc.org/libc v1.29.0/go.mod h1:DaG/4Q3LRRdqpiLyP0C2m1B8ZMGkQ+cCgOIjEtQlYhQ= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= +modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= +modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= +modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= +modernc.org/sqlite v1.27.0 h1:MpKAHoyYB7xqcwnUwkuD+npwEa0fojF0B5QRbN+auJ8= +modernc.org/sqlite v1.27.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0= +modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY= +modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= +modernc.org/token v1.0.1 h1:A3qvTqOwexpfZZeyI0FeGPDlSWX5pjZu9hF4lU+EKWg= +modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/log.go b/log.go index 034a760..1497ba8 100644 --- a/log.go +++ b/log.go @@ -19,6 +19,47 @@ type Logger interface { Logj(component string, s interface{}, format string, items ... interface{}) } +type DebugConsoleLogger struct { + logger zerolog.Logger +} + +func NewDebugConsoleLogger() *DebugConsoleLogger { + return &DebugConsoleLogger{ + logger: zerolog.New(os.Stdout).With().Timestamp().Logger(), + } +} + +func (logger * DebugConsoleLogger) SetComponents(components []string) error { + return nil +} + +func (logger * DebugConsoleLogger) Logf(component string, format string, items ... interface{}) { + log := logger.logger.Log() + log.Str("component", component).Msg(fmt.Sprintf(format, items...)) +} + +func (logger * DebugConsoleLogger) Logm(component string, fields map[string]interface{}, format string, items ... interface{}) { + log := logger.logger.Log().Str("component", component) + for key, value := range(fields) { + log = log.Str(key, fmt.Sprintf("%+v", value)) + } + + log.Msg(fmt.Sprintf(format, items...)) +} + +func (logger * DebugConsoleLogger) Logj(component string, s interface{}, format string, items ... interface{}) { + m := map[string]interface{}{} + ser, err := json.Marshal(s) + if err != nil { + panic("LOG_MARSHAL_ERR") + } + err = json.Unmarshal(ser, &m) + if err != nil { + panic("LOG_UNMARSHAL_ERR") + } + logger.Logm(component, m, format, items...) +} + func NewConsoleLogger(components []string) *ConsoleLogger { logger := &ConsoleLogger{ loggers: map[string]zerolog.Logger{}, @@ -50,7 +91,7 @@ func (logger * ConsoleLogger) SetComponents(components []string) error { return false } - for c, _ := range(logger.loggers) { + for c := range(logger.loggers) { if component_enabled(c) == false { delete(logger.loggers, c) } diff --git a/node.go b/node.go index 40f558b..d490943 100644 --- a/node.go +++ b/node.go @@ -6,11 +6,14 @@ import ( "crypto/sha512" "encoding/binary" "fmt" + "strings" "reflect" "sync/atomic" "time" + "context" - badger "github.com/dgraph-io/badger/v3" + _ "modernc.org/sqlite" + "database/sql" "github.com/google/uuid" ) @@ -94,28 +97,31 @@ type PendingACLSignal struct { // Default message channel size for nodes // Nodes represent a group of extensions that can be collectively addressed type Node struct { + // Set at creation time, cannot be changed Key ed25519.PrivateKey `gv:"key"` ID NodeID Type NodeType `gv:"type"` - // TODO: move each extension to it's own db key, and extend changes to notify which extension was changed - Extensions map[ExtType]Extension - Policies []Policy `gv:"policies"` + // Keys set at time of creation, cannot be changed + Extensions map[ExtType]Extension + + // Not serialized PendingACLs map[uuid.UUID]PendingACL `gv:"pending_acls"` PendingACLSignals map[uuid.UUID]PendingACLSignal `gv:"pending_signal"` // Channel for this node to receive messages from the Context MsgChan chan *Message - // Size of MsgChan + // Size of MsgChan, serialized BufferSize uint32 `gv:"buffer_size"` // Channel for this node to process delayed signals TimeoutChan <-chan time.Time Active atomic.Bool - // TODO: enhance WriteNode to write SignalQueue to a different key, and use writeSignalQueue to decide whether or not to update it - writeSignalQueue bool + // Written to a different table + AddedQueuedSignals []QueuedSignal + RemovedQueuedSignals []uuid.UUID SignalQueue []QueuedSignal NextSignal *QueuedSignal } @@ -203,29 +209,11 @@ func (node *Node) QueueTimeout(reason WaitReason, dest NodeID, signal Signal, ti } func (node *Node) QueueSignal(time time.Time, signal Signal) { - node.SignalQueue = append(node.SignalQueue, QueuedSignal{signal, time}) - node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) - node.writeSignalQueue = true + node.AddedQueuedSignals = append(node.AddedQueuedSignals, QueuedSignal{signal, time}) } -func (node *Node) DequeueSignal(id uuid.UUID) error { - idx := -1 - for i, q := range(node.SignalQueue) { - if q.Signal.ID() == id { - idx = i - break - } - } - if idx == -1 { - return fmt.Errorf("%s is not in SignalQueue", id) - } - - node.SignalQueue[idx] = node.SignalQueue[len(node.SignalQueue)-1] - node.SignalQueue = node.SignalQueue[:len(node.SignalQueue)-1] - node.NextSignal, node.TimeoutChan = SoonestSignal(node.SignalQueue) - node.writeSignalQueue = true - - return nil +func (node *Node) DequeueSignal(id uuid.UUID) { + node.RemovedQueuedSignals = append(node.RemovedQueuedSignals, id) } func SoonestSignal(signals []QueuedSignal) (*QueuedSignal, <-chan time.Time) { @@ -735,58 +723,93 @@ func WriteNodeExtList(ctx *Context, node *Node) error { }) } +func WriteNodeInitial(ctx *Context, node *Node) error { + ctx.Log.Logf("db", "Writing node initial data %s - %+v", node.ID, node) + return nil +} + +func GetExtField(ctx *Context, node *Node, ext_type ExtType, field string) (interface{}, error) { + ext_info, exists := ctx.Extensions[ext_type] + if exists == false { + return nil, fmt.Errorf("0x%x is not a know extension type", ext_type) + } + + ext, has_ext := node.Extensions[ext_type] + if has_ext == false { + return nil, fmt.Errorf("0x%x is not an extension of %s", ext_type, node.ID) + } + + field_idx, has_field := ext_info.FieldMap[field] + if has_field == false { + return nil, fmt.Errorf("%s is not a field in %+v", field, ext_info.Type) + } + + return reflect.ValueOf(ext).FieldByIndex(field_idx).Interface(), nil +} + func WriteNodeChanges(ctx *Context, node *Node, changes Changes) error { ctx.Log.Logf("db", "Writing changes for %s - %+v", node.ID, changes) - ext_serialized := map[ExtType]SerializedValue{} - for ext_type := range(changes) { - ext, ext_exists := node.Extensions[ext_type] - if ext_exists == false { - ctx.Log.Logf("db", "extension 0x%x does not exist for %s", ext_type, node.ID) - } else { - serialized_ext, err := SerializeAny(ctx, ext) - if err != nil { - return err - } - ext_serialized[ext_type] = serialized_ext - } + tx, err := ctx.DB.BeginTx(context.TODO(), &sql.TxOptions{ + Isolation: sql.LevelSerializable, + }) + + if err != nil { + return err } - var sq_serialized *SerializedValue = nil - if node.writeSignalQueue == true { - node.writeSignalQueue = false - ser, err := SerializeAny(ctx, node.SignalQueue) + // Remove removed signals from DB, and from in memory list + new_signal_queue := []QueuedSignal{} + for _, signal_id := range(node.RemovedQueuedSignals) { + _, err = tx.Exec("DELETE FROM queued_signals WHERE node = ? AND signal_ID = ?", node.ID, signal_id) if err != nil { return err } - sq_serialized = &ser - } - node_serialized, err := SerializeAny(ctx, node) - if err != nil { - return err + idx := -1 + for i, signal := range(node.SignalQueue) { + + } } - id_bytes, err := node.ID.MarshalBinary() - return ctx.DB.Update(func(txn *badger.Txn) error { - err := txn.Set(id_bytes, node_serialized.Data) + node.RemovedQueuedSignals = nil + + for _, signal := range(node.AddedQueuedSignals) { + signal_serialized, err := SerializeAny(ctx, signal.Signal) if err != nil { return err } - if sq_serialized != nil { - err := txn.Set(append(id_bytes, signal_queue_suffix...), sq_serialized.Data) - if err != nil { - return err - } + + _, err = tx.Exec("UPDATE queued_signals ADD node = ?, signal_id = ?, signal_data = ?, time = ?", node.ID, signal.ID(), signal_serialized.Data, signal.Time) + if err != nil { + return err } - for ext_type, data := range(ext_serialized) { - err := txn.Set(append(id_bytes, ExtTypeSuffix(ext_type)...), data.Data) + } + + node.SignalQueue = append(node.SignalQueue, node.AddedQueuedSignals...) + node.AddedQueuedSignals = nil + + for ext_type, changes := range(changes) { + change_strings := make([]string, len(changes)) + change_values := make([]interface{}, len(changes)) + + for i, changed_value := range(changes) { + change_strings[i] = fmt.Sprintf("%s = ?", changed_value) + var err error + change_values[i], err = GetExtField(ctx, node, ext_type, changed_value) if err != nil { return err } } - return nil - }) + + set_string := strings.Join(change_strings, ", ") + _, err = tx.Exec(fmt.Sprintf("UPDATE extension_%x SET %s WHERE node = ?", ext_type, set_string), append(change_values, node.ID)) + if err != nil { + return err + } + } + + return tx.Commit() } func LoadNode(ctx *Context, id NodeID) (*Node, error) {