Skip to content

Commit

Permalink
Added logging to waitingCache
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>
  • Loading branch information
jimmyaxod committed Nov 19, 2024
1 parent 21a6d79 commit 306f127
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 29 deletions.
43 changes: 42 additions & 1 deletion pkg/storage/waitingcache/waiting_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package waitingcache
import (
"sync"

"github.com/google/uuid"
"github.com/loopholelabs/logging/types"
"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/util"
)
Expand All @@ -14,6 +16,8 @@ import (
*
*/
type WaitingCache struct {
logger types.RootLogger
uuid uuid.UUID
prov storage.Provider
local *Local
remote *Remote
Expand All @@ -26,8 +30,14 @@ type WaitingCache struct {
}

func NewWaitingCache(prov storage.Provider, blockSize int) (*Local, *Remote) {
return NewWaitingCacheWithLogger(prov, blockSize, nil)
}

func NewWaitingCacheWithLogger(prov storage.Provider, blockSize int, log types.RootLogger) (*Local, *Remote) {
numBlocks := (int(prov.Size()) + blockSize - 1) / blockSize
wc := &WaitingCache{
logger: log,
uuid: uuid.New(),
prov: prov,
blockSize: blockSize,
size: prov.Size(),
Expand Down Expand Up @@ -55,6 +65,17 @@ func (i *WaitingCache) waitForBlocks(bStart uint, bEnd uint, lockCB func(b uint)
}

func (i *WaitingCache) waitForBlock(b uint, lockCB func(b uint)) {
if i.logger != nil {
i.logger.Trace().
Str("uuid", i.uuid.String()).
Uint("block", b).
Msg("waitForBlock")
defer i.logger.Trace().
Str("uuid", i.uuid.String()).
Uint("block", b).
Msg("waitForBlock complete")
}

// If we have it locally, return.
if i.local.available.BitSet(int(b)) {
return
Expand All @@ -80,8 +101,14 @@ func (i *WaitingCache) waitForBlock(b uint, lockCB func(b uint)) {
rwl.RLock()
}

// TODO: Fix the logic here a bit
func (i *WaitingCache) markAvailableBlockLocal(b uint) {
if i.logger != nil {
i.logger.Trace().
Str("uuid", i.uuid.String()).
Uint("block", b).
Msg("markAvailableLocalBlock")
}

i.lockersLock.Lock()
avail := i.local.available.BitSet(int(b))
rwl, ok := i.lockers[b]
Expand All @@ -108,6 +135,13 @@ func (i *WaitingCache) markAvailableRemoteBlocks(bStart uint, bEnd uint) {
}

func (i *WaitingCache) markAvailableRemoteBlock(b uint) {
if i.logger != nil {
i.logger.Trace().
Str("uuid", i.uuid.String()).
Uint("block", b).
Msg("markAvailableRemoteBlock")
}

i.lockersLock.Lock()
avail := i.remote.available.BitSet(int(b))
rwl, ok := i.lockers[b]
Expand All @@ -126,6 +160,13 @@ func (i *WaitingCache) markAvailableRemoteBlock(b uint) {
}

func (i *WaitingCache) markUnavailableRemoteBlock(b uint) {
if i.logger != nil {
i.logger.Trace().
Str("uuid", i.uuid.String()).
Uint("block", b).
Msg("markUnavailableRemoteBlock")
}

i.lockersLock.Lock()
avail := i.remote.available.BitSet(int(b))
if avail {
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/waitingcache/waiting_cache_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ func (wcl *Local) SendSiloEvent(eventType storage.EventType, eventData storage.E
}

func (wcl *Local) ReadAt(buffer []byte, offset int64) (int, error) {
if wcl.wc.logger != nil {
wcl.wc.logger.Trace().
Str("uuid", wcl.wc.uuid.String()).
Int64("offset", offset).
Int("length", len(buffer)).
Msg("local ReadAt")
defer wcl.wc.logger.Trace().
Str("uuid", wcl.wc.uuid.String()).
Int64("offset", offset).
Int("length", len(buffer)).
Msg("local ReadAt complete")
}

end := uint64(offset + int64(len(buffer)))
if end > wcl.wc.size {
end = wcl.wc.size
Expand All @@ -38,6 +51,19 @@ func (wcl *Local) ReadAt(buffer []byte, offset int64) (int, error) {
}

func (wcl *Local) WriteAt(buffer []byte, offset int64) (int, error) {
if wcl.wc.logger != nil {
wcl.wc.logger.Trace().
Str("uuid", wcl.wc.uuid.String()).
Int64("offset", offset).
Int("length", len(buffer)).
Msg("local WriteAt")
defer wcl.wc.logger.Trace().
Str("uuid", wcl.wc.uuid.String()).
Int64("offset", offset).
Int("length", len(buffer)).
Msg("local WriteAt complete")
}

end := uint64(offset + int64(len(buffer)))
if end > wcl.wc.size {
end = wcl.wc.size
Expand Down
69 changes: 41 additions & 28 deletions pkg/storage/waitingcache/waiting_cache_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,86 +14,99 @@ type Remote struct {
}

// Relay events to embedded StorageProvider
func (wcl *Remote) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData {
data := wcl.ProviderWithEvents.SendSiloEvent(eventType, eventData)
return append(data, storage.SendSiloEvent(wcl.wc.prov, eventType, eventData)...)
func (wcr *Remote) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData {
data := wcr.ProviderWithEvents.SendSiloEvent(eventType, eventData)
return append(data, storage.SendSiloEvent(wcr.wc.prov, eventType, eventData)...)
}

func (wcl *Remote) ReadAt(_ []byte, _ int64) (int, error) {
func (wcr *Remote) ReadAt(_ []byte, _ int64) (int, error) {
// Remote reads are unsupported at the moment.
return 0, io.EOF
}

func (wcl *Remote) WriteAt(buffer []byte, offset int64) (int, error) {
func (wcr *Remote) WriteAt(buffer []byte, offset int64) (int, error) {
if wcr.wc.logger != nil {
wcr.wc.logger.Trace().
Str("uuid", wcr.wc.uuid.String()).
Int64("offset", offset).
Int("length", len(buffer)).
Msg("remote WriteAt")
defer wcr.wc.logger.Trace().
Str("uuid", wcr.wc.uuid.String()).
Int64("offset", offset).
Int("length", len(buffer)).
Msg("remote WriteAt complete")
}

end := uint64(offset + int64(len(buffer)))
if end > wcl.wc.size {
end = wcl.wc.size
if end > wcr.wc.size {
end = wcr.wc.size
}

bStart := uint(offset / int64(wcl.wc.blockSize))
bEnd := uint((end-1)/uint64(wcl.wc.blockSize)) + 1
bStart := uint(offset / int64(wcr.wc.blockSize))
bEnd := uint((end-1)/uint64(wcr.wc.blockSize)) + 1

align := 0
// If the first block is incomplete, we won't mark it.
if offset > (int64(bStart) * int64(wcl.wc.blockSize)) {
if offset > (int64(bStart) * int64(wcr.wc.blockSize)) {
bStart++
align = int(offset - (int64(bStart) * int64(wcl.wc.blockSize)))
align = int(offset - (int64(bStart) * int64(wcr.wc.blockSize)))
}
// If the last block is incomplete, we won't mark it. *UNLESS* It's the last block in the storage
if (end % uint64(wcl.wc.blockSize)) > 0 {
if uint64(offset)+uint64(len(buffer)) < wcl.wc.size {
if (end % uint64(wcr.wc.blockSize)) > 0 {
if uint64(offset)+uint64(len(buffer)) < wcr.wc.size {
bEnd--
}
}

var err error
var n int

if wcl.wc.allowLocalWrites {
if wcr.wc.allowLocalWrites {
// Check if we have local data that needs merging (From local writes)
avail := wcl.wc.local.available.Collect(bStart, bEnd)
avail := wcr.wc.local.available.Collect(bStart, bEnd)

if len(avail) != 0 {
pbuffer := make([]byte, len(buffer))
_, err = wcl.wc.prov.ReadAt(pbuffer, offset)
_, err = wcr.wc.prov.ReadAt(pbuffer, offset)
if err == nil {
for _, b := range avail {
s := align + (int(b-bStart) * wcl.wc.blockSize)
s := align + (int(b-bStart) * wcr.wc.blockSize)
// Merge the data in. We know these are complete blocks.
// NB This does modify the callers buffer.
copy(buffer[s:s+wcl.wc.blockSize], pbuffer[s:s+wcl.wc.blockSize])
copy(buffer[s:s+wcr.wc.blockSize], pbuffer[s:s+wcr.wc.blockSize])
}
}
}
}

// Perform the WriteAt
if err == nil {
n, err = wcl.wc.prov.WriteAt(buffer, offset)
n, err = wcr.wc.prov.WriteAt(buffer, offset)
}

// Signal that we have blocks available from remote
if err == nil {
if bEnd > bStart {
wcl.wc.markAvailableRemoteBlocks(bStart, bEnd)
wcr.wc.markAvailableRemoteBlocks(bStart, bEnd)
}
}

return n, err
}

func (wcl *Remote) Flush() error {
return wcl.wc.prov.Flush()
func (wcr *Remote) Flush() error {
return wcr.wc.prov.Flush()
}

func (wcl *Remote) Size() uint64 {
return wcl.wc.prov.Size()
func (wcr *Remote) Size() uint64 {
return wcr.wc.prov.Size()
}

func (wcl *Remote) Close() error {
return wcl.wc.prov.Close()
func (wcr *Remote) Close() error {
return wcr.wc.prov.Close()
}

func (wcl *Remote) CancelWrites(offset int64, length int64) {
wcl.wc.prov.CancelWrites(offset, length)
func (wcr *Remote) CancelWrites(offset int64, length int64) {
wcr.wc.prov.CancelWrites(offset, length)
}

0 comments on commit 306f127

Please sign in to comment.