From 5d09b74674443577230faec5e526967bfd82ae25 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Thu, 18 Jan 2024 19:49:51 +0100 Subject: [PATCH] Add CLI flags for topic suport to the executable/docker image --- cmd/node/execute.go | 21 ++++++++++++--------- cmd/node/flags.go | 1 + cmd/node/main.go | 11 ++++++++--- docker/run.sh | 7 ++++++- go.mod | 2 +- go.sum | 4 ++-- 6 files changed, 30 insertions(+), 16 deletions(-) diff --git a/cmd/node/execute.go b/cmd/node/execute.go index 4a6afa2..3ea85b4 100644 --- a/cmd/node/execute.go +++ b/cmd/node/execute.go @@ -14,7 +14,10 @@ import ( ) // ExecuteRequest describes the payload for the REST API request for function execution. -type ExecuteRequest execute.Request +type ExecuteRequest struct { + execute.Request + Subgroup string `json:"subgroup,omitempty"` +} // ExecuteResponse describes the REST API response for function execution. type ExecuteResponse struct { @@ -33,8 +36,8 @@ type ExecuteResult struct { RequestID string `json:"request_id,omitempty"` } -func createExecutor(a api.API) func (ctx echo.Context) error { - return func (ctx echo.Context) error { +func createExecutor(a api.API) func(ctx echo.Context) error { + return func(ctx echo.Context) error { // Unpack the API request. var req ExecuteRequest @@ -42,13 +45,13 @@ func createExecutor(a api.API) func (ctx echo.Context) error { if err != nil { return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err)) } - + // Get the execution result. - code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(req)) + code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), req.Request, req.Subgroup) if err != nil { a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function") } - + // Transform the node response format to the one returned by the API. res := ExecuteResponse{ Code: code, @@ -56,13 +59,13 @@ func createExecutor(a api.API) func (ctx echo.Context) error { Results: aggregate.Aggregate(results), Cluster: cluster, } - + // Communicate the reason for failure in these cases. if errors.Is(err, blockless.ErrRollCallTimeout) || errors.Is(err, blockless.ErrExecutionNotEnoughNodes) { res.Message = err.Error() } - + // Send the response. return ctx.JSON(http.StatusOK, res) } -} \ No newline at end of file +} diff --git a/cmd/node/flags.go b/cmd/node/flags.go index cd70e01..0c7532e 100644 --- a/cmd/node/flags.go +++ b/cmd/node/flags.go @@ -35,6 +35,7 @@ func parseFlags() *config.Config { pflag.StringVar(&cfg.RuntimePath, "runtime-path", "", "runtime path (used by the worker node)") pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", "", "runtime path (used by the worker node)") pflag.BoolVar(&cfg.LoadAttributes, "attributes", false, "node should try to load its attribute data from IPFS") + pflag.StringSliceVar(&cfg.Topics, "topic", nil, "topics node should subscribe to") // Host configuration. pflag.StringVar(&cfg.Host.PrivateKey, "private-key", "", "private key that the b7s host will use") diff --git a/cmd/node/main.go b/cmd/node/main.go index ea5a781..e6bd20e 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -181,6 +181,11 @@ func run() int { // Create function store. fstore := fstore.New(log, functionStore, cfg.Workspace) + // If we have topics specified, use those. + if len(cfg.Topics) > 0 { + opts = append(opts, node.WithTopics(cfg.Topics)) + } + // Instantiate node. node, err := node.New(log, host, peerstore, fstore, opts...) if err != nil { @@ -200,17 +205,17 @@ func run() int { log.Info(). Str("role", role.String()). - Msg("Upshot Node Node starting") + Msg("Upshot Node starting") err := node.Run(ctx) if err != nil { - log.Error().Err(err).Msg("Upshot Node Node failed") + log.Error().Err(err).Msg("Upshot Node failed") close(failed) } else { close(done) } - log.Info().Msg("Upshot Node Node stopped") + log.Info().Msg("Upshot Node stopped") }() // If we're a head node - start the REST API. diff --git a/docker/run.sh b/docker/run.sh index e4497c1..9b9ab7c 100644 --- a/docker/run.sh +++ b/docker/run.sh @@ -80,8 +80,13 @@ if [ -n "$BOOT_NODES" ]; then bootnode_args="--boot-nodes $BOOT_NODES" fi +topic_args="" +if [ -n "$TOPICS" ]; then + topic_args="--topic $TOPICS" +fi + if [ "$NODE_ROLE" = "head" ]; then ./upshot-node --peer-db /var/tmp/upshot/peerdb --function-db /var/tmp/upshot/function-db --log-level debug --port $P2P_PORT --role head --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH --rest-api :$REST_API $dialback_args $bootnode_args else - ./upshot-node --peer-db ./peer-database --function-db ./function-database--log-level debug --port $P2P_PORT --role worker --runtime-path /app/runtime --runtime-cli bls-runtime --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH $dialback_args $bootnode_args + ./upshot-node --peer-db ./peer-database --function-db ./function-database --log-level debug --port $P2P_PORT --role worker --runtime-path /app/runtime --runtime-cli bls-runtime --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH $dialback_args $bootnode_args $topic_args fi \ No newline at end of file diff --git a/go.mod b/go.mod index a98f41f..fd1472f 100644 --- a/go.mod +++ b/go.mod @@ -252,7 +252,7 @@ require ( ) require ( - github.com/blocklessnetwork/b7s v0.4.5 + github.com/blocklessnetwork/b7s v0.4.6-0.20240112135439-7ed12207f6af github.com/labstack/gommon v0.4.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/go.sum b/go.sum index 04dd1e3..cbbe16b 100644 --- a/go.sum +++ b/go.sum @@ -123,8 +123,8 @@ github.com/bits-and-blooms/bitset v1.8.0 h1:FD+XqgOZDUxxZ8hzoBFuV9+cGWY9CslN6d5M github.com/bits-and-blooms/bitset v1.8.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= -github.com/blocklessnetwork/b7s v0.4.5 h1:EtiP4NlkwZgDgQKQGx5PO9XWzJgDGV7Civ/BFemSHxM= -github.com/blocklessnetwork/b7s v0.4.5/go.mod h1:fkA5Te5tqk+foXMS50qe0zSx4V5KaO4PLKKpt2AudQE= +github.com/blocklessnetwork/b7s v0.4.6-0.20240112135439-7ed12207f6af h1:SBlngQZUbvA9b1XTyO8BtQltWS1nULRLREwsbI/ZWHI= +github.com/blocklessnetwork/b7s v0.4.6-0.20240112135439-7ed12207f6af/go.mod h1:Y/Tqev+RbvkYSAF3B+odJ5wPEfGEICR8mRijF5zns0E= github.com/blocklessnetwork/b7s-attributes v0.0.0-20231003175355-c87ad3eae97e h1:MbORHFMckGPdJq3Wdya+DqrY5iyvUn4PdcWyI3ywd30= github.com/blocklessnetwork/b7s-attributes v0.0.0-20231003175355-c87ad3eae97e/go.mod h1:OvkFdYsKp1RFpfa/nLuOc4Aif08U6O9uOcAnXgz/qJY= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=