Commit
S3Sync now cancels pending writes. Added metrics to S3Storage for current writes/reads

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>
jimmyaxod committed Nov 20, 2024
1 parent 3df1230 commit d394eeb
Showing 3 changed files with 35 additions and 4 deletions.
pkg/storage/device/device.go: 15 changes (12 additions & 3 deletions)
@@ -420,7 +420,7 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.RootLogger,
         return true
     }
 
-    stopSync := func(_ storage.EventType, _ storage.EventData) storage.EventReturnData {
+    stopSyncing := func(cancelWrites bool) storage.EventReturnData {
         if log != nil {
             log.Debug().Str("name", ds.Name).Msg("sync.stop called")
         }
@@ -430,6 +430,11 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.RootLogger,
             return nil
         }
         cancelfn()
+
+        if cancelWrites {
+            s3dest.CancelWrites(0, int64(s3dest.Size()))
+        }
+
         // WAIT HERE for the sync to finish
         wg.Wait()
         syncRunning = false
@@ -456,6 +461,10 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.RootLogger,
         return altSources
     }
 
+    stopSync := func(_ storage.EventType, _ storage.EventData) storage.EventReturnData {
+        return stopSyncing(false)
+    }
+
     // If the storage gets a "sync.stop", we should cancel the sync, and return the safe blocks
     storage.AddSiloEventNotification(prov, "sync.stop", stopSync)
 
@@ -481,8 +490,8 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.RootLogger,
 
     hooks := modules.NewHooks(prov)
     hooks.PostClose = func(err error) error {
-        // We should stop any sync here...
-        stopSync("sync.stop", nil)
+        // We should stop any sync here, but ask it to cancel any existing writes if possible.
+        stopSyncing(true)
         return err
     }
     prov = hooks
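The device.go change splits the old stopSync event handler into a reusable stopSyncing(cancelWrites bool) closure: the "sync.stop" event path lets pending S3 writes finish, while the PostClose hook asks the S3 destination to cancel writes across the whole device range before waiting on the sync goroutine. A minimal sketch of that pattern (the s3Dest type and its methods here are illustrative assumptions, not the real S3Storage):

```go
// Sketch of the stop-vs-cancel split: same shutdown path, but the close hook
// aborts in-flight writes first. Types and names are placeholders.
package main

import (
	"fmt"
	"sync"
)

type s3Dest struct{ size uint64 }

// CancelWrites is assumed to abort in-flight writes covering the given range.
func (s *s3Dest) CancelWrites(offset, length int64) {
	fmt.Printf("cancelling writes in [%d, %d)\n", offset, offset+length)
}

func (s *s3Dest) Size() uint64 { return s.size }

func main() {
	dest := &s3Dest{size: 1 << 20}
	var wg sync.WaitGroup
	cancelSync := func() { /* cancel the sync context here */ }

	stopSyncing := func(cancelWrites bool) {
		cancelSync()
		if cancelWrites {
			// Device close: don't wait for pending writes, abort them.
			dest.CancelWrites(0, int64(dest.Size()))
		}
		// Either way, wait for the sync goroutine to wind down.
		wg.Wait()
	}

	stopSyncing(false) // "sync.stop" event: let pending writes finish
	stopSyncing(true)  // PostClose hook: cancel pending writes first
}
```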
pkg/storage/metrics/prometheus/prometheus.go: 10 changes (9 additions & 1 deletion)
@@ -78,6 +78,8 @@ type Metrics struct {
     s3BlocksRBytes *prometheus.GaugeVec
     s3BlocksW *prometheus.GaugeVec
     s3BlocksWBytes *prometheus.GaugeVec
+    s3ActiveReads *prometheus.GaugeVec
+    s3ActiveWrites *prometheus.GaugeVec
 
     // toProtocol
     toProtocolSentEvents *prometheus.GaugeVec
@@ -285,6 +287,10 @@ func New(reg prometheus.Registerer) *Metrics {
             Namespace: promNamespace, Subsystem: promSubS3, Name: "blocks_r", Help: "Blocks r"}, []string{"device"}),
         s3BlocksRBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
             Namespace: promNamespace, Subsystem: promSubS3, Name: "blocks_r_bytes", Help: "Blocks r bytes"}, []string{"device"}),
+        s3ActiveReads: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+            Namespace: promNamespace, Subsystem: promSubS3, Name: "active_reads", Help: "Active reads"}, []string{"device"}),
+        s3ActiveWrites: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+            Namespace: promNamespace, Subsystem: promSubS3, Name: "active_writes", Help: "Active writes"}, []string{"device"}),
 
         // DirtyTracker
         dirtyTrackerBlockSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
@@ -375,7 +381,7 @@ func New(reg prometheus.Registerer) *Metrics {
 
     reg.MustRegister(met.protocolPacketsSent, met.protocolDataSent, met.protocolPacketsRecv, met.protocolDataRecv, met.protocolWrites, met.protocolWriteErrors, met.protocolWaitingForId)
 
-    reg.MustRegister(met.s3BlocksR, met.s3BlocksRBytes, met.s3BlocksW, met.s3BlocksWBytes)
+    reg.MustRegister(met.s3BlocksR, met.s3BlocksRBytes, met.s3BlocksW, met.s3BlocksWBytes, met.s3ActiveReads, met.s3ActiveWrites)
 
     reg.MustRegister(met.toProtocolSentEvents, met.toProtocolSentAltSources, met.toProtocolSentHashes, met.toProtocolSentDevInfo,
         met.toProtocolSentDirtyList, met.toProtocolSentReadAt, met.toProtocolSentWriteAtHash, met.toProtocolSentWriteAtHashBytes,
@@ -564,6 +570,8 @@ func (m *Metrics) AddS3Storage(name string, s3 *sources.S3Storage) {
         m.s3BlocksWBytes.WithLabelValues(name).Set(float64(met.BlocksWBytes))
         m.s3BlocksR.WithLabelValues(name).Set(float64(met.BlocksRCount))
         m.s3BlocksRBytes.WithLabelValues(name).Set(float64(met.BlocksRBytes))
+        m.s3ActiveReads.WithLabelValues(name).Set(float64(met.ActiveReads))
+        m.s3ActiveWrites.WithLabelValues(name).Set(float64(met.ActiveWrites))
     })
 
 }
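The Prometheus side follows the existing pattern for the other S3 gauges: a GaugeVec labelled by device, registered once, then set from a polled snapshot of the storage's counters. A self-contained sketch of that pattern using client_golang (the "silo"/"s3" namespace and the snapshot struct are illustrative assumptions, not Silo's exact values):

```go
// Sketch of exporting a polled metrics snapshot as per-device gauges.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

type snapshot struct {
	ActiveReads  uint64
	ActiveWrites uint64
}

func main() {
	reg := prometheus.NewRegistry()

	activeReads := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "silo", Subsystem: "s3", Name: "active_reads", Help: "Active reads",
	}, []string{"device"})
	activeWrites := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "silo", Subsystem: "s3", Name: "active_writes", Help: "Active writes",
	}, []string{"device"})
	reg.MustRegister(activeReads, activeWrites)

	// In Silo this runs periodically inside AddS3Storage; here we set it once.
	met := snapshot{ActiveReads: 2, ActiveWrites: 1}
	activeReads.WithLabelValues("disk0").Set(float64(met.ActiveReads))
	activeWrites.WithLabelValues("disk0").Set(float64(met.ActiveWrites))

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName())
	}
}
```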
pkg/storage/sources/s3_storage.go: 14 changes (14 additions & 0 deletions)
@@ -43,6 +43,8 @@ type S3Storage struct {
     metricsBlocksRDataBytes uint64
     metricsBlocksRBytes uint64
     metricsBlocksRTimeNS uint64
+    metricsActiveWrites int64
+    metricsActiveReads int64
 }
 
 func NewS3Storage(secure bool, endpoint string,
@@ -161,15 +163,18 @@ func (i *S3Storage) ReadAt(buffer []byte, offset int64) (int, error) {
         }
         i.lockers[off/int64(i.blockSize)].RLock()
         ctime := time.Now()
+        atomic.AddInt64(&i.metricsActiveReads, 1)
         obj, err := i.client.GetObject(context.TODO(), i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{})
         i.lockers[off/int64(i.blockSize)].RUnlock()
         if err != nil {
+            atomic.AddInt64(&i.metricsActiveReads, -1)
             if err.Error() == errNoSuchKey.Error() {
                 return len(buff), nil
             }
             return 0, err
         }
         n, err := obj.Read(buff)
+        atomic.AddInt64(&i.metricsActiveReads, -1)
         dtime := time.Since(ctime)
         if err == io.EOF {
             atomic.AddUint64(&i.metricsBlocksRCount, 1)
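The active-read and active-write gauges are plain int64 fields bumped with sync/atomic around each S3 call: incremented just before GetObject/PutObject and decremented on every exit path, including the error branches. A standalone sketch of the same pattern (doGet stands in for a remote call and is purely illustrative):

```go
// Sketch of an in-flight operation counter: atomic increment before the remote
// call, atomic decrement whether or not the call fails.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var activeReads int64

func doGet(key string) ([]byte, error) {
	if key == "" {
		return nil, errors.New("no such key")
	}
	return []byte("data for " + key), nil
}

func readBlock(key string) ([]byte, error) {
	atomic.AddInt64(&activeReads, 1)
	data, err := doGet(key)
	atomic.AddInt64(&activeReads, -1) // decrement on both success and failure
	if err != nil {
		return nil, err
	}
	return data, nil
}

func main() {
	if _, err := readBlock("silo-0"); err != nil {
		fmt.Println("read failed:", err)
	}
	fmt.Println("active reads now:", atomic.LoadInt64(&activeReads))
}
```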
@@ -246,15 +251,18 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) {
         ctx := context.TODO()
         i.lockers[off/int64(i.blockSize)].RLock()
         ctime := time.Now()
+        atomic.AddInt64(&i.metricsActiveReads, 1)
         obj, err := i.client.GetObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{})
         i.lockers[off/int64(i.blockSize)].RUnlock()
         if err != nil {
+            atomic.AddInt64(&i.metricsActiveReads, -1)
             if err.Error() == errNoSuchKey.Error() {
                 return len(buff), nil
             }
             return 0, err
         }
         n, err := obj.Read(buff)
+        atomic.AddInt64(&i.metricsActiveReads, -1)
         dtime := time.Since(ctime)
         if err == io.EOF {
             atomic.AddUint64(&i.metricsBlocksWPreRCount, 1)
@@ -280,9 +288,11 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) {
         i.setContext(int(block), cancelFn)
 
         ctime := time.Now()
+        atomic.AddInt64(&i.metricsActiveWrites, 1)
         obj, err := i.client.PutObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off),
             bytes.NewReader(buff), int64(i.blockSize),
             minio.PutObjectOptions{})
+        atomic.AddInt64(&i.metricsActiveWrites, -1)
         dtime := time.Since(ctime)
 
         i.setContext(int(block), nil)
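The PutObject hunk also shows the hook that device.go's s3dest.CancelWrites(0, int64(s3dest.Size())) relies on: each in-flight block write registers a cancel function via setContext(block, cancelFn) and clears it when done. A hedged sketch of that mechanism; the map-based bookkeeping and two-argument CancelWrites below are assumptions for illustration, not the actual S3Storage implementation:

```go
// Sketch of per-block write cancellation: register a CancelFunc per in-flight
// block, then CancelWrites aborts every block overlapping the given range.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type blockWriter struct {
	mu        sync.Mutex
	blockSize int64
	cancels   map[int]context.CancelFunc
}

func (b *blockWriter) setContext(block int, cancel context.CancelFunc) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if cancel == nil {
		delete(b.cancels, block)
	} else {
		b.cancels[block] = cancel
	}
}

// CancelWrites aborts any in-flight write whose block overlaps [offset, offset+length).
func (b *blockWriter) CancelWrites(offset, length int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	end := (offset + length + b.blockSize - 1) / b.blockSize
	for block := offset / b.blockSize; block < end; block++ {
		if cancel, ok := b.cancels[int(block)]; ok {
			cancel()
			delete(b.cancels, int(block))
		}
	}
}

func main() {
	bw := &blockWriter{blockSize: 4096, cancels: map[int]context.CancelFunc{}}

	ctx, cancelFn := context.WithCancel(context.Background())
	bw.setContext(3, cancelFn) // a write to block 3 is in flight

	go func() {
		time.Sleep(10 * time.Millisecond)
		bw.CancelWrites(0, 16*4096) // whole-device range, covers block 3
	}()

	<-ctx.Done() // the in-flight write would observe this and abort
	fmt.Println("write to block 3 cancelled:", ctx.Err())
}
```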
@@ -389,6 +399,8 @@ type S3Metrics struct {
     BlocksRBytes uint64
     BlocksRDataBytes uint64
     BlocksRTime time.Duration
+    ActiveReads uint64
+    ActiveWrites uint64
 }
 
 func (i *S3Metrics) String() string {
@@ -412,5 +424,7 @@ func (i *S3Storage) Metrics() *S3Metrics {
         BlocksRBytes: atomic.LoadUint64(&i.metricsBlocksRBytes),
         BlocksRDataBytes: atomic.LoadUint64(&i.metricsBlocksRDataBytes),
         BlocksRTime: time.Duration(atomic.LoadUint64(&i.metricsBlocksRTimeNS)),
+        ActiveReads: uint64(atomic.LoadInt64(&i.metricsActiveReads)),
+        ActiveWrites: uint64(atomic.LoadInt64(&i.metricsActiveWrites)),
     }
 }
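Metrics() hands out a point-in-time snapshot: the active counters are loaded atomically and cast from int64 to uint64, which relies on increments and decrements staying balanced (a negative counter would wrap). A small sketch of polling such a snapshot; the stats type and the 100 ms interval are placeholders, not Silo's API:

```go
// Hypothetical polling loop over a Metrics()-style snapshot.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type stats struct{ activeReads, activeWrites int64 }

type snapshot struct{ ActiveReads, ActiveWrites uint64 }

func (s *stats) Metrics() snapshot {
	return snapshot{
		ActiveReads:  uint64(atomic.LoadInt64(&s.activeReads)),
		ActiveWrites: uint64(atomic.LoadInt64(&s.activeWrites)),
	}
}

func main() {
	s := &stats{}
	atomic.AddInt64(&s.activeWrites, 1) // pretend one write is in flight

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		m := s.Metrics()
		fmt.Printf("reads=%d writes=%d\n", m.ActiveReads, m.ActiveWrites)
	}
}
```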
