Skip to content

Commit

Permalink
Fixed up device.go
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Oct 24, 2024
1 parent 3f913af commit 822f0d1
Showing 1 changed file with 15 additions and 26 deletions.
41 changes: 15 additions & 26 deletions pkg/storage/device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/sources"
"github.com/rs/zerolog/log"
)

const (
Expand Down Expand Up @@ -58,8 +57,6 @@ func NewDevices(ds []*config.DeviceSchema) (map[string]*Device, error) {
}

func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.ExposedStorage, error) {
device_schema := string(ds.Encode())
log.Info().Str("schema", device_schema).Msg("Setting up NewDevice from schema")

var prov storage.StorageProvider
var err error
Expand Down Expand Up @@ -174,7 +171,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose

// Optionally use a copy on write RO source...
if ds.ROSource != nil {
log.Info().Str("schema", device_schema).Msg("Setting up CopyOnWrite")

// Create the ROSource...
rodev, _, err := NewDevice(ds.ROSource)
Expand Down Expand Up @@ -218,7 +214,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose

// Optionally binlog this dev to a file
if ds.Binlog != "" {
log.Info().Str("schema", string(ds.Encode())).Msg("Setting up BinLog")
prov, err = modules.NewBinLog(prov, ds.Binlog)
if err != nil {
return nil, nil, err
Expand All @@ -229,7 +224,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose
// NB You may well need to call ex.SetProvider if you wish to insert other things in the chain.
var ex storage.ExposedStorage
if ds.Expose {
log.Info().Str("schema", device_schema).Msg("Setting up Expose device")

ex = expose.NewExposedStorageNBDNL(prov, 8, 0, prov.Size(), expose.NBD_DEFAULT_BLOCK_SIZE, true)

Expand All @@ -242,7 +236,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose

// Optionally sync the device to S3
if ds.Sync != nil {
log.Info().Str("schema", device_schema).Msg("Setting up Sync")

s3dest, err := sources.NewS3StorageCreate(ds.Sync.Secure,
ds.Sync.Endpoint,
Expand Down Expand Up @@ -284,23 +277,23 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose
ctx, cancelfn := context.WithCancel(context.TODO())

// Start doing the sync...
syncer := migrator.NewSyncer(ctx, &migrator.Sync_config{
Name: ds.Name,
Integrity: false,
Cancel_writes: true,
Dedupe_writes: true,
Tracker: sourceDirtyRemote,
Lockable: sourceStorage,
Destination: s3dest,
Orderer: orderer,
Dirty_check_period: check_period,
Dirty_block_getter: func() []uint {
syncer := migrator.NewSyncer(ctx, &migrator.SyncConfig{
Name: ds.Name,
Integrity: false,
CancelWrites: true,
DedupeWrites: true,
Tracker: sourceDirtyRemote,
Lockable: sourceStorage,
Destination: s3dest,
Orderer: orderer,
DirtyCheckPeriod: check_period,
DirtyBlockGetter: func() []uint {
return sourceDirtyRemote.GetDirtyBlocks(
max_age, ds.Sync.Config.Limit, ds.Sync.Config.BlockShift, ds.Sync.Config.MinChanged)
},
Block_size: bs,
Progress_handler: func(p *migrator.MigrationProgress) {},
Error_handler: func(b *storage.BlockInfo, err error) {},
BlockSize: bs,
ProgressHandler: func(p *migrator.MigrationProgress) {},
ErrorHandler: func(b *storage.BlockInfo, err error) {},
})

// The provider we return should feed into our sync here.
Expand All @@ -317,16 +310,14 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose
sync_lock.Unlock()
return false
}
log.Info().Str("schema", device_schema).Msg("Starting sync")
sync_running = true
wg.Add(1)
sync_lock.Unlock()

// Sync happens here...
go func() {
// Do this in a goroutine, but make sure it's cancelled etc
status, err := syncer.Sync(false, true)
log.Info().Str("schema", device_schema).Err(err).Any("status", status).Msg("Sync finished")
_, _ = syncer.Sync(false, true)
wg.Done()
}()
return true
Expand All @@ -344,7 +335,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose
sync_lock.Unlock()
return nil
}
log.Info().Str("schema", device_schema).Msg("Stopping sync")
cancelfn()
// WAIT HERE for the sync to finish
wg.Wait()
Expand All @@ -365,7 +355,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose
alt_sources = append(alt_sources, as)
}

log.Info().Str("schema", device_schema).Int("sources", len(alt_sources)).Msg("Sync stopped with sources")
return alt_sources
})

Expand Down

0 comments on commit 822f0d1

Please sign in to comment.