Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Add CLI flags for topic suport to the executable/docker image
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum committed Jan 18, 2024
1 parent 59fc8a6 commit 5d09b74
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 16 deletions.
21 changes: 12 additions & 9 deletions cmd/node/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
)

// ExecuteRequest describes the payload for the REST API request for function execution.
type ExecuteRequest execute.Request
type ExecuteRequest struct {
execute.Request
Subgroup string `json:"subgroup,omitempty"`
}

// ExecuteResponse describes the REST API response for function execution.
type ExecuteResponse struct {
Expand All @@ -33,36 +36,36 @@ type ExecuteResult struct {
RequestID string `json:"request_id,omitempty"`
}

func createExecutor(a api.API) func (ctx echo.Context) error {
return func (ctx echo.Context) error {
func createExecutor(a api.API) func(ctx echo.Context) error {
return func(ctx echo.Context) error {

// Unpack the API request.
var req ExecuteRequest
err := ctx.Bind(&req)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err))
}

// Get the execution result.
code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(req))
code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), req.Request, req.Subgroup)
if err != nil {
a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function")
}

// Transform the node response format to the one returned by the API.
res := ExecuteResponse{
Code: code,
RequestID: id,
Results: aggregate.Aggregate(results),
Cluster: cluster,
}

// Communicate the reason for failure in these cases.
if errors.Is(err, blockless.ErrRollCallTimeout) || errors.Is(err, blockless.ErrExecutionNotEnoughNodes) {
res.Message = err.Error()
}

// Send the response.
return ctx.JSON(http.StatusOK, res)
}
}
}
1 change: 1 addition & 0 deletions cmd/node/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func parseFlags() *config.Config {
pflag.StringVar(&cfg.RuntimePath, "runtime-path", "", "runtime path (used by the worker node)")
pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", "", "runtime path (used by the worker node)")
pflag.BoolVar(&cfg.LoadAttributes, "attributes", false, "node should try to load its attribute data from IPFS")
pflag.StringSliceVar(&cfg.Topics, "topic", nil, "topics node should subscribe to")

// Host configuration.
pflag.StringVar(&cfg.Host.PrivateKey, "private-key", "", "private key that the b7s host will use")
Expand Down
11 changes: 8 additions & 3 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func run() int {
// Create function store.
fstore := fstore.New(log, functionStore, cfg.Workspace)

// If we have topics specified, use those.
if len(cfg.Topics) > 0 {
opts = append(opts, node.WithTopics(cfg.Topics))
}

// Instantiate node.
node, err := node.New(log, host, peerstore, fstore, opts...)
if err != nil {
Expand All @@ -200,17 +205,17 @@ func run() int {

log.Info().
Str("role", role.String()).
Msg("Upshot Node Node starting")
Msg("Upshot Node starting")

err := node.Run(ctx)
if err != nil {
log.Error().Err(err).Msg("Upshot Node Node failed")
log.Error().Err(err).Msg("Upshot Node failed")
close(failed)
} else {
close(done)
}

log.Info().Msg("Upshot Node Node stopped")
log.Info().Msg("Upshot Node stopped")
}()

// If we're a head node - start the REST API.
Expand Down
7 changes: 6 additions & 1 deletion docker/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,13 @@ if [ -n "$BOOT_NODES" ]; then
bootnode_args="--boot-nodes $BOOT_NODES"
fi

topic_args=""
if [ -n "$TOPICS" ]; then
topic_args="--topic $TOPICS"
fi

if [ "$NODE_ROLE" = "head" ]; then
./upshot-node --peer-db /var/tmp/upshot/peerdb --function-db /var/tmp/upshot/function-db --log-level debug --port $P2P_PORT --role head --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH --rest-api :$REST_API $dialback_args $bootnode_args
else
./upshot-node --peer-db ./peer-database --function-db ./function-database--log-level debug --port $P2P_PORT --role worker --runtime-path /app/runtime --runtime-cli bls-runtime --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH $dialback_args $bootnode_args
./upshot-node --peer-db ./peer-database --function-db ./function-database --log-level debug --port $P2P_PORT --role worker --runtime-path /app/runtime --runtime-cli bls-runtime --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH $dialback_args $bootnode_args $topic_args
fi
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ require (
)

require (
github.com/blocklessnetwork/b7s v0.4.5
github.com/blocklessnetwork/b7s v0.4.6-0.20240112135439-7ed12207f6af
github.com/labstack/gommon v0.4.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ github.com/bits-and-blooms/bitset v1.8.0 h1:FD+XqgOZDUxxZ8hzoBFuV9+cGWY9CslN6d5M
github.com/bits-and-blooms/bitset v1.8.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/blocklessnetwork/b7s v0.4.5 h1:EtiP4NlkwZgDgQKQGx5PO9XWzJgDGV7Civ/BFemSHxM=
github.com/blocklessnetwork/b7s v0.4.5/go.mod h1:fkA5Te5tqk+foXMS50qe0zSx4V5KaO4PLKKpt2AudQE=
github.com/blocklessnetwork/b7s v0.4.6-0.20240112135439-7ed12207f6af h1:SBlngQZUbvA9b1XTyO8BtQltWS1nULRLREwsbI/ZWHI=
github.com/blocklessnetwork/b7s v0.4.6-0.20240112135439-7ed12207f6af/go.mod h1:Y/Tqev+RbvkYSAF3B+odJ5wPEfGEICR8mRijF5zns0E=
github.com/blocklessnetwork/b7s-attributes v0.0.0-20231003175355-c87ad3eae97e h1:MbORHFMckGPdJq3Wdya+DqrY5iyvUn4PdcWyI3ywd30=
github.com/blocklessnetwork/b7s-attributes v0.0.0-20231003175355-c87ad3eae97e/go.mod h1:OvkFdYsKp1RFpfa/nLuOc4Aif08U6O9uOcAnXgz/qJY=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
Expand Down

0 comments on commit 5d09b74

Please sign in to comment.