diff --git a/rust_snuba/rust_arroyo/src/processing/strategies/reduce.rs b/rust_snuba/rust_arroyo/src/processing/strategies/reduce.rs index b3d662087e..f6d014c8eb 100644 --- a/rust_snuba/rust_arroyo/src/processing/strategies/reduce.rs +++ b/rust_snuba/rust_arroyo/src/processing/strategies/reduce.rs @@ -43,8 +43,12 @@ impl BatchState { let tmp = self.value.take().unwrap(); let payload = message.into_payload(); - self.message_count += (self.compute_batch_size)(&payload); - self.value = Some((self.accumulator)(tmp, payload)); + let batch_size = (self.compute_batch_size)(&payload); + + if batch_size > 0 { + self.message_count += batch_size; + self.value = Some((self.accumulator)(tmp, payload)); + } } }