Skip to content

Commit

Permalink
Merge pull request filecoin-project#11478 from filecoin-project/diBre…
Browse files Browse the repository at this point in the history
…akout

maint: break out lp deps for easy testing
  • Loading branch information
snadrus authored Dec 18, 2023
2 parents 62fe670 + b1c1839 commit c1b42a8
Show file tree
Hide file tree
Showing 11 changed files with 554 additions and 406 deletions.
47 changes: 7 additions & 40 deletions cmd/lotus-provider/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"database/sql"
"errors"
"fmt"
"io"
Expand All @@ -14,7 +13,7 @@ import (
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/cmd/lotus-provider/deps"
"github.com/filecoin-project/lotus/node/config"
)

Expand Down Expand Up @@ -77,7 +76,7 @@ var configSetCmd = &cli.Command{
Action: func(cctx *cli.Context) error {
args := cctx.Args()

db, err := makeDB(cctx)
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -131,7 +130,7 @@ var configGetCmd = &cli.Command{
if args.Len() != 1 {
return fmt.Errorf("want 1 layer arg, got %d", args.Len())
}
db, err := makeDB(cctx)
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
Expand All @@ -153,7 +152,7 @@ var configListCmd = &cli.Command{
Usage: "List config layers you can get.",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
db, err := makeDB(cctx)
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
Expand All @@ -180,7 +179,7 @@ var configRmCmd = &cli.Command{
if args.Len() != 1 {
return errors.New("must have exactly 1 arg for the layer name")
}
db, err := makeDB(cctx)
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -209,11 +208,11 @@ var configViewCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
db, err := makeDB(cctx)
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
lp, err := getConfig(cctx, db)
lp, err := deps.GetConfig(cctx, db)
if err != nil {
return err
}
Expand All @@ -225,35 +224,3 @@ var configViewCmd = &cli.Command{
return nil
},
}

func getConfig(cctx *cli.Context, db *harmonydb.DB) (*config.LotusProviderConfig, error) {
lp := config.DefaultLotusProvider()
have := []string{}
layers := cctx.StringSlice("layers")
for _, layer := range layers {
text := ""
err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text)
if err != nil {
if strings.Contains(err.Error(), sql.ErrNoRows.Error()) {
return nil, fmt.Errorf("missing layer '%s' ", layer)
}
if layer == "base" {
return nil, errors.New(`lotus-provider defaults to a layer named 'base'.
Either use 'migrate' command or edit a base.toml and upload it with: lotus-provider config set base.toml`)
}
return nil, fmt.Errorf("could not read layer '%s': %w", layer, err)
}
meta, err := toml.Decode(text, &lp)
if err != nil {
return nil, fmt.Errorf("could not read layer, bad toml %s: %w", layer, err)
}
for _, k := range meta.Keys() {
have = append(have, strings.Join(k, " "))
}
}
_ = have // FUTURE: verify that required fields are here.
// If config includes 3rd-party config, consider JSONSchema as a way that
// 3rd-parties can dynamically include config requirements and we can
// validate the config. Because of layering, we must validate @ startup.
return lp, nil
}
276 changes: 276 additions & 0 deletions cmd/lotus-provider/deps/deps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
// Package deps provides the dependencies for the lotus provider node.
package deps

import (
"context"
"database/sql"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
"strings"

"github.com/BurntSushi/toml"
"github.com/gbrlsnchs/jwt/v3"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-statestore"

"github.com/filecoin-project/lotus/api"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/provider"
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var log = logging.Logger("lotus-provider/deps")

func MakeDB(cctx *cli.Context) (*harmonydb.DB, error) {
dbConfig := config.HarmonyDB{
Username: cctx.String("db-user"),
Password: cctx.String("db-password"),
Hosts: strings.Split(cctx.String("db-host"), ","),
Database: cctx.String("db-name"),
Port: cctx.String("db-port"),
}
return harmonydb.NewFromConfig(dbConfig)
}

type JwtPayload struct {
Allow []auth.Permission
}

func StorageAuth(apiKey string) (sealer.StorageAuth, error) {
if apiKey == "" {
return nil, xerrors.Errorf("no api key provided")
}

rawKey, err := base64.StdEncoding.DecodeString(apiKey)
if err != nil {
return nil, xerrors.Errorf("decoding api key: %w", err)
}

key := jwt.NewHS256(rawKey)

p := JwtPayload{
Allow: []auth.Permission{"admin"},
}

token, err := jwt.Sign(&p, key)
if err != nil {
return nil, err
}

headers := http.Header{}
headers.Add("Authorization", "Bearer "+string(token))
return sealer.StorageAuth(headers), nil
}

func GetDeps(ctx context.Context, cctx *cli.Context) (*Deps, error) {
var deps Deps
return &deps, deps.PopulateRemainingDeps(ctx, cctx, true)
}

type Deps struct {
Cfg *config.LotusProviderConfig
DB *harmonydb.DB
Full api.FullNode
Verif storiface.Verifier
LW *sealer.LocalWorker
As *ctladdr.AddressSelector
Maddrs []dtypes.MinerAddress
Stor *paths.Remote
Si *paths.DBIndex
LocalStore *paths.Local
ListenAddr string
}

const (
FlagRepoPath = "repo-path"
)

func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context, makeRepo bool) error {
var err error
if makeRepo {
// Open repo
repoPath := cctx.String(FlagRepoPath)
fmt.Println("repopath", repoPath)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
}

ok, err := r.Exists()
if err != nil {
return err
}
if !ok {
if err := r.Init(repo.Provider); err != nil {
return err
}
}
}

if deps.Cfg == nil {
deps.DB, err = MakeDB(cctx)
if err != nil {
return err
}
}

if deps.Cfg == nil {
// The config feeds into task runners & their helpers
deps.Cfg, err = GetConfig(cctx, deps.DB)
if err != nil {
return err
}
}

log.Debugw("config", "config", deps.Cfg)

if deps.Verif == nil {
deps.Verif = ffiwrapper.ProofVerifier
}

if deps.As == nil {
deps.As, err = provider.AddressSelector(&deps.Cfg.Addresses)()
if err != nil {
return err
}
}

if deps.Si == nil {
de, err := journal.ParseDisabledEvents(deps.Cfg.Journal.DisabledEvents)
if err != nil {
return err
}
j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de)
if err != nil {
return err
}
go func() {
<-ctx.Done()
_ = j.Close()
}()

al := alerting.NewAlertingSystem(j)
deps.Si = paths.NewDBIndex(al, deps.DB)
}

if deps.Full == nil {
var fullCloser func()
deps.Full, fullCloser, err = cliutil.GetFullNodeAPIV1LotusProvider(cctx, deps.Cfg.Apis.ChainApiInfo)
if err != nil {
return err
}

go func() {
<-ctx.Done()
fullCloser()
}()
}

bls := &paths.BasicLocalStorage{
PathToJSON: cctx.String("storage-json"),
}

if deps.ListenAddr == "" {
listenAddr := cctx.String("listen")
const unspecifiedAddress = "0.0.0.0"
addressSlice := strings.Split(listenAddr, ":")
if ip := net.ParseIP(addressSlice[0]); ip != nil {
if ip.String() == unspecifiedAddress {
rip, err := deps.DB.GetRoutableIP()
if err != nil {
return err
}
deps.ListenAddr = rip + ":" + addressSlice[1]
}
}
}
if deps.LocalStore == nil {
deps.LocalStore, err = paths.NewLocal(ctx, bls, deps.Si, []string{"http://" + deps.ListenAddr + "/remote"})
if err != nil {
return err
}
}

sa, err := StorageAuth(deps.Cfg.Apis.StorageRPCSecret)
if err != nil {
return xerrors.Errorf(`'%w' while parsing the config toml's
[Apis]
StorageRPCSecret=%v
Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, deps.Cfg.Apis.StorageRPCSecret)
}
if deps.Stor == nil {
deps.Stor = paths.NewRemote(deps.LocalStore, deps.Si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
}
if deps.LW == nil {
wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore()))

// todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper
// maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably
// don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask)
deps.LW = sealer.NewLocalWorker(sealer.WorkerConfig{}, deps.Stor, deps.LocalStore, deps.Si, nil, wstates)
}
if len(deps.Maddrs) == 0 {
for _, s := range deps.Cfg.Addresses.MinerAddresses {
addr, err := address.NewFromString(s)
if err != nil {
return err
}
deps.Maddrs = append(deps.Maddrs, dtypes.MinerAddress(addr))
}
}
fmt.Println("last line of populate")
return nil
}

func GetConfig(cctx *cli.Context, db *harmonydb.DB) (*config.LotusProviderConfig, error) {
lp := config.DefaultLotusProvider()
have := []string{}
layers := cctx.StringSlice("layers")
for _, layer := range layers {
text := ""
err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text)
if err != nil {
if strings.Contains(err.Error(), sql.ErrNoRows.Error()) {
return nil, fmt.Errorf("missing layer '%s' ", layer)
}
if layer == "base" {
return nil, errors.New(`lotus-provider defaults to a layer named 'base'.
Either use 'migrate' command or edit a base.toml and upload it with: lotus-provider config set base.toml`)
}
return nil, fmt.Errorf("could not read layer '%s': %w", layer, err)
}
meta, err := toml.Decode(text, &lp)
if err != nil {
return nil, fmt.Errorf("could not read layer, bad toml %s: %w", layer, err)
}
for _, k := range meta.Keys() {
have = append(have, strings.Join(k, " "))
}
}
_ = have // FUTURE: verify that required fields are here.
// If config includes 3rd-party config, consider JSONSchema as a way that
// 3rd-parties can dynamically include config requirements and we can
// validate the config. Because of layering, we must validate @ startup.
return lp, nil
}
Loading

0 comments on commit c1b42a8

Please sign in to comment.