Skip to content

Commit

Permalink
Make the header if it is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
canack committed Oct 2, 2024
1 parent f0c0194 commit f69a1f6
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
p.Unlock()

internalHandler := func(msg *nats.Msg) {
if msg.Header == nil {
msg.Header = make(nats.Header)
}
if sub.hbMonitor != nil {
sub.hbMonitor.Stop()
}
Expand Down Expand Up @@ -781,6 +784,9 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
select {
case msg := <-msgs:
p.Lock()
if msg.Header == nil {
msg.Header = make(nats.Header)
}
if hbTimer != nil {
hbTimer.Reset(2 * req.Heartbeat)
}
Expand Down

0 comments on commit f69a1f6

Please sign in to comment.