Skip to content

Commit

Permalink
Switch to mutex for canceled status
Browse files Browse the repository at this point in the history
  • Loading branch information
umpc committed Jul 6, 2017
1 parent 9f7a133 commit 7f4e595
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
35 changes: 19 additions & 16 deletions iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,39 @@ package sortedmap
import (
"errors"
"time"
"sync"
)

// IterChCloser allows records to be read through a channel that is returned by the Records method.
// IterChCloser values should be closed after use using the Close method.
type IterChCloser struct {
sync.RWMutex
ch chan Record
canceled chan struct{}
canceled bool
}

// Close cancels a channel-based iteration and causes the sending goroutine to exit.
// Close should be used after an IterChCloser is finished being read from.
func (iterCh *IterChCloser) Close() error {
select {
case iterCh.canceled <- struct{}{}:
default:
}
iterCh.Lock()
iterCh.canceled = true
iterCh.Unlock()

return nil
}

// Records returns nil if the IterChCloser has been closed.
// Otherwise, Record returns a channel that records can be read from.
func (iterCh *IterChCloser) Records() <-chan Record {
select {
case <-iterCh.canceled:
iterCh.canceled <- struct{}{}
iterCh.RLock()
canceled := iterCh.canceled
iterCh.RUnlock()

if canceled {
return nil
default:
return iterCh.ch
}

return iterCh.ch
}

// IterChParams contains configurable settings for CustomIterCh.
Expand Down Expand Up @@ -72,11 +75,12 @@ func (sm *SortedMap) recordFromIdx(i int) Record {
}

func (sm *SortedMap) sendRecord(iterCh IterChCloser, sendTimeout time.Duration, i int) bool {
select {
case <-iterCh.canceled:
iterCh.canceled <- struct{}{}
iterCh.RLock()
canceled := iterCh.canceled
iterCh.RUnlock()

if canceled {
return false
default:
}

if sendTimeout <= time.Duration(0) {
Expand All @@ -101,8 +105,7 @@ func (sm *SortedMap) iterCh(params IterChParams) (IterChCloser, error) {
}

iterCh := IterChCloser{
ch: make(chan Record, setBufSize(params.BufSize)),
canceled: make(chan struct{}, 1),
ch: make(chan Record, setBufSize(params.BufSize)),
}

go func(params IterChParams, iterCh IterChCloser) {
Expand Down
1 change: 0 additions & 1 deletion iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ func TestCancelCustomIterCh(t *testing.T) {
t.Fatal("Channel was not closed.")
}
}(ch)
ch.Close()
}()
}

Expand Down

0 comments on commit 7f4e595

Please sign in to comment.