diff --git a/config/datastore.go b/config/datastore.go index 1a5994a1750..44ce96a4589 100644 --- a/config/datastore.go +++ b/config/datastore.go @@ -21,8 +21,9 @@ type Datastore struct { Spec map[string]interface{} - HashOnRead bool - BloomFilterSize int + HashOnRead bool + BloomFilterSize int + BlockKeyCacheSize int } // DataStorePath returns the default data store path given a configuration root diff --git a/config/init.go b/config/init.go index 6099712f4b0..ff4acdc0da4 100644 --- a/config/init.go +++ b/config/init.go @@ -134,6 +134,7 @@ func DefaultDatastoreConfig() Datastore { StorageGCWatermark: 90, // 90% GCPeriod: "1h", BloomFilterSize: 0, + BlockKeyCacheSize: 64 << 10, // 64KiB Spec: flatfsSpec(), } } diff --git a/core/commands/dag/import.go b/core/commands/dag/import.go index 5e39393c1ce..6ffd7d739b8 100644 --- a/core/commands/dag/import.go +++ b/core/commands/dag/import.go @@ -55,7 +55,14 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment // this is *not* a transaction // it is simply a way to relieve pressure on the blockstore // similar to pinner.Pin/pinner.Flush - batch := ipld.NewBatch(req.Context, api.Dag()) + batch := ipld.NewBatch(req.Context, api.Dag(), + // 128 file descriptors needed in flatfs + ipld.MaxNodesBatchOption(128), + // 100MiB. When setting block size to 1MiB, we can add 100 + // nodes maximum. With default 256KiB block-size, we will hit + // the max nodes limit at 32MiB. + ipld.MaxSizeBatchOption(100<<20), + ) roots := cid.NewSet() var blockCount, blockBytesCount uint64 diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index b757929a26c..1daa5c7bba5 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -207,12 +207,12 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e return nil } - if settings.Offline { - cfg, err := n.Repo.Config() - if err != nil { - return nil, err - } + cfg, err := n.Repo.Config() + if err != nil { + return nil, err + } + if settings.Offline { cs := cfg.Ipns.ResolveCacheSize if cs == 0 { cs = node.DefaultIpnsCacheSize @@ -244,7 +244,12 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e if settings.Offline || !settings.FetchBlocks { subAPI.exchange = offlinexch.Exchange(subAPI.blockstore) - subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange) + var bsopts []bserv.Option + // If bloom filter disable, do not do Has() when writing. + if cfg.Datastore.BloomFilterSize == 0 { + bsopts = append(bsopts, bserv.WriteThrough()) + } + subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange, bsopts...) subAPI.dag = dag.NewDAGService(subAPI.blocks) } diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index 9d91f09b6ef..26ceb0a4897 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -91,7 +91,12 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options pinning = nil // pinner will never be used } - bserv := blockservice.New(addblockstore, exch) // hash security 001 + var bsopts []blockservice.Option + // If bloom filter disabled, do not do Has() when writing. + if cfg.Datastore.BloomFilterSize == 0 { + bsopts = append(bsopts, blockservice.WriteThrough()) + } + bserv := blockservice.New(addblockstore, exch, bsopts...) // hash security 001 dserv := merkledag.NewDAGService(bserv) // add a sync call to the DagService diff --git a/core/node/core.go b/core/node/core.go index ad259c962f3..4676e6a442c 100644 --- a/core/node/core.go +++ b/core/node/core.go @@ -24,21 +24,32 @@ import ( dagpb "github.com/ipld/go-codec-dagpb" "go.uber.org/fx" + "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/core/node/helpers" "github.com/ipfs/kubo/repo" ) // BlockService creates new blockservice which provides an interface to fetch content-addressable blocks -func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService { - bsvc := blockservice.New(bs, rem) +func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService { + return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService { + var opts []blockservice.Option + // If bloom filter is disabled, do not do Has() when writing. + // We defer to the datastore how to handle this efficiently, + // but we cannot assume that triggering Reads for every white + // is fine. + if cfg.Datastore.BloomFilterSize == 0 { + opts = append(opts, blockservice.WriteThrough()) + } + bsvc := blockservice.New(bs, rem, opts...) - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return bsvc.Close() - }, - }) + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return bsvc.Close() + }, + }) - return bsvc + return bsvc + } } // Pinning creates new pinner which tells GC which blocks should be kept diff --git a/core/node/groups.go b/core/node/groups.go index 519cbb47d6e..12999f2586a 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -189,6 +189,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option { cacheOpts := blockstore.DefaultCacheOpts() cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize + cacheOpts.HasTwoQueueCacheSize = cfg.Datastore.BlockKeyCacheSize if !bcfg.Permanent { cacheOpts.HasBloomFilterSize = 0 } @@ -201,7 +202,7 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option { return fx.Options( fx.Provide(RepoConfig), fx.Provide(Datastore), - fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead)), + fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead, cfg.Datastore.BloomFilterSize == 0)), finalBstore, ) } @@ -332,7 +333,6 @@ func Offline(cfg *config.Config) fx.Option { // Core groups basic IPFS services var Core = fx.Options( - fx.Provide(BlockService), fx.Provide(Dag), fx.Provide(FetcherConfig), fx.Provide(PathResolverConfig), @@ -387,7 +387,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { Identity(cfg), IPNS, Networked(bcfg, cfg, userResourceOverrides), - + fx.Provide(BlockService(cfg)), Core, ) } diff --git a/core/node/storage.go b/core/node/storage.go index aedf0ee6a1d..1acce91cfcd 100644 --- a/core/node/storage.go +++ b/core/node/storage.go @@ -27,10 +27,14 @@ func Datastore(repo repo.Repo) datastore.Datastore { type BaseBlocks blockstore.Blockstore // BaseBlockstoreCtor creates cached blockstore backed by the provided datastore -func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) { +func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool, writeThrough bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) { return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) { // hash security - bs = blockstore.NewBlockstore(repo.Datastore()) + var opts []blockstore.Option + if writeThrough { + opts = append(opts, blockstore.WriteThrough()) + } + bs = blockstore.NewBlockstore(repo.Datastore(), opts...) bs = &verifbs.VerifBS{Blockstore: bs} bs, err = blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, cacheOpts) if err != nil { diff --git a/docs/changelogs/v0.33.md b/docs/changelogs/v0.33.md index 1adf16dd0bb..5a45694f3b6 100644 --- a/docs/changelogs/v0.33.md +++ b/docs/changelogs/v0.33.md @@ -31,6 +31,20 @@ If you depended on removed ones, please fill an issue to add them to the upstrea Onboarding files and directories with `ipfs add --to-files` now requires non-empty names. due to this, The `--to-files` and `--wrap` options are now mutually exclusive ([#10612](https://github.com/ipfs/kubo/issues/10612)). +#### New `Datastore.BlockKeyCacheSize` option and write-through blockstore/blockservice + +This option controls the size of a blockstore caching layer that records whether the blockstore has certain block and their sizes (not the contents). This was previously an internal option. It is set by default to 64KiB. +This caching layer can be disabled by setting it to `0`. This option is similar to the existing `BloomFilterSize`, which creates another bloom-filter-based wrapper on the blockstore. + +Additionally, setting `BloomFilterSize` to `0` will now trigger the use of "writethrough" blockservice and blockstores. Usually, the blockservice and the blockstore perform an `Has()` call on the underlying datastore before writing any block. If the datastore already has the block, they can skip the operation. In the "writethrough" mode, this does not happen. The reasoning is: + + * If the bloom filter is disabled, at least the first `Has()` call will hit the datastore. Some datastore incurr a large penalty for `Has()` (i.e. flatfs must open folders and do listing, S3 will need a call upstream...). + * Some datastore like Pebble already include their own bloom filter and caching layers we should not duplicate such layers on top. + * Some datastores are very fast to write but incurr in read-amplification. Calling `Has()` for every block reduces data ingest performance when batching multiple blocks. + * Calling `Has()` can cause eviction of blocks from read-caches when writing. + +For users trying Pebble as a datastore backend, they will be usually better off disabling all Kubo's key-caching and bloom filter, and thus enabling direct writes to Pebble. In general, we want to give users the freedom to play with these settings and find what works best for their workloads and backends. + #### 📦️ Dependency updates - update `boxo` to [v0.24.TODO](https://github.com/ipfs/boxo/releases/tag/v0.24.TODO) diff --git a/docs/config.md b/docs/config.md index 08a1cceedd8..e2be762de89 100644 --- a/docs/config.md +++ b/docs/config.md @@ -633,6 +633,19 @@ Default: `0` (disabled) Type: `integer` (non-negative, bytes) +### `Datastore.BlockKeyCacheSize` + +A number representing the maximum size in bytes of the blockstore's Two-Queue +cache, which caches block-cids and their block-sizes. Use `0` to disable. + +This cache, once primed, can greatly speed up operations like `ipfs repo stat` +as there is no need to read full blocks to know their sizes. Size should be +adjusted depending on the number of CIDs on disk. + +Default: `65536` (64KiB) + +Type: `integer` (non-negative, bytes) + ### `Datastore.Spec` Spec defines the structure of the ipfs datastore. It is a composable structure,