Skip to content

Commit

Permalink
enforce wait for all goroutines in client tests
Browse files Browse the repository at this point in the history
client test code was opening connections in separate goroutines but not
waiting their termination. This is particularly dangerous if any calls
are made (asynchronously) to testing.*.Log* after the test function is
over. Not only the behavior is undefined, it was causing a data race in
go1.6 which is fixed in go1.7 by the following commit:

golang/go@5c83e65
  • Loading branch information
imkira committed Nov 29, 2016
1 parent f09251f commit 114eca9
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 60 deletions.
88 changes: 64 additions & 24 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,31 @@ import (
"bytes"
"io"
"reflect"
"sync"
"testing"
"time"
)

type server struct {
*testing.T
r reader // framer <- client
w writer // framer -> client
S io.ReadWriteCloser // Server IO
C io.ReadWriteCloser // Client IO
destructor sync.Once
r reader // framer <- client
w writer // framer -> client
S io.ReadWriteCloser // Server IO
C io.ReadWriteCloser // Client IO

// captured client frames
start connectionStartOk
tune connectionTuneOk
}

func (srv *server) close() {
srv.destructor.Do(func() {
srv.C.Close()
srv.S.Close()
})
}

func defaultConfig() Config {
return Config{SASL: []Authentication{&PlainAuth{"guest", "guest"}}, Vhost: "/"}
}
Expand All @@ -33,8 +42,8 @@ func newSession(t *testing.T) (io.ReadWriteCloser, *server) {
rs, wc := io.Pipe()
rc, ws := io.Pipe()

rws := &logIO{t, "server", pipe{rs, ws}}
rwc := &logIO{t, "client", pipe{rc, wc}}
rws := &logIO{t: t, prefix: "server", proxy: &pipe{r: rs, w: ws}}
rwc := &logIO{t: t, prefix: "client", proxy: &pipe{r: rc, w: wc}}

server := server{
T: t,
Expand Down Expand Up @@ -175,13 +184,16 @@ func (t *server) channelOpen(id int) {

func TestDefaultClientProperties(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()

go func() {
defer srv.close()
srv.connectionOpen()
rwc.Close()
}()

if c, err := Open(rwc, defaultConfig()); err != nil {
c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}

Expand All @@ -196,6 +208,7 @@ func TestDefaultClientProperties(t *testing.T) {

func TestCustomClientProperties(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()

config := defaultConfig()
config.Properties = Table{
Expand All @@ -204,11 +217,13 @@ func TestCustomClientProperties(t *testing.T) {
}

go func() {
defer srv.close()
srv.connectionOpen()
rwc.Close()
}()

if c, err := Open(rwc, config); err != nil {
c, err := Open(rwc, config)
defer c.Close()
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}

Expand All @@ -223,27 +238,31 @@ func TestCustomClientProperties(t *testing.T) {

func TestOpen(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()
go func() {
defer srv.close()
srv.connectionOpen()
rwc.Close()
}()

if c, err := Open(rwc, defaultConfig()); err != nil {
c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}
}

func TestChannelOpen(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()

go func() {
defer srv.close()
srv.connectionOpen()
srv.channelOpen(1)

rwc.Close()
}()

c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}
Expand All @@ -256,8 +275,10 @@ func TestChannelOpen(t *testing.T) {

func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()

go func() {
defer srv.close()
srv.expectAMQP()
srv.send(0, &connectionStart{
VersionMajor: 0,
Expand All @@ -268,51 +289,56 @@ func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) {
}()

c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != ErrSASL {
t.Fatalf("expected ErrSASL got: %+v on %+v", err, c)
}
}

func TestOpenFailedCredentials(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()

go func() {
// kill/timeout the connection indicating bad auth
defer srv.close()
srv.expectAMQP()
srv.connectionStart()
// Now kill/timeout the connection indicating bad auth
rwc.Close()
}()

c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != ErrCredentials {
t.Fatalf("expected ErrCredentials got: %+v on %+v", err, c)
}
}

func TestOpenFailedVhost(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()

go func() {
// kill/timeout the connection on bad Vhost
defer srv.close()
srv.expectAMQP()
srv.connectionStart()
srv.connectionTune()
srv.recv(0, &connectionOpen{})

// Now kill/timeout the connection on bad Vhost
rwc.Close()
}()

c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != ErrVhost {
t.Fatalf("expected ErrVhost got: %+v on %+v", err, c)
}
}

func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
rwc, srv := newSession(t)
defer rwc.Close()
defer srv.close()

go func() {
defer srv.close()
srv.connectionOpen()
srv.channelOpen(1)

Expand Down Expand Up @@ -343,6 +369,7 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
}()

c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}
Expand Down Expand Up @@ -387,8 +414,10 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {

func TestNotifyClosesReusedPublisherConfirmChan(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()

go func() {
defer srv.close()
srv.connectionOpen()
srv.channelOpen(1)

Expand All @@ -400,6 +429,7 @@ func TestNotifyClosesReusedPublisherConfirmChan(t *testing.T) {
}()

c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}
Expand All @@ -423,8 +453,10 @@ func TestNotifyClosesReusedPublisherConfirmChan(t *testing.T) {

func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()

go func() {
defer srv.close()
srv.connectionOpen()
srv.channelOpen(1)

Expand All @@ -433,6 +465,7 @@ func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
}()

c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}
Expand Down Expand Up @@ -488,7 +521,7 @@ func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
// Should not panic when sending bodies split at different boundaries
func TestPublishBodySliceIssue74(t *testing.T) {
rwc, srv := newSession(t)
defer rwc.Close()
defer srv.close()

const frameSize = 100
const publishings = frameSize * 3
Expand All @@ -497,6 +530,7 @@ func TestPublishBodySliceIssue74(t *testing.T) {
base := make([]byte, publishings)

go func() {
defer srv.close()
srv.connectionOpen()
srv.channelOpen(1)

Expand All @@ -511,6 +545,7 @@ func TestPublishBodySliceIssue74(t *testing.T) {
cfg.FrameSize = frameSize

c, err := Open(rwc, cfg)
defer c.Close()
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}
Expand All @@ -530,13 +565,14 @@ func TestPublishBodySliceIssue74(t *testing.T) {
// Should not panic when server and client have frame_size of 0
func TestPublishZeroFrameSizeIssue161(t *testing.T) {
rwc, srv := newSession(t)
defer rwc.Close()
defer srv.close()

const frameSize = 0
const publishings = 1
done := make(chan bool)

go func() {
defer srv.close()
srv.connectionOpen()
srv.channelOpen(1)

Expand All @@ -551,6 +587,7 @@ func TestPublishZeroFrameSizeIssue161(t *testing.T) {
cfg.FrameSize = frameSize

c, err := Open(rwc, cfg)
defer c.Close()

// override the tuned framesize with a hard 0, as would happen when rabbit is configured with 0
c.Config.FrameSize = frameSize
Expand All @@ -573,7 +610,7 @@ func TestPublishZeroFrameSizeIssue161(t *testing.T) {

func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
rwc, srv := newSession(t)
defer rwc.Close()
defer srv.close()

go func() {
srv.connectionOpen()
Expand All @@ -584,6 +621,7 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
}()

c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != nil {
t.Fatalf("couldn't create connection: %v (%s)", c, err)
}
Expand All @@ -604,18 +642,20 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {

func TestChannelCloseRace(t *testing.T) {
rwc, srv := newSession(t)
defer srv.close()

done := make(chan bool)

go func() {
defer srv.close()
srv.connectionOpen()
srv.channelOpen(1)

rwc.Close()
done <- true
}()

c, err := Open(rwc, defaultConfig())
defer c.Close()
if err != nil {
t.Fatalf("could not create connection: %v (%s)", c, err)
}
Expand Down
Loading

0 comments on commit 114eca9

Please sign in to comment.