Skip to content

Commit

Permalink
fix: Protect from concurrent calls to all instances of idempotent `Cl…
Browse files Browse the repository at this point in the history
…ose()`
  • Loading branch information
pojntfx committed Oct 22, 2023
1 parent 491533a commit 2c8a563
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 4 deletions.
8 changes: 7 additions & 1 deletion pkg/chunks/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ type Pusher struct {

workerWg sync.WaitGroup
workerSem chan struct{}
errs chan error

closeLock sync.Mutex

errs chan error
}

func NewPusher(
Expand Down Expand Up @@ -166,6 +169,9 @@ func (p *Pusher) Wait() error {
}

func (p *Pusher) Close() error {
p.closeLock.Lock()
defer p.closeLock.Unlock()

if _, err := p.Sync(); err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/migration/file_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package migration
import (
"context"
"os"
"sync"

"github.com/pojntfx/go-nbd/pkg/backend"
"github.com/pojntfx/go-nbd/pkg/client"
Expand All @@ -26,6 +27,8 @@ type FileMigrator struct {
leecher *FileLeecher
released bool

closeLock sync.Mutex

errs chan error
}

Expand Down Expand Up @@ -253,6 +256,9 @@ func (s *FileMigrator) Leech(
}

func (s *FileMigrator) Close() error {
s.closeLock.Lock()
defer s.closeLock.Unlock()

if s.seeder != nil {
_ = s.seeder.Close()
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/migration/path_leecher.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ type PathLeecher struct {

pullerWg sync.WaitGroup
devWg *sync.WaitGroup
errs chan error

closeLock sync.Mutex

errs chan error
}

func NewPathLeecher(
Expand Down Expand Up @@ -341,6 +344,9 @@ func (l *PathLeecher) Release() (
}

func (l *PathLeecher) Close() error {
l.closeLock.Lock()
defer l.closeLock.Unlock()

if !l.released {
if hook := l.hooks.OnBeforeClose; hook != nil {
if err := hook(); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/migration/path_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package migration
import (
"context"
"errors"
"sync"

"github.com/pojntfx/go-nbd/pkg/backend"
"github.com/pojntfx/go-nbd/pkg/client"
Expand Down Expand Up @@ -49,6 +50,8 @@ type PathMigrator struct {
leecher *PathLeecher
released bool

closeLock sync.Mutex

errs chan error
}

Expand Down Expand Up @@ -277,6 +280,9 @@ func (s *PathMigrator) Leech(
}

func (s *PathMigrator) Close() error {
s.closeLock.Lock()
defer s.closeLock.Unlock()

if s.seeder != nil {
_ = s.seeder.Close()
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/migration/path_seeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ type PathSeeder struct {

devicePath string

wg *sync.WaitGroup
wg *sync.WaitGroup

closeLock sync.Mutex

errs chan error
}

Expand Down Expand Up @@ -219,6 +222,9 @@ func (s *PathSeeder) Open() (string, int64, *services.SeederService, error) {
}

func (s *PathSeeder) Close() error {
s.closeLock.Lock()
defer s.closeLock.Unlock()

if hook := s.hooks.OnBeforeClose; hook != nil {
if err := hook(); err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions pkg/migration/slice_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migration

import (
"context"
"sync"

"github.com/pojntfx/go-nbd/pkg/backend"
"github.com/pojntfx/go-nbd/pkg/client"
Expand All @@ -25,6 +26,8 @@ type SliceMigrator struct {
leecher *SliceLeecher
released bool

closeLock sync.Mutex

errs chan error
}

Expand Down Expand Up @@ -252,6 +255,9 @@ func (s *SliceMigrator) Leech(
}

func (s *SliceMigrator) Close() error {
s.closeLock.Lock()
defer s.closeLock.Unlock()

if s.seeder != nil {
_ = s.seeder.Close()
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/mount/file_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mount

import (
"os"
"sync"

"github.com/pojntfx/go-nbd/pkg/backend"
"github.com/pojntfx/go-nbd/pkg/client"
Expand All @@ -13,6 +14,8 @@ type DirectFileMount struct {

devicePath string
deviceFile *os.File

closeLock sync.Mutex
}

func NewDirectFileMount(
Expand Down Expand Up @@ -54,6 +57,9 @@ func (d *DirectFileMount) Open() (*os.File, error) {
}

func (d *DirectFileMount) Close() error {
d.closeLock.Lock()
defer d.closeLock.Unlock()

if d.deviceFile != nil {
_ = d.deviceFile.Close()

Expand Down
6 changes: 6 additions & 0 deletions pkg/mount/path_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mount
import (
"net"
"os"
"sync"
"syscall"

"github.com/pojntfx/go-nbd/pkg/backend"
Expand All @@ -24,6 +25,8 @@ type DirectPathMount struct {
cf *os.File
cc *net.UnixConn

closeLock sync.Mutex

errs chan error
}

Expand Down Expand Up @@ -126,6 +129,9 @@ func (d *DirectPathMount) Open() error {
}

func (d *DirectPathMount) Close() error {
d.closeLock.Lock()
defer d.closeLock.Unlock()

_ = client.Disconnect(d.f)

if d.cc != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/mount/path_managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ type ManagedPathMount struct {
puller *chunks.Puller
dev *DirectPathMount

wg sync.WaitGroup
wg sync.WaitGroup

closeLock sync.Mutex

errs chan error
}

Expand Down Expand Up @@ -279,6 +282,9 @@ func (m *ManagedPathMount) Open() (string, int64, error) {
}

func (m *ManagedPathMount) Close() error {
m.closeLock.Lock()
defer m.closeLock.Unlock()

if m.syncer != nil {
_ = m.syncer.Sync()
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/mount/slice_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type DirectSliceMount struct {
mmapMount sync.Mutex

b backend.Backend

closeLock sync.Mutex
}

func NewDirectSliceMount(
Expand Down Expand Up @@ -79,6 +81,9 @@ func (d *DirectSliceMount) Open() ([]byte, error) {
}

func (d *DirectSliceMount) Close() error {
d.closeLock.Lock()
defer d.closeLock.Unlock()

d.mmapMount.Lock()
if d.slice != nil {
_ = d.slice.Unlock()
Expand Down

0 comments on commit 2c8a563

Please sign in to comment.