Skip to content

Commit

Permalink
fix panic when logging pulsar error (#72)
Browse files Browse the repository at this point in the history
Signed-off-by: Amir Malka <amirm@armosec.io>
  • Loading branch information
amirmalka authored Mar 11, 2024
1 parent 06ded54 commit a5459a1
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions adapters/backend/v1/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,27 +352,32 @@ func (p *PulsarMessageProducer) ProduceMessageWithoutIdentifier(ctx context.Cont

func logPulsarSyncAsyncErrors(msgID pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
var metricLabels prometheus.Labels
if msgID == nil {
metricLabels = prometheus.Labels{prometheusStatusLabel: prometheusStatusLabelValueError}
logger.L().Error("got empty messageID from pulsar", helpers.Error(err))
return
var msgIdStr string
if msgID != nil {
msgIdStr = msgID.String()
}

var messageProperties map[string]string
var messagePayloadBytes int
if message != nil {
messagePayloadBytes = len(message.Payload)
messageProperties = message.Properties
}

if err != nil {
metricLabels = prometheus.Labels{prometheusStatusLabel: prometheusStatusLabelValueError}
logger.L().Error("failed to send message to pulsar",
helpers.Error(err),
helpers.String("messageID", msgID.String()),
helpers.Int("payloadBytes", len(message.Payload)),
helpers.Interface("messageProperties", message.Properties))
helpers.String("messageID", msgIdStr),
helpers.Int("payloadBytes", messagePayloadBytes),
helpers.Interface("messageProperties", messageProperties))
} else {
metricLabels = prometheus.Labels{prometheusStatusLabel: prometheusStatusLabelValueSuccess}
logger.L().Debug("successfully sent message to pulsar", helpers.String("messageID", msgID.String()), helpers.Interface("messageProperties", message.Properties))
logger.L().Debug("successfully sent message to pulsar", helpers.String("messageID", msgIdStr), helpers.Interface("messageProperties", messageProperties))
}

pulsarProducerMessagesProducedCounter.With(metricLabels).Inc()
if message != nil {
pulsarProducerMessagePayloadBytesProducedCounter.With(metricLabels).Add(float64(len(message.Payload)))
}
pulsarProducerMessagePayloadBytesProducedCounter.With(metricLabels).Add(float64(messagePayloadBytes))
}

func NewProducerMessage(producerMessageKey, account, cluster, eventType string, payload []byte, optionalProperties ...map[string]string) *pulsar.ProducerMessage {
Expand Down

0 comments on commit a5459a1

Please sign in to comment.