diff --git a/jetstream/publish.go b/jetstream/publish.go index d08cdf1cb..70e219ac4 100644 --- a/jetstream/publish.go +++ b/jetstream/publish.go @@ -81,6 +81,7 @@ type ( err error errCh chan error doneCh chan *PubAck + reply string } jetStreamClient struct { @@ -280,17 +281,17 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut } var id string + var reply string // register new paf if not retrying if paf == nil { var err error - m.Reply, err = js.newAsyncReply() - defer func() { m.Reply = "" }() + reply, err = js.newAsyncReply() if err != nil { return nil, fmt.Errorf("nats: error creating async reply handler: %s", err) } - id = m.Reply[js.replyPrefixLen:] - paf = &pubAckFuture{msg: m, jsClient: js.publisher, maxRetries: o.retryAttempts, retryWait: o.retryWait} + id = reply[js.replyPrefixLen:] + paf = &pubAckFuture{msg: m, jsClient: js.publisher, maxRetries: o.retryAttempts, retryWait: o.retryWait, reply: reply} numPending, maxPending := js.registerPAF(id, paf) if maxPending > 0 && numPending > maxPending { @@ -303,10 +304,17 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut } } else { // when retrying, get the ID from existing reply subject - id = m.Reply[js.replyPrefixLen:] + reply = paf.reply + id = reply[js.replyPrefixLen:] } - if err := js.conn.PublishMsg(m); err != nil { + pubMsg := &nats.Msg{ + Subject: m.Subject, + Reply: reply, + Data: m.Data, + Header: m.Header, + } + if err := js.conn.PublishMsg(pubMsg); err != nil { js.clearPAF(id) return nil, err } @@ -370,6 +378,31 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) { return } + closeStc := func() { + // Check on anyone stalled and waiting. + if js.publisher.stallCh != nil && len(js.publisher.acks) < js.publisher.maxpa { + close(js.publisher.stallCh) + js.publisher.stallCh = nil + } + } + + closeDchFn := func() func() { + var dch chan struct{} + // Check on anyone one waiting on done status. + if js.publisher.doneCh != nil && len(js.publisher.acks) == 0 { + dch = js.publisher.doneCh + js.publisher.doneCh = nil + } + // Return function to close done channel which + // should be deferred so that error is processed and + // can be checked. + return func() { + if dch != nil { + close(dch) + } + } + } + doErr := func(err error) { paf.err = err if paf.errCh != nil { @@ -378,7 +411,6 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) { cb := js.publisher.asyncPublisherOpts.aecb js.publisher.Unlock() if cb != nil { - paf.msg.Reply = "" cb(js, paf.msg, err) } } @@ -387,7 +419,6 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) { if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders { if paf.retries < paf.maxRetries { paf.retries++ - paf.msg.Reply = m.Subject time.AfterFunc(paf.retryWait, func() { js.publisher.Lock() paf := js.getPAF(id) @@ -408,25 +439,16 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) { return } delete(js.publisher.acks, id) + closeStc() + defer closeDchFn()() doErr(ErrNoStreamResponse) return } // Remove delete(js.publisher.acks, id) - - // Check on anyone stalled and waiting. - if js.publisher.stallCh != nil && len(js.publisher.acks) < js.publisher.asyncPublisherOpts.maxpa { - close(js.publisher.stallCh) - js.publisher.stallCh = nil - } - // Check on anyone waiting on done status. - if js.publisher.doneCh != nil && len(js.publisher.acks) == 0 { - dch := js.publisher.doneCh - js.publisher.doneCh = nil - // Defer here so error is processed and can be checked. - defer close(dch) - } + closeStc() + defer closeDchFn()() var pa pubAckResponse if err := json.Unmarshal(m.Data, &pa); err != nil { diff --git a/jetstream/test/publish_test.go b/jetstream/test/publish_test.go index 2ab6de8f7..f79ad19fb 100644 --- a/jetstream/test/publish_test.go +++ b/jetstream/test/publish_test.go @@ -1453,6 +1453,7 @@ func TestPublishAsyncRetry(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + publishComplete := js.PublishAsyncComplete() errs := make(chan error, 1) go func() { // create stream with delay so that publish will receive no responders @@ -1476,6 +1477,12 @@ func TestPublishAsyncRetry(t *testing.T) { case <-time.After(5 * time.Second): t.Fatalf("Timeout waiting for ack") } + + select { + case <-publishComplete: + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } }) } }