// graphvent/gql.go (895 lines, 26 KiB, Go)

package graphvent
import (
"time"
"net"
"net/http"
"github.com/graphql-go/graphql"
"github.com/graphql-go/graphql/language/parser"
"github.com/graphql-go/graphql/language/source"
"github.com/graphql-go/graphql/language/ast"
"context"
"encoding/json"
"io"
"reflect"
"fmt"
"sync"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"strings"
"crypto/ecdh"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/sha512"
"crypto/rand"
"crypto/x509"
"bytes"
"github.com/google/uuid"
)
// AuthReqJSON is the JSON body a client sends to start an authorization
// handshake with the GQL server.
type AuthReqJSON struct {
	// Time is the client's timestamp for the request; it is covered by Signature.
	Time time.Time `json:"time"`
	// Pubkey is the client's long-term identity public key, encoded as
	// returned by (*ecdh.PublicKey).Bytes.
	Pubkey []byte `json:"pubkey"`
	// ECDHPubkey is the client's ephemeral ECDH public key for this handshake.
	ECDHPubkey []byte `json:"ecdh_client"`
	// Signature is the ASN.1 ECDSA signature made with the identity key over
	// SHA-512(ECDHPubkey || JSON-encoded Time).
	Signature []byte `json:"signature"`
}

// NewAuthReqJSON builds a signed authorization request for the identity key id.
//
// A fresh ephemeral ECDH key is generated on curve, and the request is signed
// with id over SHA-512(ephemeral public key || JSON-encoded current time).
// It returns the request together with the ephemeral private key, which the
// caller needs later to derive the shared secret from the server's response.
// Any key generation, signing or conversion error is returned unchanged.
func NewAuthReqJSON(curve ecdh.Curve, id *ecdsa.PrivateKey) (AuthReqJSON, *ecdh.PrivateKey, error) {
	ec_key, err := curve.GenerateKey(rand.Reader)
	if err != nil {
		return AuthReqJSON{}, nil, err
	}
	ec_key_pub := ec_key.PublicKey().Bytes()

	now := time.Now()
	time_bytes, err := now.MarshalJSON()
	if err != nil {
		return AuthReqJSON{}, nil, err
	}

	// Build the signed payload in its own buffer so the key bytes placed in
	// the request never share a backing array with it.
	sig_data := make([]byte, 0, len(ec_key_pub)+len(time_bytes))
	sig_data = append(sig_data, ec_key_pub...)
	sig_data = append(sig_data, time_bytes...)
	sig_hash := sha512.Sum512(sig_data)

	sig, err := ecdsa.SignASN1(rand.Reader, id, sig_hash[:])
	if err != nil {
		// Previously this error was silently overwritten by the next call.
		return AuthReqJSON{}, nil, err
	}

	id_ecdh, err := id.ECDH()
	if err != nil {
		return AuthReqJSON{}, nil, err
	}

	return AuthReqJSON{
		Time:       now,
		Pubkey:     id_ecdh.PublicKey().Bytes(),
		ECDHPubkey: ec_key_pub,
		Signature:  sig,
	}, ec_key, nil
}
// AuthRespJSON is the JSON body the server returns for a successful
// authorization request.
type AuthRespJSON struct {
	// Granted is the server time at which the authorization was granted.
	Granted time.Time `json:"granted"`
	// ECDHPubkey is the server's ephemeral ECDH public key for this handshake.
	// NOTE(review): the wire name "echd_server" looks like a typo of
	// "ecdh_server"; it is kept because changing it would break existing clients.
	ECDHPubkey []byte `json:"echd_server"`
	// Signature is the server's ASN.1 ECDSA signature over
	// SHA-512(ECDHPubkey || JSON-encoded Granted).
	Signature []byte `json:"signature"`
}
func NewAuthRespJSON(thread *GQLThread, req AuthReqJSON) (AuthRespJSON, *ecdsa.PublicKey, *ecdsa.PrivateKey, error) {
// Check if req.Time is within +- 1 second of now
now := time.Now()
earliest := now.Add(-1 * time.Second)
latest := now.Add(1 * time.Second)
// If req.Time is before the earliest acceptable time, or after the latest acceptible time
if req.Time.Compare(earliest) == -1 {
return AuthRespJSON{}, nil, nil, fmt.Errorf("GQL_AUTH_TIME_TOO_LATE: %s", req.Time)
} else if req.Time.Compare(latest) == 1 {
return AuthRespJSON{}, nil, nil, fmt.Errorf("GQL_AUTH_TIME_TOO_EARLY: %s", req.Time)
}
x, y := elliptic.Unmarshal(thread.Key.Curve, req.Pubkey)
if x == nil {
return AuthRespJSON{}, nil, nil, fmt.Errorf("GQL_AUTH_UNMARSHAL_FAIL: %+v", req.Pubkey)
}
remote, err := thread.ECDH.NewPublicKey(req.ECDHPubkey)
if err != nil {
return AuthRespJSON{}, nil, nil, err
}
// Verify the signature
time_bytes, _ := req.Time.MarshalJSON()
sig_data := append(req.ECDHPubkey, time_bytes...)
sig_hash := sha512.Sum512(sig_data)
remote_key := &ecdsa.PublicKey{
Curve: thread.Key.Curve,
X: x,
Y: y,
}
verified := ecdsa.VerifyASN1(
remote_key,
sig_hash[:],
req.Signature,
)
if verified == false {
return AuthRespJSON{}, nil, nil, fmt.Errorf("GQL_AUTH_VERIFY_FAIL: %+v", req)
}
ec_key, err := thread.ECDH.GenerateKey(rand.Reader)
if err != nil {
return AuthRespJSON{}, nil, nil, err
}
2023-07-19 14:50:42 -06:00
ec_key_pub := ec_key.PublicKey().Bytes()
granted := time.Now()
time_ser, _ := granted.MarshalJSON()
resp_sig_data := append(ec_key_pub, time_ser...)
resp_sig_hash := sha512.Sum512(resp_sig_data)
resp_sig, err := ecdsa.SignASN1(rand.Reader, thread.Key, resp_sig_hash[:])
if err != nil {
return AuthRespJSON{}, nil, nil, err
2023-07-19 14:50:42 -06:00
}
shared_secret, err := ec_key.ECDH(remote)
if err != nil {
return AuthRespJSON{}, nil, nil, err
}
secret_hash := sha512.Sum512(shared_secret)
buf := bytes.NewReader(secret_hash[:])
shared_key, err := ecdsa.GenerateKey(thread.Key.Curve, buf)
if err != nil {
return AuthRespJSON{}, nil, nil, err
}
return AuthRespJSON{
2023-07-19 14:50:42 -06:00
Granted: granted,
ECDHPubkey: ec_key_pub,
Signature: resp_sig,
}, remote_key, shared_key, nil
}
// ParseAuthRespJSON is the client-side counterpart of the authorization
// handshake: it combines the server's ephemeral ECDH public key from resp with
// the client's ephemeral private key ec_key, hashes the resulting shared
// secret with SHA-512, and feeds that hash to ecdsa.GenerateKey on ecdsa_curve.
//
// It returns the derived ECDSA key, or the first error encountered.
//
// NOTE(review): crypto/ecdsa documents that GenerateKey's output does not
// depend deterministically on the bytes read from its reader, so this key may
// not match the one the server derives - confirm.
func ParseAuthRespJSON(resp AuthRespJSON, ecdsa_curve elliptic.Curve, ecdh_curve ecdh.Curve, ec_key *ecdh.PrivateKey) (*ecdsa.PrivateKey, error) {
	server_pub, err := ecdh_curve.NewPublicKey(resp.ECDHPubkey)
	if err != nil {
		return nil, err
	}

	secret, err := ec_key.ECDH(server_pub)
	if err != nil {
		return nil, err
	}

	digest := sha512.Sum512(secret)
	derived, err := ecdsa.GenerateKey(ecdsa_curve, bytes.NewReader(digest[:]))
	if err != nil {
		return nil, err
	}

	return derived, nil
}
// AuthData is the server-side record of a completed authorization handshake.
type AuthData struct {
	// Granted is when the authorization was recorded.
	Granted time.Time
	// Pubkey is the client's identity public key.
	Pubkey *ecdsa.PublicKey
	// Shared is the ECDSA key derived from the handshake's shared secret.
	Shared *ecdsa.PrivateKey
}
// String renders the grant time and the key IDs of both keys; the keys
// themselves are never printed. Both Pubkey and Shared must be non-nil.
func (data AuthData) String() string {
	pubkey_id := KeyID(data.Pubkey).String()
	shared_id := KeyID(&data.Shared.PublicKey).String()
	return fmt.Sprintf("{Granted: %+v, Pubkey: %s, Shared: %s}", data.Granted, pubkey_id, shared_id)
}
// AuthDataJSON is the serialized form of AuthData.
type AuthDataJSON struct {
	// Granted is when the authorization was recorded.
	Granted time.Time `json:"granted"`
	// Pubkey is the client's identity public key in elliptic.Marshal encoding.
	Pubkey []byte `json:"pubkey"`
	// Shared is the shared private key in x509.MarshalECPrivateKey encoding.
	Shared []byte `json:"shared"`
}
// KeyID derives the NodeID for an ECDSA public key: the key is encoded with
// elliptic.Marshal and hashed into a UUID via uuid.NewHash using SHA-512,
// the ZeroUUID namespace and version 3.
func KeyID(pub *ecdsa.PublicKey) NodeID {
	encoded := elliptic.Marshal(pub.Curve, pub.X, pub.Y)
	return NodeID(uuid.NewHash(sha512.New(), ZeroUUID, encoded, 3))
}
// AuthHandler returns the HTTP handler for the authorization endpoint.
//
// The handler decodes an AuthReqJSON from the request body, validates it with
// NewAuthRespJSON, writes the JSON response, and only after the response was
// written completely records the new AuthData in server.AuthMap under the
// KeyID of the client's identity key. Failures are logged and end the request
// without a body; the response status is left at its default.
//
// NOTE(review): server.AuthMap is written here without any locking while
// other handlers read it, and net/http serves requests concurrently - confirm
// whether access needs to be synchronized.
func AuthHandler(ctx *Context, server *GQLThread) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx.Log.Logf("gql", "GQL_AUTH_REQUEST: %s", r.RemoteAddr)
		enableCORS(&w)

		str, err := io.ReadAll(r.Body)
		if err != nil {
			// %s, not %e: %e is a floating point verb and garbles error values
			ctx.Log.Logf("gql", "GQL_AUTH_READ_ERR: %s", err)
			return
		}

		var req AuthReqJSON
		err = json.Unmarshal(str, &req)
		if err != nil {
			ctx.Log.Logf("gql", "GQL_AUTH_UNMARHSHAL_ERR: %s", err)
			return
		}

		resp, remote_id, shared_key, err := NewAuthRespJSON(server, req)
		if err != nil {
			ctx.Log.Logf("gql", "GQL_AUTH_VERIFY_ERROR: %s", err)
			return
		}

		ser, err := json.Marshal(resp)
		if err != nil {
			ctx.Log.Logf("gql", "GQL_AUTH_RESP_MARSHAL_ERR: %s", err)
			return
		}

		wrote, err := w.Write(ser)
		if err != nil {
			ctx.Log.Logf("gql", "GQL_AUTH_RESP_ERR: %s", err)
			return
		} else if wrote != len(ser) {
			ctx.Log.Logf("gql", "GQL_AUTH_RESP_BAD_LENGTH: %d/%d", wrote, len(ser))
			return
		}

		// The client has the full response, so the authorization can be stored
		key_id := KeyID(remote_id)
		new_auth := AuthData{
			Granted: time.Now(),
			Pubkey:  remote_id,
			Shared:  shared_key,
		}

		_, exists := server.AuthMap[key_id]
		if exists {
			ctx.Log.Logf("gql", "REFRESHING AUTH FOR %s - %s", key_id, new_auth)
		} else {
			ctx.Log.Logf("gql", "AUTHORIZING NEW USER %s - %s", key_id, new_auth)
		}

		server.AuthMap[key_id] = new_auth
	}
}
// GraphiQLHandler returns an HTTP handler that serves a static GraphiQL page.
// The page loads React and GraphiQL from unpkg.com and sends its queries to
// the /gql endpoint.
func GraphiQLHandler() func(http.ResponseWriter, *http.Request) {
	// page_template is passed through fmt.Sprintf, so every literal percent
	// sign in it is written as %%.
	const page_template = `
<!--
* Copyright (c) 2021 GraphQL Contributors
* All rights reserved.
*
* This source code is licensed under the license found in the
* LICENSE file in the root directory of this source tree.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<title>GraphiQL</title>
<style>
body {
height: 100%%;
margin: 0;
width: 100%%;
overflow: hidden;
}
#graphiql {
height: 100vh;
}
</style>
<!--
This GraphiQL example depends on Promise and fetch, which are available in
modern browsers, but can be "polyfilled" for older browsers.
GraphiQL itself depends on React DOM.
If you do not want to rely on a CDN, you can host these files locally or
include them directly in your favored resource bundler.
-->
<script
crossorigin
src="https://unpkg.com/react@18/umd/react.development.js"
></script>
<script
crossorigin
src="https://unpkg.com/react-dom@18/umd/react-dom.development.js"
></script>
<!--
These two files can be found in the npm module, however you may wish to
copy them directly into your environment, or perhaps include them in your
favored resource bundler.
-->
<link rel="stylesheet" href="https://unpkg.com/graphiql/graphiql.min.css" />
</head>
<body>
<div id="graphiql">Loading...</div>
<script
src="https://unpkg.com/graphiql/graphiql.min.js"
type="application/javascript"
></script>
<script>
const root = ReactDOM.createRoot(document.getElementById('graphiql'));
root.render(
React.createElement(GraphiQL, {
fetcher: GraphiQL.createFetcher({
url: '/gql',
}),
defaultEditorToolsVisibility: true,
}),
);
</script>
</body>
</html>
`

	return func(w http.ResponseWriter, r *http.Request) {
		page := fmt.Sprintf(page_template)

		w.Header().Set("Content-Type", "text/html; charset=utf-8")
		w.WriteHeader(http.StatusOK)
		io.WriteString(w, page)
	}
}
// GQLWSPayload is the payload of a GraphQL request or websocket message.
// The same struct is used for incoming queries (OperationName, Query,
// Variables, Extensions) and for outgoing results (Data).
type GQLWSPayload struct {
	OperationName string         `json:"operationName,omitempty"`
	Query         string         `json:"query,omitempty"`
	Variables     map[string]any `json:"variables,omitempty"`
	Extensions    map[string]any `json:"extensions,omitempty"`
	// Data carries a result that has already been JSON-encoded, as a string.
	Data string `json:"data,omitempty"`
}
// GQLWSMsg is one message exchanged over the GraphQL websocket connection.
type GQLWSMsg struct {
	// ID identifies the operation the message belongs to.
	ID string `json:"id,omitempty"`
	// Type is the message type, e.g. "connection_init", "ping", "subscribe" or "next".
	Type string `json:"type"`
	// Payload is always emitted: encoding/json's omitempty has no effect on
	// struct values.
	Payload GQLWSPayload `json:"payload,omitempty"`
}
// enableCORS sets fully permissive CORS headers on the response: any origin,
// any request header and any method, with credentials allowed.
func enableCORS(w *http.ResponseWriter) {
	cors_headers := [...][2]string{
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Credentials", "true"},
		{"Access-Control-Allow-Headers", "*"},
		{"Access-Control-Allow-Methods", "*"},
	}
	for _, pair := range cors_headers {
		(*w).Header().Set(pair[0], pair[1])
	}
}
// GQLUnauthorized is the error reported to clients whose request could not be
// authorized; the string value is the human-readable reason.
type GQLUnauthorized string

// Is reports whether target is also a GQLUnauthorized, regardless of its
// message, so errors.Is matches on the error type rather than the text.
func (e GQLUnauthorized) Is(target error) bool {
	_, ok := target.(GQLUnauthorized)
	return ok
}

// Error implements the error interface.
func (e GQLUnauthorized) Error() string {
	return fmt.Sprintf("GQL_UNAUTHORIZED_ERROR: %s", string(e))
}

// MarshalJSON encodes the error as a JSON object with a single "error" field
// holding the reason.
func (e GQLUnauthorized) MarshalJSON() ([]byte, error) {
	return json.MarshalIndent(&struct {
		Error string `json:"error"`
	}{
		Error: string(e),
	}, "", " ")
}
// checkForAuthHeader looks for an Authorization header using the "TM" scheme.
// It returns the text after the first space of the first matching value and
// true, or "" and false when no Authorization value has the form "TM <rest>".
func checkForAuthHeader(header http.Header) (string, bool) {
	values, present := header["Authorization"]
	if !present {
		return "", false
	}

	for _, value := range values {
		scheme, credentials, has_space := strings.Cut(value, " ")
		if has_space && scheme == "TM" {
			return credentials, true
		}
	}

	return "", false
}
func GQLHandler(ctx * Context, server * GQLThread) func(http.ResponseWriter, *http.Request) {
gql_ctx := context.Background()
gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx)
gql_ctx = context.WithValue(gql_ctx, "gql_server", server)
2023-06-25 20:20:59 -06:00
return func(w http.ResponseWriter, r * http.Request) {
ctx.Log.Logf("gql", "GQL REQUEST: %s", r.RemoteAddr)
enableCORS(&w)
header_map := map[string]interface{}{}
for header, value := range(r.Header) {
header_map[header] = value
}
ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS")
username, password, ok := r.BasicAuth()
2023-07-13 18:21:33 -06:00
if ok == false {
ctx.Log.Logf("gql", "GQL_REQUEST_ERR: no auth header included in request header")
json.NewEncoder(w).Encode(GQLUnauthorized("No Auth header provided"))
return
}
auth_id, err := ParseID(username)
if err != nil {
ctx.Log.Logf("gql", "GQL_REQUEST_ERR: failed to parse ID from auth username: %s", username)
json.NewEncoder(w).Encode(GQLUnauthorized("Failed to parse ID from username"))
return
}
auth, exists := server.AuthMap[auth_id]
if exists == false {
ctx.Log.Logf("gql", "GQL_REQUEST_ERR: no existing authorization for client %s", auth_id)
json.NewEncoder(w).Encode(GQLUnauthorized("No matching authorization for client"))
2023-07-13 18:21:33 -06:00
return
}
req_ctx := context.WithValue(gql_ctx, "auth", auth)
ctx.Log.Logf("gql", "GQL_AUTH: %+v - %s", auth, password)
2023-06-25 20:20:59 -06:00
str, err := io.ReadAll(r.Body)
if err != nil {
2023-07-13 18:21:33 -06:00
ctx.Log.Logf("gql", "GQL_REQUEST_ERR: failed to read request body: %s", err)
2023-06-25 20:20:59 -06:00
return
2023-07-13 18:21:33 -06:00
}
2023-06-25 20:20:59 -06:00
query := GQLWSPayload{}
json.Unmarshal(str, &query)
params := graphql.Params{
Schema: ctx.GQL.Schema,
2023-07-13 18:21:33 -06:00
Context: req_ctx,
2023-06-25 20:20:59 -06:00
RequestString: query.Query,
}
if query.OperationName != "" {
params.OperationName = query.OperationName
}
if len(query.Variables) > 0 {
params.VariableValues = query.Variables
}
result := graphql.Do(params)
if len(result.Errors) > 0 {
extra_fields := map[string]interface{}{}
extra_fields["body"] = string(str)
extra_fields["headers"] = r.Header
ctx.Log.Logm("gql", extra_fields, "wrong result, unexpected errors: %v", result.Errors)
}
json.NewEncoder(w).Encode(result)
}
}
// sendOneResultAndClose wraps a single result in a channel, so one-shot
// queries can be consumed the same way as subscriptions. The returned channel
// yields res once and is then closed.
//
// The channel is buffered, so no helper goroutine is needed and nothing is
// left blocked if the caller never receives from it.
func sendOneResultAndClose(res *graphql.Result) chan *graphql.Result {
	resultChannel := make(chan *graphql.Result, 1)
	resultChannel <- res
	close(resultChannel)
	return resultChannel
}
// getOperationTypeOfReq parses the request string and returns the operation
// type of the operation selected by p.OperationName. When p.OperationName is
// empty, the first operation in the document is used. It returns "" if the
// request does not parse or no operation matches.
func getOperationTypeOfReq(p graphql.Params) string {
	req_source := source.NewSource(&source.Source{
		Body: []byte(p.RequestString),
		Name: "GraphQL request",
	})

	document, err := parser.Parse(parser.ParseParams{Source: req_source})
	if err != nil {
		return ""
	}

	for _, definition := range document.Definitions {
		operation, is_operation := definition.(*ast.OperationDefinition)
		if !is_operation {
			continue
		}

		// Anonymous operations have no name node
		name := ""
		if operation.Name != nil {
			name = operation.Name.Value
		}

		if p.OperationName == "" || name == p.OperationName {
			return operation.Operation
		}
	}

	return ""
}
func GQLWSDo(ctx * Context, p graphql.Params) chan *graphql.Result {
2023-06-25 20:20:59 -06:00
operation := getOperationTypeOfReq(p)
ctx.Log.Logf("gqlws", "GQLWSDO_OPERATION: %s %+v", operation, p.RequestString)
if operation == ast.OperationTypeSubscription {
return graphql.Subscribe(p)
}
res := graphql.Do(p)
return sendOneResultAndClose(res)
}
func GQLWSHandler(ctx * Context, server * GQLThread) func(http.ResponseWriter, *http.Request) {
gql_ctx := context.Background()
gql_ctx = context.WithValue(gql_ctx, "graph_context", ctx)
gql_ctx = context.WithValue(gql_ctx, "gql_server", server)
2023-06-25 20:20:59 -06:00
return func(w http.ResponseWriter, r * http.Request) {
ctx.Log.Logf("gqlws_new", "HANDLING %s",r.RemoteAddr)
enableCORS(&w)
2023-06-25 20:20:59 -06:00
header_map := map[string]interface{}{}
for header, value := range(r.Header) {
header_map[header] = value
}
2023-07-13 18:23:57 -06:00
2023-06-25 20:20:59 -06:00
ctx.Log.Logm("gql", header_map, "REQUEST_HEADERS")
username, password, ok := r.BasicAuth()
2023-07-13 18:23:57 -06:00
if ok == false {
ctx.Log.Logf("gql", "GQL_REQUEST_ERR: no auth header included in request header")
json.NewEncoder(w).Encode(GQLUnauthorized("No Auth header provided"))
2023-07-13 18:23:57 -06:00
return
}
auth_id, err := ParseID(username)
if err != nil {
ctx.Log.Logf("gql", "GQL_REQUEST_ERR: failed to parse ID from auth username: %s", username)
json.NewEncoder(w).Encode(GQLUnauthorized("Failed to parse ID from username"))
return
}
auth, exists := server.AuthMap[auth_id]
if exists == false {
ctx.Log.Logf("gql", "GQL_REQUEST_ERR: no existing authorization for client %s", auth_id)
json.NewEncoder(w).Encode(GQLUnauthorized("No matching authorization for client"))
return
}
req_ctx := context.WithValue(gql_ctx, "auth", auth)
ctx.Log.Logf("gql", "GQL_AUTH: %+v - %s", auth, password)
2023-07-13 18:23:57 -06:00
2023-06-25 20:20:59 -06:00
u := ws.HTTPUpgrader{
Protocol: func(protocol string) bool {
ctx.Log.Logf("gqlws", "UPGRADE_PROTOCOL: %s", string(protocol))
if string(protocol) == "graphql-transport-ws" || string(protocol) == "graphql-ws" {
return true
}
return false
2023-06-25 20:20:59 -06:00
},
}
conn, _, _, err := u.Upgrade(r, w)
if err == nil {
defer conn.Close()
conn_state := "init"
for {
msg_raw, op, err := wsutil.ReadClientData(conn)
ctx.Log.Logf("gqlws_hb", "MSG: %s\nOP: 0x%02x\nERR: %+v\n", string(msg_raw), op, err)
msg := GQLWSMsg{}
json.Unmarshal(msg_raw, &msg)
if err != nil {
ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR")
break
}
if msg.Type == "connection_init" {
if conn_state != "init" {
ctx.Log.Logf("gqlws", "WS_CLIENT_ERROR: INIT WHILE IN %s", conn_state)
break
}
conn_state = "ready"
err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"connection_ack\"}"))
if err != nil {
ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND connection_ack")
break
}
} else if msg.Type == "ping" {
ctx.Log.Logf("gqlws_hb", "PING FROM %s", r.RemoteAddr)
err = wsutil.WriteServerMessage(conn, 1, []byte("{\"type\": \"pong\"}"))
if err != nil {
ctx.Log.Logf("gqlws", "WS_SERVER_ERROR: FAILED TO SEND PONG")
}
} else if msg.Type == "subscribe" {
ctx.Log.Logf("gqlws", "SUBSCRIBE: %+v", msg.Payload)
params := graphql.Params{
Schema: ctx.GQL.Schema,
Context: req_ctx,
2023-06-25 20:20:59 -06:00
RequestString: msg.Payload.Query,
}
if msg.Payload.OperationName != "" {
params.OperationName = msg.Payload.OperationName
}
if len(msg.Payload.Variables) > 0 {
params.VariableValues = msg.Payload.Variables
}
res_chan := GQLWSDo(ctx, params)
2023-07-03 19:13:29 -06:00
if res_chan == nil {
ctx.Log.Logf("gqlws", "res_chan is nil")
} else {
ctx.Log.Logf("gqlws", "res_chan: %+v", res_chan)
}
2023-06-25 20:20:59 -06:00
go func(res_chan chan *graphql.Result) {
for {
next, ok := <-res_chan
if ok == false {
ctx.Log.Logf("gqlws", "response channel was closed")
return
}
if next == nil {
ctx.Log.Logf("gqlws", "NIL_ON_CHANNEL")
return
}
if len(next.Errors) > 0 {
extra_fields := map[string]interface{}{}
extra_fields["query"] = string(msg.Payload.Query)
ctx.Log.Logm("gqlws", extra_fields, "ERROR: wrong result, unexpected errors: %+v", next.Errors)
continue
}
ctx.Log.Logf("gqlws", "DATA: %+v", next.Data)
data, err := json.Marshal(next.Data)
if err != nil {
ctx.Log.Logf("gqlws", "ERROR: %+v", err)
continue
}
msg, err := json.Marshal(GQLWSMsg{
ID: msg.ID,
Type: "next",
Payload: GQLWSPayload{
Data: string(data),
},
})
if err != nil {
ctx.Log.Logf("gqlws", "ERROR: %+v", err)
continue
}
err = wsutil.WriteServerMessage(conn, 1, msg)
if err != nil {
ctx.Log.Logf("gqlws", "ERROR: %+v", err)
continue
}
}
}(res_chan)
} else {
}
}
return
} else {
panic("Failed to upgrade websocket")
}
}
}
type GQLThread struct {
2023-07-09 19:33:18 -06:00
SimpleThread
tcp_listener net.Listener
http_server *http.Server
http_done *sync.WaitGroup
2023-07-09 19:33:18 -06:00
Listen string
AuthMap map[NodeID]AuthData
Key *ecdsa.PrivateKey
ECDH ecdh.Curve
2023-06-25 20:20:59 -06:00
}
// Type returns the node type under which GQL threads are registered.
func (thread *GQLThread) Type() NodeType {
	const type_name = "gql_thread"
	return NodeType(type_name)
}
// Serialize encodes the thread as indented JSON in its GQLThreadJSON form.
func (thread *GQLThread) Serialize() ([]byte, error) {
	serializable := NewGQLThreadJSON(thread)
	return json.MarshalIndent(&serializable, "", " ")
}
func (thread * GQLThread) DeserializeInfo(ctx *Context, data []byte) (ThreadInfo, error) {
var info ParentThreadInfo
2023-07-09 20:30:19 -06:00
err := json.Unmarshal(data, &info)
if err != nil {
return nil, err
}
return &info, nil
}
// GQLThreadJSON is the serialized form of a GQLThread.
type GQLThreadJSON struct {
	SimpleThreadJSON
	// Listen is the TCP address the HTTP server binds to
	Listen string `json:"listen"`
	// AuthMap holds the stored authorizations, keyed by the client's ID string
	AuthMap map[string]AuthDataJSON `json:"auth_map"`
	// Key is the server's identity key in x509.MarshalECPrivateKey encoding
	Key []byte `json:"key"`
	// ECDH is the ID of the key exchange curve, an index into ecdh_curves
	ECDH uint8 `json:"ecdh_curve"`
}
// Lookup tables between curves and the numeric IDs used for them in
// serialized threads. P-256 (ID 0) is the only registered curve.
var (
	// ecdsa_curves maps a serialized curve ID to its ECDSA curve.
	ecdsa_curves = map[uint8]elliptic.Curve{
		0: elliptic.P256(),
	}

	// ecdsa_curve_ids is the inverse of ecdsa_curves.
	ecdsa_curve_ids = map[elliptic.Curve]uint8{
		elliptic.P256(): 0,
	}

	// ecdh_curves maps a serialized curve ID to its ECDH curve.
	ecdh_curves = map[uint8]ecdh.Curve{
		0: ecdh.P256(),
	}

	// ecdh_curve_ids is the inverse of ecdh_curves.
	ecdh_curve_ids = map[ecdh.Curve]uint8{
		ecdh.P256(): 0,
	}
)
func NewGQLThreadJSON(thread *GQLThread) GQLThreadJSON {
thread_json := NewSimpleThreadJSON(&thread.SimpleThread)
ser_key, err := x509.MarshalECPrivateKey(thread.Key)
if err != nil {
panic(err)
}
auth_map := map[string]AuthDataJSON{}
for id, data := range(thread.AuthMap) {
shared, err := x509.MarshalECPrivateKey(data.Shared)
if err != nil {
panic(err)
}
auth_map[id.String()] = AuthDataJSON{
Granted: data.Granted,
Pubkey: elliptic.Marshal(data.Pubkey.Curve, data.Pubkey.X, data.Pubkey.Y),
Shared: shared,
}
}
2023-07-09 19:33:18 -06:00
return GQLThreadJSON{
SimpleThreadJSON: thread_json,
Listen: thread.Listen,
AuthMap: auth_map,
Key: ser_key,
ECDH: ecdh_curve_ids[thread.ECDH],
2023-07-09 19:33:18 -06:00
}
}
func LoadGQLThread(ctx *Context, id NodeID, data []byte, nodes NodeMap) (Node, error) {
var j GQLThreadJSON
err := json.Unmarshal(data, &j)
if err != nil {
return nil, err
}
ecdh_curve, ok := ecdh_curves[j.ECDH]
if ok == false {
return nil, fmt.Errorf("%d is not a known ECDH curve ID", j.ECDH)
}
key, err := x509.ParseECPrivateKey(j.Key)
if err != nil {
return nil, err
}
thread := NewGQLThread(id, j.Name, j.StateName, j.Listen, ecdh_curve, key)
thread.AuthMap = map[NodeID]AuthData{}
for id_str, auth_json := range(j.AuthMap) {
id, err := ParseID(id_str)
if err != nil {
return nil, err
}
x, y := elliptic.Unmarshal(key.Curve, auth_json.Pubkey)
if x == nil {
return nil, fmt.Errorf("Failed to load public key for curve %+v from %+v", key.Curve, auth_json.Pubkey)
}
shared, err := x509.ParseECPrivateKey(auth_json.Shared)
if err != nil {
return nil, err
}
thread.AuthMap[id] = AuthData{
Granted: auth_json.Granted,
Pubkey: &ecdsa.PublicKey{
Curve: key.Curve,
X: x,
Y: y,
},
Shared: shared,
}
}
2023-07-09 19:33:18 -06:00
nodes[id] = &thread
err = RestoreSimpleThread(ctx, &thread, j.SimpleThreadJSON, nodes)
if err != nil {
return nil, err
}
2023-07-09 19:33:18 -06:00
return &thread, nil
}
func NewGQLThread(id NodeID, name string, state_name string, listen string, ecdh_curve ecdh.Curve, key *ecdsa.PrivateKey) GQLThread {
2023-07-09 19:33:18 -06:00
return GQLThread{
SimpleThread: NewSimpleThread(id, name, state_name, reflect.TypeOf((*ParentThreadInfo)(nil)), gql_actions, gql_handlers),
2023-06-25 20:20:59 -06:00
Listen: listen,
AuthMap: map[NodeID]AuthData{},
2023-07-09 20:30:19 -06:00
http_done: &sync.WaitGroup{},
Key: key,
ECDH: ecdh_curve,
2023-06-25 20:20:59 -06:00
}
}
var gql_actions ThreadActions = ThreadActions{
2023-07-02 12:14:04 -06:00
"wait": ThreadWait,
2023-07-09 19:33:18 -06:00
"restore": func(ctx * Context, thread Thread) (string, error) {
// Start all the threads that should be "started"
ctx.Log.Logf("gql", "GQL_THREAD_RESTORE: %s", thread.ID())
ThreadRestore(ctx, thread)
return "start_server", nil
},
2023-07-09 19:33:18 -06:00
"start": func(ctx * Context, thread Thread) (string, error) {
ctx.Log.Logf("gql", "GQL_START")
err := ThreadStart(ctx, thread)
if err != nil {
return "", err
}
return "start_server", nil
},
2023-07-09 19:33:18 -06:00
"start_server": func(ctx * Context, thread Thread) (string, error) {
2023-07-02 12:14:04 -06:00
server, ok := thread.(*GQLThread)
if ok == false {
return "", fmt.Errorf("GQL_THREAD_START: %s is not GQLThread, %+v", thread.ID(), thread.State())
2023-07-02 12:14:04 -06:00
}
ctx.Log.Logf("gql", "GQL_START_SERVER")
// Serve the GQL http and ws handlers
mux := http.NewServeMux()
mux.HandleFunc("/auth", AuthHandler(ctx, server))
mux.HandleFunc("/gql", GQLHandler(ctx, server))
mux.HandleFunc("/gqlws", GQLWSHandler(ctx, server))
// Server a graphiql interface(TODO make configurable whether to start this)
mux.HandleFunc("/graphiql", GraphiQLHandler())
// Server the ./site directory to /site (TODO make configurable with better defaults)
fs := http.FileServer(http.Dir("./site"))
mux.Handle("/site/", http.StripPrefix("/site", fs))
http_server := &http.Server{
Addr: server.Listen,
Handler: mux,
}
listener, err := net.Listen("tcp", http_server.Addr)
if err != nil {
return "", fmt.Errorf("Failed to start listener for server on %s", http_server.Addr)
}
server.http_done.Add(1)
go func(server *GQLThread) {
defer server.http_done.Done()
err = http_server.Serve(listener)
if err != http.ErrServerClosed {
panic(fmt.Sprintf("Failed to start gql server: %s", err))
}
}(server)
UseStates(ctx, []Node{server}, func(nodes NodeMap)(error){
server.tcp_listener = listener
server.http_server = http_server
return server.Signal(ctx, NewSignal(server, "server_started"), nodes)
})
2023-06-25 20:20:59 -06:00
return "wait", nil
},
}
var gql_handlers ThreadHandlers = ThreadHandlers{
2023-07-09 19:33:18 -06:00
"child_added": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: %+v", signal)
2023-07-09 19:33:18 -06:00
UpdateStates(ctx, []Node{thread}, func(nodes NodeMap)(error) {
should_run, exists := thread.ChildInfo(signal.Source()).(*ParentThreadInfo)
if exists == false {
ctx.Log.Logf("gql", "GQL_THREAD_CHILD_ADDED: tried to start %s whis is not a child")
return nil
}
if should_run.Start == true {
2023-07-09 19:33:18 -06:00
ChildGo(ctx, thread, thread.Child(signal.Source()), should_run.StartAction)
}
return nil
})
return "wait", nil
},
2023-07-09 19:33:18 -06:00
"abort": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("gql", "GQL_ABORT")
server := thread.(*GQLThread)
server.http_server.Shutdown(context.TODO())
server.http_done.Wait()
2023-07-09 20:30:19 -06:00
return ThreadAbort(ctx, thread, signal)
},
2023-07-09 19:33:18 -06:00
"cancel": func(ctx * Context, thread Thread, signal GraphSignal) (string, error) {
ctx.Log.Logf("gql", "GQL_CANCEL")
server := thread.(*GQLThread)
server.http_server.Shutdown(context.TODO())
server.http_done.Wait()
2023-07-09 20:30:19 -06:00
return ThreadCancel(ctx, thread, signal)
},
2023-06-25 20:20:59 -06:00
}