Jamesmoore/arch 224 silo support device groups #66

Merged
merged 37 commits into from
Dec 19, 2024
ba283fb
Start on impl of dev groups
jimmyaxod Dec 9, 2024
e7eef75
Device group creation
jimmyaxod Dec 9, 2024
a3dd4a2
First test for deviceGroup
jimmyaxod Dec 9, 2024
69f5b08
Added dg.SendDevInfo and test
jimmyaxod Dec 9, 2024
2964c7e
test cleanup
jimmyaxod Dec 9, 2024
6c82e53
Added first try at dg.MigrateAll
jimmyaxod Dec 9, 2024
a51a851
DeviceGroup migrate and test working
jimmyaxod Dec 9, 2024
4b3db5c
Lint fixes
jimmyaxod Dec 9, 2024
4404d98
Added logging for dg.MigrateAll
jimmyaxod Dec 9, 2024
86c3641
Device concurrency is now proportional to size and allocated from a d…
jimmyaxod Dec 9, 2024
cde82d3
First stab at migrateDirty
jimmyaxod Dec 10, 2024
5151ba5
Added hooks for dg.MigrateDirty phase
jimmyaxod Dec 10, 2024
2509799
refactored dg tests common setup
jimmyaxod Dec 10, 2024
47d230a
Drastic simplification of cmd/serve using devicegroup
jimmyaxod Dec 11, 2024
cb4f0fe
Added new packet for deviceGroup
jimmyaxod Dec 11, 2024
33cff26
devicegroup devInfo change passes test, but /cmd/connect will be brok…
jimmyaxod Dec 11, 2024
ec9aaf4
Started on dg.NewFromProtocol (migrate_from)
jimmyaxod Dec 11, 2024
f4e79ae
First deviceGroup migrate test working
jimmyaxod Dec 11, 2024
4d6eb17
Split dg.FromProtocol in two, so we can start using devices before mi…
jimmyaxod Dec 11, 2024
2d508f7
Added waitingCache bits into dg from
jimmyaxod Dec 11, 2024
36294e1
Added simple authority transfer to dg
jimmyaxod Dec 11, 2024
9ab57ce
Improved progressHandler for dg
jimmyaxod Dec 11, 2024
b35952a
Added README for dg usage
jimmyaxod Dec 12, 2024
1eb538c
Simplified cmd/connect to use new dg api
jimmyaxod Dec 12, 2024
dcfbbd0
Updates to schema encode/decode
jimmyaxod Dec 13, 2024
031cf74
dg tests pass. Order issue fixed
jimmyaxod Dec 13, 2024
4179e40
cmd serve/connect working
jimmyaxod Dec 13, 2024
7d4d6df
Some minor tweaks to readme
jimmyaxod Dec 13, 2024
9d4c436
Silly typos
jimmyaxod Dec 13, 2024
02d94bf
Added event compat and exposed things from dg
jimmyaxod Dec 14, 2024
44a40f9
Refined MigrateDirty hooks for drafter integrate
jimmyaxod Dec 16, 2024
70c4e9f
Updated README, and progress now map by name
jimmyaxod Dec 16, 2024
65f19cd
Small possible race in nbd_dispatch
jimmyaxod Dec 17, 2024
d06050a
Better shutdown
jimmyaxod Dec 17, 2024
7b4ae69
Even better nbd dispatch shutdown
jimmyaxod Dec 17, 2024
c62e0bc
lint
jimmyaxod Dec 17, 2024
e13b79a
Moved customDataHandler. Updated WaitForCompletion to honour context
jimmyaxod Dec 18, 2024
408 changes: 20 additions & 388 deletions cmd/connect.go

Large diffs are not rendered by default.

466 changes: 73 additions & 393 deletions cmd/serve.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions pkg/storage/config/silo.go
@@ -1,6 +1,7 @@
package config

import (
	"errors"
	"fmt"
	"os"
	"strconv"
@@ -123,6 +124,25 @@ func (ds *DeviceSchema) Encode() []byte {
	return f.Bytes()
}

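// EncodeAsBlock encodes the device as a labelled HCL "device" block.
func (ds *DeviceSchema) EncodeAsBlock() []byte {
	f := hclwrite.NewEmptyFile()
	block := gohcl.EncodeAsBlock(ds, "device")
	f.Body().AppendBlock(block)
	return f.Bytes()
}

// DecodeDeviceFromBlock decodes a schema that must contain exactly one device block.
func DecodeDeviceFromBlock(schema string) (*DeviceSchema, error) {
	sf := &SiloSchema{}
	err := sf.Decode([]byte(schema))
	if err != nil {
		return nil, err
	}
	// Zero devices is rejected as well as several, so the message covers both.
	if len(sf.Device) != 1 {
		return nil, errors.New("expected exactly one device in schema")
	}
	return sf.Device[0], nil
}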

func (ds *DeviceSchema) Decode(schema string) error {
	file, diag := hclsyntax.ParseConfig([]byte(schema), "", hcl.Pos{Line: 1, Column: 1})
	if diag.HasErrors() {
28 changes: 28 additions & 0 deletions pkg/storage/config/silo_test.go
@@ -66,3 +66,31 @@ func TestSiloConfig(t *testing.T) {
	assert.NoError(t, err)
	// TODO: Check data is as expected
}

func TestSiloConfigBlock(t *testing.T) {

	schema := `device Disk0 {
	size = "1G"
	expose = true
	system = "memory"
}

device Disk1 {
	size = "2M"
	system = "memory"
}
`

	s := new(SiloSchema)
	err := s.Decode([]byte(schema))
	assert.NoError(t, err)

	block0 := s.Device[0].EncodeAsBlock()

	ds := &SiloSchema{}
	err = ds.Decode(block0)
	assert.NoError(t, err)

	// Make sure it used the label
	assert.Equal(t, ds.Device[0].Name, s.Device[0].Name)
}
75 changes: 75 additions & 0 deletions pkg/storage/devicegroup/README.md
@@ -0,0 +1,75 @@
# Device Group

The `DeviceGroup` combines a set of Silo devices into a single unit, which can then be migrated to another Silo instance.
All internal concerns, such as the volatilityMonitor, the waitingCache, and the new S3 assist, are now hidden from the consumer.

## Creation

There are two methods to create a `DeviceGroup`.

### NewFromSchema

This takes a slice of Silo device configs and creates the devices. If a config sets `expose = true`, a corresponding NBD device is created and attached.

### NewFromProtocol

This takes in a `protocol` and creates the devices as they are received from a sender.

## Usage (Sending devices)

Devices in a `DeviceGroup` are sent together, which lets Silo coordinate the transfer across devices, for example by sharing a single total concurrency limit between them.

    // Error handling is omitted for brevity.

    // Create a device group from schema
    dg, err := devicegroup.NewFromSchema(devices, log, siloMetrics)

    // Start a migration
    err = dg.StartMigrationTo(protocol)

    // Migrate the data with max total concurrency 100
    err = dg.MigrateAll(100, pHandler)

    // Migrate any dirty blocks
    // hooks gives some control over the dirty loop
    err = dg.MigrateDirty(hooks)

    // Send completion events for all devices
    err = dg.Completed()

    // Close everything
    dg.CloseAll()
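
The "max total concurrency" passed to `MigrateAll` is shared between the devices in the group. The PR's commit log mentions that device concurrency is proportional to size, but the actual allocation code is not shown here. The following is only a sketch of one way such a split could work. The `device` type, the `allocateConcurrency` function, and the "at least one slot per device" rule are all assumptions made for illustration, not Silo's API.

```go
package main

import "fmt"

// device is a hypothetical stand-in for a Silo device; only the size matters here.
type device struct {
	name string
	size uint64
}

// allocateConcurrency splits maxConcurrency across devices in proportion to
// their size. Every device gets at least one slot (an assumption of this
// sketch), so the sum can slightly exceed maxConcurrency when there are many
// small devices.
func allocateConcurrency(devices []device, maxConcurrency int) map[string]int {
	alloc := make(map[string]int, len(devices))
	var total uint64
	for _, d := range devices {
		total += d.size
	}
	for _, d := range devices {
		n := 1
		if total > 0 {
			// Integer division rounds down; tiny devices fall back to 1 below.
			n = int(uint64(maxConcurrency) * d.size / total)
		}
		if n < 1 {
			n = 1
		}
		alloc[d.name] = n
	}
	return alloc
}

func main() {
	devices := []device{
		{name: "Disk0", size: 1 << 30}, // 1G
		{name: "Disk1", size: 2 << 20}, // 2M
	}
	for name, n := range allocateConcurrency(devices, 100) {
		fmt.Printf("%s: %d\n", name, n)
	}
}
```

With these sizes the large device receives almost the entire budget, while the small one still gets a single slot so it can make progress.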

`MigrateDirty` accepts a set of hooks that give some control over the dirty loop. It returns once no device has any dirty data left. You can then call `MigrateDirty` again, e.g. for continuous sync.

    type MigrateDirtyHooks struct {
        PreGetDirty      func(name string) error
        PostGetDirty     func(name string, blocks []uint) (bool, error)
        PostMigrateDirty func(name string, blocks []uint) (bool, error)
        Completed        func(name string)
    }
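
To make the role of each hook more concrete, here is a rough sketch of a per-device dirty loop that calls them in order. This is not Silo's implementation. In particular, it assumes that a `false` return from `PostGetDirty` or `PostMigrateDirty` means "stop looping for this device", which the README does not state. `dirtySource`, `runDirtyLoop`, `batches`, and `demo` are hypothetical names introduced for the example.

```go
package main

import "fmt"

// MigrateDirtyHooks mirrors the struct shown in the README.
type MigrateDirtyHooks struct {
	PreGetDirty      func(name string) error
	PostGetDirty     func(name string, blocks []uint) (bool, error)
	PostMigrateDirty func(name string, blocks []uint) (bool, error)
	Completed        func(name string)
}

// dirtySource is a hypothetical stand-in for a device's dirty tracker: each
// call returns the next batch of dirty blocks, or nil when there are none.
type dirtySource func() []uint

// runDirtyLoop is a sketch only. ASSUMPTION: a false return from
// PostGetDirty or PostMigrateDirty ends the loop for this device.
func runDirtyLoop(name string, next dirtySource, hooks *MigrateDirtyHooks) (int, error) {
	migrated := 0
	for {
		if err := hooks.PreGetDirty(name); err != nil {
			return migrated, err
		}
		blocks := next()
		cont, err := hooks.PostGetDirty(name, blocks)
		if err != nil {
			return migrated, err
		}
		if !cont || len(blocks) == 0 {
			break
		}
		migrated += len(blocks) // a real loop would send the blocks here
		cont, err = hooks.PostMigrateDirty(name, blocks)
		if err != nil {
			return migrated, err
		}
		if !cont {
			break
		}
	}
	hooks.Completed(name)
	return migrated, nil
}

// batches returns a dirtySource that yields each batch once, then nil.
func batches(bs ...[]uint) dirtySource {
	i := 0
	return func() []uint {
		if i >= len(bs) {
			return nil
		}
		b := bs[i]
		i++
		return b
	}
}

// demo drives the loop with two batches and reports what happened.
func demo() (migrated int, completed bool) {
	hooks := &MigrateDirtyHooks{
		PreGetDirty:      func(name string) error { return nil },
		PostGetDirty:     func(name string, blocks []uint) (bool, error) { return true, nil },
		PostMigrateDirty: func(name string, blocks []uint) (bool, error) { return true, nil },
		Completed:        func(name string) { completed = true },
	}
	var err error
	migrated, err = runDirtyLoop("Disk0", batches([]uint{1, 2}, []uint{7}), hooks)
	if err != nil {
		return migrated, false
	}
	return migrated, completed
}

func main() {
	migrated, completed := demo()
	fmt.Println(migrated, completed)
}
```

A hook such as `PreGetDirty` is a natural place to pause or flush a workload before the dirty list is read, and the two `Post...` hooks let the caller inspect each batch.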


There is also support for sending global custom data. This is typically done either in `pHandler` (the progress handler) or in one of the `MigrateDirty` hooks.

    pHandler := func(ps []*migrator.MigrationProgress) {
        // Do some test here to see if enough data migrated

        // If so, send a custom Authority Transfer event.
        dg.SendCustomData(authorityTransferPacket)
    }
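
A progress handler is typically called many times, so the "enough data migrated" test usually needs to fire its action only once. The sketch below shows one way to do that with `sync.Once`. The `progress` type and its `TotalBlocks` and `ReadyBlocks` fields are hypothetical stand-ins rather than the real `migrator.MigrationProgress`, and the `send` callback stands in for a call such as `dg.SendCustomData(...)`.

```go
package main

import (
	"fmt"
	"sync"
)

// progress is a hypothetical stand-in for per-device migration progress.
type progress struct {
	TotalBlocks int
	ReadyBlocks int
}

// newProgressHandler returns a handler that calls send exactly once, the
// first time the fraction of ready blocks across all devices reaches threshold.
func newProgressHandler(threshold float64, send func()) func(ps []*progress) {
	var once sync.Once
	return func(ps []*progress) {
		total, ready := 0, 0
		for _, p := range ps {
			total += p.TotalBlocks
			ready += p.ReadyBlocks
		}
		if total == 0 {
			return
		}
		if float64(ready)/float64(total) >= threshold {
			once.Do(send)
		}
	}
}

// demoSends feeds three progress updates and returns how often send ran.
func demoSends() int {
	sends := 0
	h := newProgressHandler(0.9, func() { sends++ })
	h([]*progress{{TotalBlocks: 100, ReadyBlocks: 50}, {TotalBlocks: 10, ReadyBlocks: 10}})
	h([]*progress{{TotalBlocks: 100, ReadyBlocks: 95}, {TotalBlocks: 10, ReadyBlocks: 10}})
	h([]*progress{{TotalBlocks: 100, ReadyBlocks: 100}, {TotalBlocks: 10, ReadyBlocks: 10}})
	return sends
}

func main() {
	fmt.Println("authority transfer sent", demoSends(), "time(s)")
}
```

Using `sync.Once` also keeps the handler safe if it is invoked from more than one goroutine.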

## Usage (Receiving devices)

    // Create a DeviceGroup from protocol
    // tweak func allows us to modify the schema, eg pathnames
    dg, err := devicegroup.NewFromProtocol(ctx, protocol, tweak, nil, nil)

    // Handle any custom data events
    // For example resume the VM here.
    go dg.HandleCustomData(func(data []byte) {
        // We got sent some custom data!
    })

    // Wait for migration completion
    dg.WaitForCompletion()
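
The last commit in this PR notes that `WaitForCompletion` now honours the context. How Silo does this internally is not shown on this page. A common Go pattern for a context-aware wait is sketched below; the `waitForCompletion` function and the `done` channel are hypothetical and only illustrate the idea.

```go
package main

import (
	"context"
	"fmt"
)

// waitForCompletion blocks until done is closed or ctx is cancelled,
// whichever happens first. It returns ctx.Err() on cancellation.
func waitForCompletion(ctx context.Context, done <-chan struct{}) error {
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoCompleted waits on a migration that has already finished.
func demoCompleted() error {
	done := make(chan struct{})
	close(done)
	return waitForCompletion(context.Background(), done)
}

// demoCancelled waits on a migration that never finishes, with a cancelled context.
func demoCancelled() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return waitForCompletion(ctx, make(chan struct{}))
}

func main() {
	fmt.Println(demoCompleted())
	fmt.Println(demoCancelled())
}
```

The benefit of this shape is that a receiver can abandon a stalled migration by cancelling the context instead of blocking forever.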

Once a `DeviceGroup` has been created and migration has completed, you can send the devices somewhere else with `StartMigrationTo(protocol)`.
139 changes: 139 additions & 0 deletions pkg/storage/devicegroup/device_group.go
@@ -0,0 +1,139 @@
package devicegroup

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/loopholelabs/logging/types"
	"github.com/loopholelabs/silo/pkg/storage"
	"github.com/loopholelabs/silo/pkg/storage/blocks"
	"github.com/loopholelabs/silo/pkg/storage/config"
	"github.com/loopholelabs/silo/pkg/storage/dirtytracker"
	"github.com/loopholelabs/silo/pkg/storage/metrics"
	"github.com/loopholelabs/silo/pkg/storage/migrator"
	"github.com/loopholelabs/silo/pkg/storage/protocol"
	"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
	"github.com/loopholelabs/silo/pkg/storage/volatilitymonitor"
	"github.com/loopholelabs/silo/pkg/storage/waitingcache"
)

const volatilityExpiry = 30 * time.Minute
const defaultBlockSize = 1024 * 1024

var errNotSetup = errors.New("toProtocol not setup")

type DeviceGroup struct {
	log               types.Logger
	met               metrics.SiloMetrics
	ctx               context.Context
	devices           []*DeviceInformation
	controlProtocol   protocol.Protocol
	incomingDevicesCh chan bool
	progressLock      sync.Mutex
	progress          map[string]*migrator.MigrationProgress
}

type DeviceInformation struct {
	Size               uint64
	BlockSize          uint64
	NumBlocks          int
	Schema             *config.DeviceSchema
	Prov               storage.Provider
	Storage            storage.LockableProvider
	Exp                storage.ExposedStorage
	Volatility         *volatilitymonitor.VolatilityMonitor
	DirtyLocal         *dirtytracker.Local
	DirtyRemote        *dirtytracker.Remote
	To                 *protocol.ToProtocol
	Orderer            *blocks.PriorityBlockOrder
	Migrator           *migrator.Migrator
	migrationError     chan error
	WaitingCacheLocal  *waitingcache.Local
	WaitingCacheRemote *waitingcache.Remote
	EventHandler       func(e *packets.Event)
}

// GetDeviceSchema returns the schema of every device in the group.
func (dg *DeviceGroup) GetDeviceSchema() []*config.DeviceSchema {
	s := make([]*config.DeviceSchema, 0)
	for _, di := range dg.devices {
		s = append(s, di.Schema)
	}
	return s
}

// GetAllNames returns the name of every device in the group.
func (dg *DeviceGroup) GetAllNames() []string {
	names := make([]string, 0)
	for _, di := range dg.devices {
		names = append(names, di.Schema.Name)
	}
	return names
}

// GetDeviceInformationByName returns nil if no device has the given name.
func (dg *DeviceGroup) GetDeviceInformationByName(name string) *DeviceInformation {
	for _, di := range dg.devices {
		if di.Schema.Name == name {
			return di
		}
	}
	return nil
}

// GetExposedDeviceByName returns nil if the device is not found or is not exposed.
func (dg *DeviceGroup) GetExposedDeviceByName(name string) storage.ExposedStorage {
	for _, di := range dg.devices {
		if di.Schema.Name == name && di.Exp != nil {
			return di.Exp
		}
	}
	return nil
}

// GetProviderByName returns nil if no device has the given name.
func (dg *DeviceGroup) GetProviderByName(name string) storage.Provider {
	for _, di := range dg.devices {
		if di.Schema.Name == name {
			return di.Prov
		}
	}
	return nil
}

// GetBlockSizeByName returns -1 if no device has the given name.
func (dg *DeviceGroup) GetBlockSizeByName(name string) int {
	for _, di := range dg.devices {
		if di.Schema.Name == name {
			return int(di.BlockSize)
		}
	}
	return -1
}

func (dg *DeviceGroup) CloseAll() error {
	if dg.log != nil {
		dg.log.Debug().Int("devices", len(dg.devices)).Msg("close device group")
	}

	var e error
	for _, d := range dg.devices {
		// Unlock the storage so nothing blocks here...
		// If we don't unlock there may be pending nbd writes that can't be completed.
		d.Storage.Unlock()

		err := d.Prov.Close()
		if err != nil {
			if dg.log != nil {
				dg.log.Error().Err(err).Msg("error closing device group storage provider")
			}
			e = errors.Join(e, err)
		}
		if d.Exp != nil {
			err = d.Exp.Shutdown()
			if err != nil {
				if dg.log != nil {
					dg.log.Error().Err(err).Msg("error closing device group exposed storage")
				}
				e = errors.Join(e, err)
			}
		}
	}
	return e
}
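
`CloseAll` keeps going after a failure and accumulates every error with `errors.Join` (Go 1.20+), so one device failing to close does not prevent the others from being shut down. The standalone sketch below isolates that pattern. The `closer` type, the two sentinel errors, and the `closeAll` and `demo` functions are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors standing in for provider and NBD failures.
var (
	errProv = errors.New("provider close failed")
	errExp  = errors.New("exposed shutdown failed")
)

// closer is a hypothetical stand-in for Prov.Close or Exp.Shutdown.
type closer func() error

// closeAll runs every closer, even after a failure, and joins all errors.
// It returns nil when every closer succeeds.
func closeAll(closers []closer) error {
	var e error
	for _, c := range closers {
		if err := c(); err != nil {
			// errors.Join ignores nil arguments, so the first failure is handled too.
			e = errors.Join(e, err)
		}
	}
	return e
}

// demo reports whether both failures are visible through errors.Is and
// whether a clean shutdown yields a nil error.
func demo() (sawProv, sawExp, cleanIsNil bool) {
	err := closeAll([]closer{
		func() error { return nil },
		func() error { return errProv },
		func() error { return errExp },
	})
	clean := closeAll([]closer{func() error { return nil }})
	return errors.Is(err, errProv), errors.Is(err, errExp), clean == nil
}

func main() {
	sawProv, sawExp, cleanIsNil := demo()
	fmt.Println(sawProv, sawExp, cleanIsNil)
}
```

Callers can still test for a specific cause with `errors.Is` or `errors.As`, because the joined error exposes each wrapped error.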