Skip to content

Commit

Permalink
Removed write buffer bits
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>
  • Loading branch information
jimmyaxod committed Nov 19, 2024
1 parent 7549050 commit 8cf3f11
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 35 deletions.
7 changes: 0 additions & 7 deletions pkg/storage/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,6 @@ func (m *Migrator) startMigration() {
Msg("Migration started")
}

if m.logger != nil {
m.logger.Debug().
Str("uuid", m.uuid.String()).
Uint64("size", m.sourceTracker.Size()).
Msg("Migration started")
}

// Tell the source to stop sync, and send alternateSources to the destination.
as := storage.SendSiloEvent(m.sourceTracker, "sync.stop", nil)
if len(as) == 1 {
Expand Down
18 changes: 9 additions & 9 deletions pkg/storage/protocol/from_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (fp *FromProtocol) HandleEvent(cb func(*packets.Event)) error {
// Relay the event, wait, and then respond.
cb(ev)

_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeEventResponse(), true)
_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeEventResponse())
if err != nil {
return err
}
Expand Down Expand Up @@ -255,7 +255,7 @@ func (fp *FromProtocol) HandleHashes(cb func(map[uint][sha256.Size]byte)) error
// Relay the hashes, wait and then respond
cb(hashes)

_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeHashesResponse(), true)
_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeHashesResponse())
if err != nil {
return err
}
Expand Down Expand Up @@ -345,7 +345,7 @@ func (fp *FromProtocol) HandleReadAt() error {
Error: err,
Data: buff,
}
_, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeReadAtResponse(rar), false)
_, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeReadAtResponse(rar))
if err != nil {
errLock.Lock()
errValue = err
Expand Down Expand Up @@ -395,7 +395,7 @@ func (fp *FromProtocol) HandleWriteAt() error {
Error: nil,
Bytes: int(length),
}
_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeWriteAtResponse(war), false)
_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeWriteAtResponse(war))
if err != nil {
return err
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func (fp *FromProtocol) HandleWriteAt() error {
if err == nil {
fp.markRangePresent(int(goffset), len(gdata))
}
_, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeWriteAtResponse(war), false)
_, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeWriteAtResponse(war))
if err != nil {
errLock.Lock()
errValue = err
Expand Down Expand Up @@ -468,7 +468,7 @@ func (fp *FromProtocol) HandleWriteAtWithMap(cb func(offset int64, data []byte,
Bytes: len(writeData),
Error: err,
}
_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeWriteAtResponse(war), false)
_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeWriteAtResponse(war))
if err != nil {
return err
}
Expand Down Expand Up @@ -556,7 +556,7 @@ func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint)) error {
cb(blocks)

// Send a response / ack, to signify that the DirtyList has been actioned.
_, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeDirtyListResponse(), true)
_, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeDirtyListResponse())
if err != nil {
return err
}
Expand All @@ -565,7 +565,7 @@ func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint)) error {

func (fp *FromProtocol) NeedAt(offset int64, length int32) error {
b := packets.EncodeNeedAt(offset, length)
_, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b, true)
_, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b)
if err != nil {
atomic.AddUint64(&fp.metricSentNeedAt, 1)
}
Expand All @@ -574,7 +574,7 @@ func (fp *FromProtocol) NeedAt(offset int64, length int32) error {

func (fp *FromProtocol) DontNeedAt(offset int64, length int32) error {
b := packets.EncodeDontNeedAt(offset, length)
_, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b, true)
_, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b)
if err != nil {
atomic.AddUint64(&fp.metricSentDontNeedAt, 1)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/protocol/mock_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewMockProtocol(ctx context.Context) *MockProtocol {
}
}

func (mp *MockProtocol) SendPacket(dev uint32, id uint32, data []byte, urgent bool) (uint32, error) {
func (mp *MockProtocol) SendPacket(dev uint32, id uint32, data []byte) (uint32, error) {
cmd := data[0]

// if id is ANY, pick one
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const IDPickAny = 0

type Protocol interface {
// Send a packet (Returns a transaction id)
SendPacket(dev uint32, id uint32, data []byte, urgent bool) (uint32, error)
SendPacket(dev uint32, id uint32, data []byte) (uint32, error)

// Wait for a response packet (Given specific transaction id)
WaitForPacket(dev uint32, id uint32) ([]byte, error)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/protocol/protocol_rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (p *RW) InitDev(dev uint32) {
}

// Send a packet
func (p *RW) SendPacket(dev uint32, id uint32, data []byte, urgent bool) (uint32, error) {
func (p *RW) SendPacket(dev uint32, id uint32, data []byte) (uint32, error) {
// If the context was cancelled, we should return that error
select {
case <-p.ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/protocol/protocol_rw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestProtocolRWSendAfterCancel(t *testing.T) {

// Now check that we can't send anything...

_, err = prDest.SendPacket(1, 0, []byte{1, 2, 3}, false)
_, err = prDest.SendPacket(1, 0, []byte{1, 2, 3})
assert.ErrorIs(t, err, context.Canceled)

}
4 changes: 2 additions & 2 deletions pkg/storage/protocol/test_protocol_bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (p *TestProtocolBandwidth) expireBandwidth(since time.Duration) {
p.recentPackets = newRecentPackets
}

func (p *TestProtocolBandwidth) SendPacket(dev uint32, id uint32, data []byte, urgent bool) (uint32, error) {
return p.proto.SendPacket(dev, id, data, urgent)
func (p *TestProtocolBandwidth) SendPacket(dev uint32, id uint32, data []byte) (uint32, error) {
return p.proto.SendPacket(dev, id, data)
}

func (p *TestProtocolBandwidth) waitForBandwidth() {
Expand Down
24 changes: 12 additions & 12 deletions pkg/storage/protocol/to_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (i *ToProtocol) SendSiloEvent(eventType storage.EventType, eventData storag

func (i *ToProtocol) SendAltSources(s []packets.AlternateSource) error {
h := packets.EncodeAlternateSources(s)
_, err := i.protocol.SendPacket(i.dev, IDPickAny, h, true)
_, err := i.protocol.SendPacket(i.dev, IDPickAny, h)
if err == nil {
atomic.AddUint64(&i.metricSentAltSources, 1)
}
Expand All @@ -112,7 +112,7 @@ func (i *ToProtocol) SendAltSources(s []packets.AlternateSource) error {

func (i *ToProtocol) SendEvent(e *packets.Event) error {
b := packets.EncodeEvent(e)
id, err := i.protocol.SendPacket(i.dev, IDPickAny, b, true)
id, err := i.protocol.SendPacket(i.dev, IDPickAny, b)
if err != nil {
return err
}
Expand All @@ -130,7 +130,7 @@ func (i *ToProtocol) SendEvent(e *packets.Event) error {

func (i *ToProtocol) SendHashes(hashes map[uint][sha256.Size]byte) error {
h := packets.EncodeHashes(hashes)
id, err := i.protocol.SendPacket(i.dev, IDPickAny, h, false)
id, err := i.protocol.SendPacket(i.dev, IDPickAny, h)
if err != nil {
return err
}
Expand All @@ -154,7 +154,7 @@ func (i *ToProtocol) SendDevInfo(name string, blockSize uint32, schema string) e
Schema: schema,
}
b := packets.EncodeDevInfo(di)
_, err := i.protocol.SendPacket(i.dev, IDPickAny, b, true)
_, err := i.protocol.SendPacket(i.dev, IDPickAny, b)
if err != nil {
return err
}
Expand All @@ -166,7 +166,7 @@ func (i *ToProtocol) SendDevInfo(name string, blockSize uint32, schema string) e

func (i *ToProtocol) RemoveDev() error {
f := packets.EncodeRemoveDev()
_, err := i.protocol.SendPacket(i.dev, IDPickAny, f, true)
_, err := i.protocol.SendPacket(i.dev, IDPickAny, f)
if err != nil {
return err
}
Expand All @@ -178,7 +178,7 @@ func (i *ToProtocol) RemoveDev() error {

func (i *ToProtocol) DirtyList(blockSize int, blocks []uint) error {
b := packets.EncodeDirtyList(blockSize, blocks)
id, err := i.protocol.SendPacket(i.dev, IDPickAny, b, true)
id, err := i.protocol.SendPacket(i.dev, IDPickAny, b)
if err != nil {
return err
}
Expand All @@ -198,7 +198,7 @@ func (i *ToProtocol) DirtyList(blockSize int, blocks []uint) error {

func (i *ToProtocol) ReadAt(buffer []byte, offset int64) (int, error) {
b := packets.EncodeReadAt(offset, int32(len(buffer)))
id, err := i.protocol.SendPacket(i.dev, IDPickAny, b, true)
id, err := i.protocol.SendPacket(i.dev, IDPickAny, b)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (i *ToProtocol) WriteAt(buffer []byte, offset int64) (int, error) {
hash := sha256.Sum256(buffer)
if bytes.Equal(hash[:], as.Hash[:]) {
data := packets.EncodeWriteAtHash(as.Offset, as.Length, as.Hash[:])
id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, false)
id, err = i.protocol.SendPacket(i.dev, IDPickAny, data)
if err == nil {
atomic.AddUint64(&i.metricSentWriteAtHash, 1)
atomic.AddUint64(&i.metricSentWriteAtHashBytes, uint64(as.Length))
Expand All @@ -249,15 +249,15 @@ func (i *ToProtocol) WriteAt(buffer []byte, offset int64) (int, error) {
if !dontSendData {
if i.CompressedWrites {
data := packets.EncodeWriteAtComp(offset, buffer)
id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, false)
id, err = i.protocol.SendPacket(i.dev, IDPickAny, data)
if err == nil {
atomic.AddUint64(&i.metricSentWriteAtComp, 1)
atomic.AddUint64(&i.metricSentWriteAtCompBytes, uint64(len(buffer)))
atomic.AddUint64(&i.metricSentWriteAtCompDataBytes, uint64(len(data)))
}
} else {
data := packets.EncodeWriteAt(offset, buffer)
id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, false)
id, err = i.protocol.SendPacket(i.dev, IDPickAny, data)
if err == nil {
atomic.AddUint64(&i.metricSentWriteAt, 1)
atomic.AddUint64(&i.metricSentWriteAtBytes, uint64(len(buffer)))
Expand Down Expand Up @@ -292,7 +292,7 @@ func (i *ToProtocol) WriteAtWithMap(buffer []byte, offset int64, idMap map[uint6
var id uint32
var err error
f := packets.EncodeWriteAtWithMap(offset, buffer, idMap)
id, err = i.protocol.SendPacket(i.dev, IDPickAny, f, false)
id, err = i.protocol.SendPacket(i.dev, IDPickAny, f)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (i *ToProtocol) WriteAtWithMap(buffer []byte, offset int64, idMap map[uint6

func (i *ToProtocol) RemoveFromMap(ids []uint64) error {
f := packets.EncodeRemoveFromMap(ids)
_, err := i.protocol.SendPacket(i.dev, IDPickAny, f, true)
_, err := i.protocol.SendPacket(i.dev, IDPickAny, f)
if err == nil {
atomic.AddUint64(&i.metricSentRemoveFromMap, 1)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/protocol/to_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type sendPacketInfo struct {
data []byte
}

func (p *MockPro) SendPacket(dev uint32, id uint32, data []byte, urgent bool) (uint32, error) {
func (p *MockPro) SendPacket(dev uint32, id uint32, data []byte) (uint32, error) {
mockID := uint32(999)
p.sendPackets <- &sendPacketInfo{
dev: dev,
Expand Down

0 comments on commit 8cf3f11

Please sign in to comment.