diff --git a/CHANGELOG.md b/CHANGELOG.md index 0bf0697479..8bde550f92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Changelog for NeoFS Node ## [Unreleased] ### Added +- More effective FSTree writer for HDDs, new configuration options for it (#2814) ### Fixed diff --git a/cmd/neofs-lens/internal/storage/root.go b/cmd/neofs-lens/internal/storage/root.go index b4a715d211..8101cc99b2 100644 --- a/cmd/neofs-lens/internal/storage/root.go +++ b/cmd/neofs-lens/internal/storage/root.go @@ -9,7 +9,6 @@ import ( engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" - peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/storage" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" @@ -119,15 +118,18 @@ func openEngine(cmd *cobra.Command) *engine.StorageEngine { sCfg.Typ = storagesCfg[i].Type() sCfg.Path = storagesCfg[i].Path() sCfg.Perm = storagesCfg[i].Perm() + sCfg.FlushInterval = storagesCfg[i].FlushInterval() switch storagesCfg[i].Type() { case fstree.Type: sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sCfg.Depth = sub.Depth() sCfg.NoSync = sub.NoSync() + sCfg.CombinedCountLimit = sub.CombinedCountLimit() + sCfg.CombinedSizeLimit = sub.CombinedSizeLimit() + sCfg.CombinedSizeThreshold = sub.CombinedSizeThreshold() case peapod.Type: - peapodCfg := peapodconfig.From((*config.Config)(storagesCfg[i])) - sCfg.FlushInterval = peapodCfg.FlushInterval() + // Nothing peapod-specific, but it should work. default: return fmt.Errorf("can't initiate storage. 
invalid storage type: %s", storagesCfg[i].Type()) } @@ -193,7 +195,11 @@ func openEngine(cmd *cobra.Command) *engine.StorageEngine { fstree.WithPath(sRead.Path), fstree.WithPerm(sRead.Perm), fstree.WithDepth(sRead.Depth), - fstree.WithNoSync(sRead.NoSync)), + fstree.WithNoSync(sRead.NoSync), + fstree.WithCombinedCountLimit(sRead.CombinedCountLimit), + fstree.WithCombinedSizeLimit(sRead.CombinedSizeLimit), + fstree.WithCombinedSizeThreshold(sRead.CombinedSizeThreshold), + fstree.WithCombinedWriteInterval(sRead.FlushInterval)), Policy: func(_ *objectSDK.Object, data []byte) bool { return true }, diff --git a/cmd/neofs-lens/internal/storage/sanity.go b/cmd/neofs-lens/internal/storage/sanity.go index 53c0b63246..3209fecbd7 100644 --- a/cmd/neofs-lens/internal/storage/sanity.go +++ b/cmd/neofs-lens/internal/storage/sanity.go @@ -13,7 +13,6 @@ import ( engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" - peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" commonb "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" @@ -77,8 +76,7 @@ func sanityCheck(cmd *cobra.Command, _ []string) { default: return fmt.Errorf("unsupported sub-storage type '%s'", subCfg.Type()) case peapod.Type: - peapodCfg := peapodconfig.From((*config.Config)(subCfg)) - sh.p = peapod.New(subCfg.Path(), subCfg.Perm(), peapodCfg.FlushInterval()) + sh.p = peapod.New(subCfg.Path(), subCfg.Perm(), subCfg.FlushInterval()) var compressCfg compression.Config err := compressCfg.Init() diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index e6de08ec90..d603482cc1 100644 --- 
a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -23,7 +23,6 @@ import ( engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" - peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger" metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics" morphconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/morph" @@ -224,15 +223,18 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { sCfg.Typ = storagesCfg[i].Type() sCfg.Path = storagesCfg[i].Path() sCfg.Perm = storagesCfg[i].Perm() + sCfg.FlushInterval = storagesCfg[i].FlushInterval() switch storagesCfg[i].Type() { case fstree.Type: sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sCfg.Depth = sub.Depth() sCfg.NoSync = sub.NoSync() + sCfg.CombinedCountLimit = sub.CombinedCountLimit() + sCfg.CombinedSizeLimit = sub.CombinedSizeLimit() + sCfg.CombinedSizeThreshold = sub.CombinedSizeThreshold() case peapod.Type: - peapodCfg := peapodconfig.From((*config.Config)(storagesCfg[i])) - sCfg.FlushInterval = peapodCfg.FlushInterval() + // No specific configs, but it's a valid storage type. 
default: return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) } diff --git a/cmd/neofs-node/config/engine/config_test.go b/cmd/neofs-node/config/engine/config_test.go index 160f455d4f..de3b66a6aa 100644 --- a/cmd/neofs-node/config/engine/config_test.go +++ b/cmd/neofs-node/config/engine/config_test.go @@ -9,7 +9,6 @@ import ( engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" - peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" piloramaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/pilorama" configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" @@ -87,11 +86,10 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 102400, sc.SmallSizeLimit()) require.Equal(t, 2, len(ss)) - ppd := peapodconfig.From((*config.Config)(ss[0])) require.Equal(t, "tmp/0/blob/peapod.db", ss[0].Path()) require.EqualValues(t, 0644, ss[0].Perm()) require.EqualValues(t, peapod.Type, ss[0].Type()) - require.EqualValues(t, 10*time.Millisecond, ppd.FlushInterval()) + require.EqualValues(t, 10*time.Millisecond, ss[0].FlushInterval()) require.Equal(t, "tmp/0/blob", ss[1].Path()) require.EqualValues(t, 0644, ss[1].Perm()) @@ -131,11 +129,10 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 102400, sc.SmallSizeLimit()) require.Equal(t, 2, len(ss)) - ppd := peapodconfig.From((*config.Config)(ss[0])) require.Equal(t, "tmp/1/blob/peapod.db", ss[0].Path()) require.EqualValues(t, 0644, ss[0].Perm()) require.EqualValues(t, peapod.Type, ss[0].Type()) - require.EqualValues(t, 30*time.Millisecond, ppd.FlushInterval()) + require.EqualValues(t, 30*time.Millisecond, ss[0].FlushInterval()) require.Equal(t, 
"tmp/1/blob", ss[1].Path()) require.EqualValues(t, 0644, ss[1].Perm()) diff --git a/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go b/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go index 6595e9375d..43c2de6d82 100644 --- a/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go +++ b/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go @@ -1,16 +1,27 @@ package fstree import ( + "math" + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/spf13/cast" ) // Config is a wrapper over the config section // which provides access to FSTree configurations. type Config config.Config -// DepthDefault is a default shallow dir depth. -const DepthDefault = 4 +const ( + // DepthDefault is the default shallow dir depth. + DepthDefault = 4 + // CombinedCountLimitDefault is the default for the maximum number of objects to write into a single file. + CombinedCountLimitDefault = 128 + // CombinedSizeLimitDefault is the default for the maximum size of the combined object file. + CombinedSizeLimitDefault = 8 * 1024 * 1024 + // CombinedSizeThresholdDefault is the default for the minimal size of the object that won't be combined with others for writes. + CombinedSizeThresholdDefault = 128 * 1024 +) // From wraps config section into Config. func From(c *config.Config) *Config { @@ -45,3 +56,41 @@ func (x *Config) Depth() uint64 { func (x *Config) NoSync() bool { return config.BoolSafe((*config.Config)(x), "no_sync") } + +// CombinedCountLimit returns the value of "combined_count_limit" config parameter. +// +// Returns [CombinedCountLimitDefault] if the value is missing or not a positive integer. 
+func (x *Config) CombinedCountLimit() int { + var v = (*config.Config)(x).Value("combined_count_limit") + if v == nil { + return CombinedCountLimitDefault + } + + i, err := cast.ToIntE(v) + if err != nil { + return CombinedCountLimitDefault + } + return i +} + +// CombinedSizeLimit returns the value of "combined_size_limit" config parameter. +// +// Returns [CombinedSizeLimitDefault] if the value is missing, equal to 0 or not a proper size specification. +func (x *Config) CombinedSizeLimit() int { + var s = config.SizeInBytesSafe((*config.Config)(x), "combined_size_limit") + if s == 0 || s > math.MaxInt { + return CombinedSizeLimitDefault + } + return int(s) +} + +// CombinedSizeThreshold returns the value of "combined_size_threshold" config parameter. +// +// Returns [CombinedSizeThresholdDefault] if the value is missing, equal to 0 or not a proper size specification. +func (x *Config) CombinedSizeThreshold() int { + var s = config.SizeInBytesSafe((*config.Config)(x), "combined_size_threshold") + if s == 0 || s > math.MaxInt { + return CombinedSizeThresholdDefault + } + return int(s) +} diff --git a/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go b/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go deleted file mode 100644 index cd01d21ca9..0000000000 --- a/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go +++ /dev/null @@ -1,34 +0,0 @@ -package peapodconfig - -import ( - "time" - - "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" -) - -// Config is a wrapper over the config section -// which provides access to Peapod configurations. -type Config config.Config - -// Various Peapod config defaults. -const ( - // DefaultFlushInterval is a default time interval between Peapod's batch writes - // to disk. - DefaultFlushInterval = 10 * time.Millisecond -) - -// From wraps config section into Config. 
-func From(c *config.Config) *Config { - return (*Config)(c) -} - -// FlushInterval returns the value of "flush_interval" config parameter. -// -// Returns DefaultFlushInterval if the value is not a positive duration. -func (x *Config) FlushInterval() time.Duration { - d := config.DurationSafe((*config.Config)(x), "flush_interval") - if d > 0 { - return d - } - return DefaultFlushInterval -} diff --git a/cmd/neofs-node/config/engine/shard/blobstor/storage/config.go b/cmd/neofs-node/config/engine/shard/blobstor/storage/config.go index 4a7d879e0d..32b78686a1 100644 --- a/cmd/neofs-node/config/engine/shard/blobstor/storage/config.go +++ b/cmd/neofs-node/config/engine/shard/blobstor/storage/config.go @@ -2,14 +2,22 @@ package storage import ( "io/fs" + "time" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" ) type Config config.Config -// PermDefault are default permission bits for BlobStor data. -const PermDefault = 0o640 +// Various config defaults. +const ( + // PermDefault are default permission bits for BlobStor data. + PermDefault = 0o640 + + // DefaultFlushInterval is the default time interval between Peapod's batch writes + // to disk. + DefaultFlushInterval = 10 * time.Millisecond +) func From(x *config.Config) *Config { return (*Config)(x) @@ -53,3 +61,14 @@ func (x *Config) Perm() fs.FileMode { return fs.FileMode(p) } + +// FlushInterval returns the value of "flush_interval" config parameter. +// +// Returns DefaultFlushInterval if the value is not a positive duration. 
+func (x *Config) FlushInterval() time.Duration { + d := config.DurationSafe((*config.Config)(x), "flush_interval") + if d > 0 { + return d + } + return DefaultFlushInterval +} diff --git a/cmd/neofs-node/storage.go b/cmd/neofs-node/storage.go index f613281f58..13714fbfbb 100644 --- a/cmd/neofs-node/storage.go +++ b/cmd/neofs-node/storage.go @@ -153,7 +153,11 @@ func (c *cfg) shardOpts() []shardOptsWithID { fstree.WithPath(sRead.Path), fstree.WithPerm(sRead.Perm), fstree.WithDepth(sRead.Depth), - fstree.WithNoSync(sRead.NoSync)), + fstree.WithNoSync(sRead.NoSync), + fstree.WithCombinedCountLimit(sRead.CombinedCountLimit), + fstree.WithCombinedSizeLimit(sRead.CombinedSizeLimit), + fstree.WithCombinedSizeThreshold(sRead.CombinedSizeThreshold), + fstree.WithCombinedWriteInterval(sRead.FlushInterval)), Policy: func(_ *objectSDK.Object, data []byte) bool { return true }, diff --git a/cmd/neofs-node/storage/config.go b/cmd/neofs-node/storage/config.go index b4b69d7d49..8ed85196ac 100644 --- a/cmd/neofs-node/storage/config.go +++ b/cmd/neofs-node/storage/config.go @@ -53,16 +53,17 @@ type ShardCfg struct { } type SubStorageCfg struct { // common for all storages - Typ string - Path string - Perm fs.FileMode + Typ string + Path string + Perm fs.FileMode + FlushInterval time.Duration // tree-specific (FS) - Depth uint64 - NoSync bool - - // Peapod-specific - FlushInterval time.Duration + Depth uint64 + NoSync bool + CombinedCountLimit int + CombinedSizeLimit int + CombinedSizeThreshold int } // ID returns persistent id of a shard. 
It is different from the ID used in runtime diff --git a/config/example/node.json b/config/example/node.json index d07a7fd8e6..a0f3e786d5 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -199,7 +199,11 @@ "path": "tmp/1/blob", "no_sync": true, "perm": "0644", - "depth": 5 + "depth": 5, + "flush_interval": "20ms", + "combined_count_limit": 64, + "combined_size_limit": "16M", + "combined_size_threshold": "512K" } ], "pilorama": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 9dad016afc..5b3b2d9ac9 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -196,6 +196,10 @@ storage: - type: fstree path: tmp/1/blob # blobstor path no_sync: true + flush_interval: 20ms # time interval between combined file writes to disk (defaults to 10ms) + combined_count_limit: 64 # number of small objects to write into a single file (defaults to 128) + combined_size_limit: 16M # limit for the multi-object file size (defaults to 8M) + combined_size_threshold: 512K # threshold for combined object writing (defaults to 128K) pilorama: path: tmp/1/blob/pilorama.db diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 51c2c0b2cf..d39ebeee55 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -180,15 +180,10 @@ Currently only 2 types are supported: `fstree` and `peapod`. 
blobstor: - type: peapod path: /path/to/peapod.db - depth: 1 - width: 4 - type: fstree path: /path/to/blobstor perm: 0644 - size: 4194304 depth: 1 - width: 4 - opened_cache_capacity: 50 ``` #### Common options for sub-storages @@ -196,20 +191,24 @@ blobstor: |-------------------------------------|-----------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `path` | `string` | | Path to the root of the blobstor. | | `perm` | file mode | `0640` | Default permission for created files and directories. | +| `flush_interval` | `duration` | `10ms` | Time interval between batch writes to disk. | #### `fstree` type options -| Parameter | Type | Default value | Description | -|---------------------|-----------|---------------|-------------------------------------------------------| -| `path` | `string` | | Path to the root of the blobstor. | -| `perm` | file mode | `0640` | Default permission for created files and directories. | -| `depth` | `int` | `4` | File-system tree depth. | +| Parameter | Type | Default value | Description | +|---------------------------|-----------|---------------|------------------------------------------------------------------------------------------------------------------------------| +| `path` | `string` | | Path to the root of the blobstor. | +| `perm` | file mode | `0640` | Default permission for created files and directories. | +| `depth` | `int` | `4` | File-system tree depth. | +| `no_sync` | `bool` | `false` | Disable write synchronization, makes writes faster, but can lead to data loss. | +| `combined_count_limit` | `int` | `128` | Maximum number of objects to write into a single file, 0 or 1 disables combined writing (disabling is recommended for SSDs). 
| +| `combined_size_limit` | `size` | `8M` | Maximum size of a multi-object file. | +| `combined_size_threshold` | `size` | `128K` | Minimum size of object that won't be combined with others when writing to disk. | #### `peapod` type options | Parameter | Type | Default value | Description | |---------------------|-----------|---------------|-------------------------------------------------------| | `path` | `string` | | Path to the Peapod database file. | | `perm` | file mode | `0640` | Default permission for created files and directories. | -| `flush_interval` | `duration`| `10ms` | Time interval between batch writes to disk. | ### `gc` subsection diff --git a/pkg/local_object_storage/blobstor/bench_test.go b/pkg/local_object_storage/blobstor/bench_test.go index a8ce636dea..2c76c0d844 100644 --- a/pkg/local_object_storage/blobstor/bench_test.go +++ b/pkg/local_object_storage/blobstor/bench_test.go @@ -11,6 +11,9 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) @@ -94,3 +97,86 @@ func BenchmarkPut(b *testing.B) { }) } } + +func BenchmarkGet(b *testing.B) { + const nObjects = 10000 + + for _, tc := range []struct { + objSize uint64 + nThreads int + }{ + {1, 1}, + {1, 20}, + {1, 100}, + {1 << 10, 1}, + {1 << 10, 20}, + {1 << 10, 100}, + {100 << 10, 1}, + {100 << 10, 20}, + {100 << 10, 100}, + } { + b.Run(fmt.Sprintf("size=%d,thread=%d", tc.objSize, tc.nThreads), func(b *testing.B) { + for name, creat := range map[string]func(testing.TB) common.Storage{ + "peapod": newTestPeapod, + "fstree": newTestFSTree, + } { + b.Run(name, func(b 
*testing.B) { + var objs = make([]oid.Address, 0, nObjects) + + ptt := creat(b) + require.NoError(b, ptt.Open(false)) + require.NoError(b, ptt.Init()) + b.Cleanup(func() { _ = ptt.Close() }) + + obj := object.New() + data := make([]byte, tc.objSize) + rand.Read(data) + obj.SetID(oid.ID{1, 2, 3}) + obj.SetContainerID(cid.ID{1, 2, 3}) + obj.SetPayload(data) + + prm := common.PutPrm{ + RawData: obj.Marshal(), + } + + var ach = make(chan oid.Address) + for i := 0; i < 100; i++ { + go func() { + for j := 0; j < nObjects/100; j++ { + prm := prm + + prm.Address = oidtest.Address() + + _, err := ptt.Put(prm) + require.NoError(b, err) + ach <- prm.Address + } + }() + } + for i := 0; i < nObjects; i++ { + a := <-ach + objs = append(objs, a) + } + + b.ResetTimer() + for n := 0; n < b.N; n++ { + var wg sync.WaitGroup + + for i := 0; i < tc.nThreads; i++ { + wg.Add(1) + go func(ind int) { + defer wg.Done() + + var prm = common.GetPrm{Address: objs[nObjects/tc.nThreads*ind+n%(nObjects/tc.nThreads)]} + _, err := ptt.Get(prm) + require.NoError(b, err) + }(i) + } + + wg.Wait() + } + }) + } + }) + } +} diff --git a/pkg/local_object_storage/blobstor/fstree/control.go b/pkg/local_object_storage/blobstor/fstree/control.go index dedf334752..b6fc940042 100644 --- a/pkg/local_object_storage/blobstor/fstree/control.go +++ b/pkg/local_object_storage/blobstor/fstree/control.go @@ -19,13 +19,15 @@ func (t *FSTree) Init() error { return fmt.Errorf("mkdir all for %q: %w", t.RootPath, err) } if !t.readOnly { - f := newSpecificWriteData(t.RootPath, t.Permissions, t.noSync) - if f != nil { - t.writeData = f + var w = newSpecificWriter(t) + if w != nil { + t.writer = w } } return nil } // Close implements common.Storage. 
-func (*FSTree) Close() error { return nil } +func (t *FSTree) Close() error { + return t.writer.finalize() +} diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 5c24756ede..1593af9511 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -1,7 +1,9 @@ package fstree import ( + "bytes" "crypto/sha256" + "encoding/binary" "errors" "fmt" "io" @@ -10,6 +12,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" @@ -28,10 +31,15 @@ type FSTree struct { *compression.Config Depth uint64 DirNameLen int - writeData func(string, []byte) error + writer writer noSync bool readOnly bool + + combinedCountLimit int + combinedSizeLimit int + combinedSizeThreshold int + combinedWriteInterval time.Duration } // Info groups the information about file storage. @@ -43,11 +51,23 @@ type Info struct { RootPath string } +// writer is an internal FS writing interface. +type writer interface { + writeData(oid.ID, string, []byte) error + finalize() error +} + const ( // DirNameLen is how many bytes is used to group keys into directories. DirNameLen = 1 // in bytes // MaxDepth is maximum depth of nested directories. MaxDepth = (sha256.Size - 1) / DirNameLen + + // combinedPrefix is the prefix that Protobuf message can't start with, + // it reads as "field number 15 of type 7", but there is no type 7 in + // the system (and we usually don't have 15 fields). ZSTD magic is also + // different. 
+ combinedPrefix = 0x7f ) var _ common.Storage = (*FSTree)(nil) @@ -61,11 +81,16 @@ func New(opts ...Option) *FSTree { Config: nil, Depth: 4, DirNameLen: DirNameLen, + + combinedCountLimit: 128, + combinedSizeLimit: 8 * 1024 * 1024, + combinedSizeThreshold: 128 * 1024, + combinedWriteInterval: 10 * time.Millisecond, } for i := range opts { opts[i](f) } - f.writeData = newGenericWriteData(f.Permissions, f.noSync) + f.writer = newGenericWriter(f.Permissions, f.noSync) return f } @@ -140,18 +165,13 @@ func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm) if prm.LazyHandler != nil { err = prm.LazyHandler(*addr, func() ([]byte, error) { - data, err := os.ReadFile(filepath.Join(curPath...)) - if err != nil && errors.Is(err, fs.ErrNotExist) { - return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) - } - - return data, err + return getRawObjectBytes(addr.Object(), filepath.Join(curPath...)) }) } else { var data []byte p := filepath.Join(curPath...) - data, err = os.ReadFile(p) - if err != nil && errors.Is(err, fs.ErrNotExist) { + data, err = getRawObjectBytes(addr.Object(), p) + if err != nil && errors.Is(err, apistatus.ObjectNotFound{}) { continue } if err == nil { @@ -266,7 +286,7 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) { if !prm.DontCompress { prm.RawData = t.Compress(prm.RawData) } - err := t.writeData(p, prm.RawData) + err := t.writer.writeData(prm.Address.Object(), p, prm.RawData) if err != nil { return common.PutRes{}, fmt.Errorf("write object data into file %q: %w", p, err) } @@ -275,25 +295,14 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) { // Get returns an object from the storage by address. 
func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) { - p := t.treePath(prm.Address) - - if _, err := os.Stat(p); errors.Is(err, fs.ErrNotExist) { - return common.GetRes{}, logicerr.Wrap(apistatus.ObjectNotFound{}) - } - - data, err := os.ReadFile(p) + data, err := t.getObjBytes(prm.Address) if err != nil { - return common.GetRes{}, fmt.Errorf("read file %q: %w", p, err) - } - - data, err = t.Decompress(data) - if err != nil { - return common.GetRes{}, fmt.Errorf("decompress file data %q: %w", p, err) + return common.GetRes{}, err } obj := objectSDK.New() if err := obj.Unmarshal(data); err != nil { - return common.GetRes{}, fmt.Errorf("decode object from file %q: %w", p, err) + return common.GetRes{}, fmt.Errorf("decode object: %w", err) } return common.GetRes{Object: obj, RawData: data}, nil @@ -303,47 +312,104 @@ func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) { // canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object // is missing. func (t *FSTree) GetBytes(addr oid.Address) ([]byte, error) { + return t.getObjBytes(addr) +} + +// getObjBytes extracts object bytes from the storage by address. +func (t *FSTree) getObjBytes(addr oid.Address) ([]byte, error) { p := t.treePath(addr) + data, err := getRawObjectBytes(addr.Object(), p) + if err != nil { + return nil, err + } + data, err = t.Decompress(data) + if err != nil { + return nil, fmt.Errorf("decompress file data %q: %w", p, err) + } + return data, nil +} +// getRawObjectBytes extracts raw object bytes from the storage by path. No +// decompression is performed. 
+func getRawObjectBytes(id oid.ID, p string) ([]byte, error) { f, err := os.Open(p) if err != nil { if errors.Is(err, fs.ErrNotExist) { return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) } - return nil, fmt.Errorf("open object file %q: %w", p, err) - } - - fi, err := f.Stat() - if err != nil { - return nil, fmt.Errorf("stat object file %q: %w", p, err) - } - sz := fi.Size() - if sz > math.MaxInt { - return nil, fmt.Errorf("too big object file %d > %d", sz, math.MaxInt) + return nil, fmt.Errorf("read file %q: %w", p, err) } - if sz == 0 { - return nil, nil - } - - b := make([]byte, sz) - _, err = io.ReadFull(f, b) + defer f.Close() + data, err := extractCombinedObject(id, f) if err != nil { - if errors.Is(err, io.EOF) { - err = io.ErrUnexpectedEOF + if errors.Is(err, fs.ErrNotExist) { + return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) } - return nil, fmt.Errorf("read all %d bytes from object file %q: %w", sz, p, err) + return nil, fmt.Errorf("extract object from %q: %w", p, err) } + return data, nil +} - if !t.IsCompressed(b) { - return b, nil - } +func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) { + const ( + prefixSize = 1 + idSize = sha256.Size + lengthSize = 4 - dec, err := t.DecompressForce(b) - if err != nil { - return nil, fmt.Errorf("decompress object file data %q: %w", p, err) - } + idOff = prefixSize + lengthOff = idOff + idSize + dataOff = lengthOff + lengthSize + ) + + var ( + comBuf [dataOff]byte + data []byte + isCombined bool + ) - return dec, nil + for { + n, err := io.ReadFull(f, comBuf[:]) + if err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + if !isCombined { + return comBuf[:n], nil + } + return nil, fs.ErrNotExist + } + return nil, err + } + if comBuf[0] != combinedPrefix { + st, err := f.Stat() + if err != nil { + return nil, err + } + sz := st.Size() + if sz > math.MaxInt { + return nil, errors.New("too large file") + } + data = make([]byte, int(sz)) + copy(data, comBuf[:]) + _, err = 
io.ReadFull(f, data[len(comBuf):]) + if err != nil { + return nil, err + } + return data, nil + } + isCombined = true + var l = binary.BigEndian.Uint32(comBuf[lengthOff:dataOff]) + if bytes.Equal(comBuf[idOff:lengthOff], id[:]) { + data = make([]byte, l) + _, err = io.ReadFull(f, data) + if err != nil { + return nil, err + } + return data, nil + } + _, err = f.Seek(int64(l), 1) + if err != nil { + return nil, err + } + } } // GetRange implements common.Storage. diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go index 06bd1de587..8c85b1c671 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_generic.go @@ -9,6 +9,7 @@ import ( "syscall" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) type genericWriter struct { @@ -16,7 +17,7 @@ type genericWriter struct { flags int } -func newGenericWriteData(perm fs.FileMode, noSync bool) func(string, []byte) error { +func newGenericWriter(perm fs.FileMode, noSync bool) writer { flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL if !noSync { flags |= os.O_SYNC @@ -25,10 +26,14 @@ func newGenericWriteData(perm fs.FileMode, noSync bool) func(string, []byte) err perm: perm, flags: flags, } - return w.writeData + return w } -func (w *genericWriter) writeData(p string, data []byte) error { +func (w *genericWriter) finalize() error { + return nil +} + +func (w *genericWriter) writeData(_ oid.ID, p string, data []byte) error { // Here is a situation: // Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.161Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "PUT", "type": "fstree", "storage_id": ""} // Feb 09 13:10:37 
buky neofs-node[32445]: 2023-02-09T13:10:37.183Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "metabase PUT"}
diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go
index 98ae286c56..26acc76e54 100644
--- a/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go
+++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go
@@ -3,45 +3,241 @@
 package fstree
 
 import (
+	"encoding/binary"
 	"errors"
 	"fmt"
-	"io/fs"
 	"strconv"
+	"sync"
+	"time"
 
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"golang.org/x/sys/unix"
 )
 
 type linuxWriter struct {
-	root  string
-	perm  uint32
-	flags int
+	root   string
+	perm   uint32
+	flags  int
+	bFlags int
+	noSync bool
+
+	combinedCountLimit    int
+	combinedSizeLimit     int
+	combinedSizeThreshold int
+	combinedWriteInterval time.Duration
+
+	batchLock sync.Mutex
+	batch     *syncBatch
+}
+
+// syncBatch is a single file shared by several combined writes that are flushed to disk together.
+type syncBatch struct {
+	lock     sync.Mutex
+	fd       int
+	procname string
+	cnt      int
+	size     int
+	noSync   bool
+	timer    *time.Timer
+	ready    chan struct{}
+	err      error
 }
 
-func newSpecificWriteData(root string, perm fs.FileMode, noSync bool) func(string, []byte) error {
+// newSpecificWriter returns a Linux-specific writer for t or nil if an O_TMPFILE
+// file can't be opened in the tree root (the generic writer should be used then).
+func newSpecificWriter(t *FSTree) writer {
 	flags := unix.O_WRONLY | unix.O_TMPFILE | unix.O_CLOEXEC
-	if !noSync {
+	bFlags := flags
+	if !t.noSync {
 		flags |= unix.O_DSYNC
 	}
-	fd, err := unix.Open(root, flags, uint32(perm))
+	fd, err := unix.Open(t.RootPath, flags, uint32(t.Permissions))
 	if err != nil {
 		return nil // Which means that OS-specific writeData can't be created and FSTree should use the generic one.
 	}
 	_ = unix.Close(fd) // Don't care about error.
 	w := &linuxWriter{
-		root:  root,
-		perm:  uint32(perm),
-		flags: flags,
+		root:   t.RootPath,
+		perm:   uint32(t.Permissions),
+		flags:  flags,
+		bFlags: bFlags,
+		noSync: t.noSync,
+
+		combinedCountLimit:    t.combinedCountLimit,
+		combinedSizeLimit:     t.combinedSizeLimit,
+		combinedSizeThreshold: t.combinedSizeThreshold,
+		combinedWriteInterval: t.combinedWriteInterval,
+	}
+	return w
+}
+
+// newSyncBatch opens a new batch file and schedules its flush in
+// combinedWriteInterval. The batch is returned locked, callers must unlock it.
+func (w *linuxWriter) newSyncBatch() (*syncBatch, error) {
+	fd, err := unix.Open(w.root, w.bFlags, w.perm)
+	if err != nil {
+		return nil, err
+	}
+	sb := &syncBatch{
+		fd:       fd,
+		procname: "/proc/self/fd/" + strconv.FormatUint(uint64(fd), 10),
+		ready:    make(chan struct{}),
+		noSync:   w.noSync,
+	}
+	sb.lock.Lock()
+	sb.timer = time.AfterFunc(w.combinedWriteInterval, sb.sync)
+	return sb, nil
+}
+
+// sync flushes the batch if it hasn't been flushed yet.
+func (b *syncBatch) sync() {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	select {
+	case <-b.ready:
+		return
+	default:
 	}
-	return w.writeData
+	b.intSync()
 }
 
-func (w *linuxWriter) writeData(p string, data []byte) error {
-	err := w.writeFile(p, data)
-	if errors.Is(err, unix.ENOSPC) {
-		return common.ErrNoSpace
+// intSync syncs (unless noSync is set or the batch has failed), closes the file
+// and wakes up the waiters. Must be called once per batch with b.lock held.
+func (b *syncBatch) intSync() {
+	var err error
+
+	if b.err == nil && !b.noSync {
+		err = unix.Fdatasync(b.fd)
+		if err != nil {
+			b.err = err
+		}
+	}
+
+	err = unix.Close(b.fd)
+	if b.err == nil && err != nil {
+		b.err = err
+	}
+	close(b.ready)
+	_ = b.timer.Stop() // True is stopped, but false is "AfterFunc already running".
+}
+
+// wait blocks until the batch is flushed and returns its error, if any.
+func (b *syncBatch) wait() error {
+	<-b.ready
+	return b.err
+}
+
+// write appends the prefix, object ID, data length and data itself to the batch
+// file and links this file at p. An already existing p is not an error. On any
+// other failure the batch is closed with the error set.
+func (b *syncBatch) write(id oid.ID, p string, data []byte) error {
+	var (
+		err  error
+		pref [1 + len(id) + 4]byte
+	)
+	pref[0] = combinedPrefix
+	copy(pref[1:], id[:])
+	binary.BigEndian.PutUint32(pref[1+len(id):], uint32(len(data)))
+
+	n, err := unix.Writev(b.fd, [][]byte{pref[:], data})
+	if err != nil {
+		b.err = err
+		b.intSync()
+		return err
+	}
+	if n != len(pref)+len(data) {
+		b.err = errors.New("incomplete write")
+		b.intSync()
+		return b.err
+	}
+	b.size += n
+	b.cnt++
+	err = unix.Linkat(unix.AT_FDCWD, b.procname, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
+	if err != nil {
+		if errors.Is(err, unix.EEXIST) {
+			// https://github.com/nspcc-dev/neofs-node/issues/2563
+			return nil
+		}
+		b.err = err
+		b.intSync()
+		return b.err
+	}
+	return nil
+}
+
+// finalize flushes the current batch, if any.
+func (w *linuxWriter) finalize() error {
+	w.batchLock.Lock()
+	defer w.batchLock.Unlock()
+	if w.batch != nil {
+		w.batch.sync()
+		w.batch = nil
+	}
+	return nil
+}
+
+// writeData stores data at p as a separate file if it's bigger than
+// combinedSizeThreshold or batching is disabled (combinedCountLimit < 2) and as a
+// part of a batch otherwise. ENOSPC is returned as common.ErrNoSpace.
+func (w *linuxWriter) writeData(id oid.ID, p string, data []byte) error {
+	var err error
+	if len(data) > w.combinedSizeThreshold || w.combinedCountLimit < 2 {
+		err = w.writeFile(p, data)
+	} else {
+		err = w.writeCombinedFile(id, p, data)
+	}
+	if err != nil {
+		if errors.Is(err, unix.ENOSPC) {
+			return common.ErrNoSpace
+		}
+		return err
+	}
+	return nil
+}
+
+// writeCombinedFile adds data to the current batch (a new one is created if
+// there is none or the current one is flushed already) and waits for the batch
+// to be flushed, which happens on count/size limit or on timer.
+func (w *linuxWriter) writeCombinedFile(id oid.ID, p string, data []byte) error {
+	var err error
+	var sb *syncBatch
+
+	w.batchLock.Lock()
+	if w.batch == nil {
+		w.batch, err = w.newSyncBatch()
+		sb = w.batch
+	} else {
+		sb = w.batch
+		sb.lock.Lock()
+		select {
+		case <-sb.ready:
+			sb.lock.Unlock()
+			w.batch, err = w.newSyncBatch()
+			sb = w.batch
+		default:
+		}
+	}
+	if err != nil {
+		// newSyncBatch failed: release batchLock before bailing out, otherwise
+		// every subsequent combined write and finalize would block forever.
+		w.batchLock.Unlock()
+		return err
+	}
+	err = sb.write(id, p, data)
+	// write closes the batch itself on failure, so the limits must only be
+	// checked after a successful write (a second intSync would close the
+	// ready channel again and panic).
+	if err == nil && (sb.cnt >= w.combinedCountLimit || sb.size >= w.combinedSizeLimit) {
+		sb.intSync()
+	}
+	sb.lock.Unlock()
+	w.batchLock.Unlock()
+	if err != nil {
+		return err
 	}
-	return err
+	return sb.wait()
 }
 
 func (w *linuxWriter) writeFile(p string, data []byte) error {
diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_write_specific.go b/pkg/local_object_storage/blobstor/fstree/fstree_write_specific.go
index 3950d94ab9..fc0ebf840f 100644
--- a/pkg/local_object_storage/blobstor/fstree/fstree_write_specific.go
+++ b/pkg/local_object_storage/blobstor/fstree/fstree_write_specific.go
@@ -2,10 +2,6 @@
 
 package fstree
 
-import (
-	"io/fs"
-)
-
-func newSpecificWriteData(_ string, _ fs.FileMode, _ bool) func(string, []byte) error {
+func newSpecificWriter(_ *FSTree) writer {
 	return nil
 }
diff --git a/pkg/local_object_storage/blobstor/fstree/option.go b/pkg/local_object_storage/blobstor/fstree/option.go
index 07e5474445..c16f7b190f 100644
--- a/pkg/local_object_storage/blobstor/fstree/option.go
+++ b/pkg/local_object_storage/blobstor/fstree/option.go
@@ -2,6 +2,7 @@ package fstree
 
 import (
 	"io/fs"
+	"time"
 )
 
 type Option func(*FSTree)
@@ -35,3 +36,27 @@ func WithNoSync(noSync bool) Option {
 		f.noSync = noSync
 	}
 }
+
+func WithCombinedCountLimit(limit int) Option {
+	return func(f *FSTree) {
+		f.combinedCountLimit = limit
+	}
+}
+
+func WithCombinedSizeLimit(size int) Option {
+	return func(f *FSTree) {
+		f.combinedSizeLimit = size
+	}
+}
+
+func WithCombinedSizeThreshold(size int) Option {
+	return func(f *FSTree) {
+		f.combinedSizeThreshold = size
+	}
+}
+
+func WithCombinedWriteInterval(t time.Duration) Option {
+	return func(f *FSTree) {
+		f.combinedWriteInterval = t
+	}
+}
diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go
index d95b76f6ca..e2be2f8bf8 100644
--- a/pkg/local_object_storage/shard/shard_test.go
+++ b/pkg/local_object_storage/shard/shard_test.go
@@ -67,6 +67,7 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
 	}
 
 	opts := append([]shard.Option{
+		shard.WithID(shard.NewIDFromBytes([]byte("testShard"))),
 		shard.WithLogger(zap.L()),
 		shard.WithBlobStorOptions(bsOpts...),
 		shard.WithMetaBaseOptions(