Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distinguish nack metrics by requeue flag #1469

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/com/rabbitmq/client/MetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ default void basicPublishUnrouted(Channel channel) {

void basicAck(Channel channel, long deliveryTag, boolean multiple);

void basicNack(Channel channel, long deliveryTag);
void basicNack(Channel channel, long deliveryTag, boolean requeue);

void basicReject(Channel channel, long deliveryTag);
void basicReject(Channel channel, long deliveryTag, boolean requeue);

void basicConsume(Channel channel, String consumerTag, boolean autoAck);

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
}

@Override
public void basicNack(Channel channel, long deliveryTag) {
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {

}

@Override
public void basicReject(Channel channel, long deliveryTag) {
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {

private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage();

private final Runnable markRejectedMessageAction = () -> markRejectedMessage();
private final Function<Boolean, Runnable> markRejectedMessageAction = requeue -> () -> markRejectedMessage(requeue);

private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged();

Expand Down Expand Up @@ -236,18 +236,18 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
}

@Override
public void basicNack(Channel channel, long deliveryTag) {
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
try {
updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction);
updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
} catch(Exception e) {
LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage());
}
}

@Override
public void basicReject(Channel channel, long deliveryTag) {
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
try {
updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction);
updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
} catch(Exception e) {
LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage());
}
Expand Down Expand Up @@ -409,7 +409,7 @@ private ChannelState(Channel channel) {
/**
* Marks the event of a rejected message.
*/
protected abstract void markRejectedMessage();
protected abstract void markRejectedMessage(boolean requeue);

/**
* Marks the event of a message publishing acknowledgement.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException
{
transmit(new Basic.Nack(deliveryTag, multiple, requeue));
metricsCollector.basicNack(this, deliveryTag);
metricsCollector.basicNack(this, deliveryTag, requeue);
}

/** Public API - {@inheritDoc} */
Expand All @@ -1222,7 +1222,7 @@ public void basicReject(long deliveryTag, boolean requeue)
throws IOException
{
transmit(new Basic.Reject(deliveryTag, requeue));
metricsCollector.basicReject(this, deliveryTag);
metricsCollector.basicReject(this, deliveryTag, requeue);
}

/** Public API - {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {

private final Counter rejectedMessages;

private final Counter requeuedPublishedMessages;

public MicrometerMetricsCollector(MeterRegistry registry) {
this(registry, "rabbitmq");
}
Expand Down Expand Up @@ -90,6 +92,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
this.requeuedPublishedMessages = (Counter) metricsCreator.apply(REQUEUED_PUBLISHED_MESSAGES);
}

@Override
Expand Down Expand Up @@ -133,7 +136,10 @@ protected void markAcknowledgedMessage() {
}

@Override
protected void markRejectedMessage() {
protected void markRejectedMessage(boolean requeue) {
if (requeue) {
requeuedPublishedMessages.increment();
}
rejectedMessages.increment();
}

Expand Down Expand Up @@ -252,6 +258,12 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".unrouted_published", tags);
}
},
REQUEUED_PUBLISHED_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".requeued_published", tags);
}
};

abstract Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
private final LongCounter ackedPublishedMessagesCounter;
private final LongCounter nackedPublishedMessagesCounter;
private final LongCounter unroutedPublishedMessagesCounter;
private final LongCounter requeuedPublishedMessagesCounter;

public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
this(openTelemetry, "rabbitmq");
Expand Down Expand Up @@ -123,6 +124,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
.setUnit("{messages}")
.setDescription("The number of un-routed published messages to the RabbitMQ server")
.build();

// requeuedPublishedMessages
this.requeuedPublishedMessagesCounter = meter.counterBuilder(prefix + ".requeued_published")
.setUnit("{messages}")
.setDescription("The number of re-queued published messages to the RabbitMQ server")
.build();
}

@Override
Expand Down Expand Up @@ -166,7 +173,10 @@ protected void markAcknowledgedMessage() {
}

@Override
protected void markRejectedMessage() {
protected void markRejectedMessage(boolean requeue) {
if (requeue) {
requeuedPublishedMessagesCounter.add(1L, attributes);
}
rejectedMessagesCounter.add(1L, attributes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
private final Meter publishAcknowledgedMessages;
private final Meter publishNacknowledgedMessages;
private final Meter publishUnroutedMessages;
private final Meter requeuedPublishedMessages;


public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
Expand All @@ -59,6 +60,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
this.requeuedPublishedMessages = registry.meter(metricsPrefix+".requeued_published");
}

public StandardMetricsCollector() {
Expand Down Expand Up @@ -110,7 +112,7 @@ protected void markAcknowledgedMessage() {
}

@Override
protected void markRejectedMessage() {
protected void markRejectedMessage(boolean requeue) {
rejectedMessages.mark();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throw
return;
}
transmit(new Basic.Nack(realTag, multiple, requeue));
metricsCollector.basicNack(this, deliveryTag);
metricsCollector.basicNack(this, deliveryTag, requeue);
}

@Override
Expand All @@ -137,7 +137,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException {
long realTag = deliveryTag - activeDeliveryTagOffset;
if (realTag > 0) {
transmit(new Basic.Reject(realTag, requeue));
metricsCollector.basicReject(this, deliveryTag);
metricsCollector.basicReject(this, deliveryTag, requeue);
}
}

Expand Down