Developed a high-performance ring buffer leveraging lock-free
and low-level unsafe
operations to optimize for speed, capable of processing and delivering over 10 million messages per second.
Benchmark | Results |
---|---|
Unicast -1P – 1C throughput: 11,200,000 messages/s. Avg-latency: 89.4 ns |
|
Three Step Pipeline -1P – 3C throughput: 10,000,000 messages/s. Avg-latency: 119 ns |
-
OneToManyRingBuffer
(also configurable forOneToOneRingBuffer
usage) -
ManyToManyRingBuffer
(combine agrona'sManyToOneRingBuffer
andOneToManyRingBuffer
, see How to create aManyToManyRingBuffer
)
Use agrona's ManyToOneRingBuffer
and cafe's OneToManyRingBuffer
to create a ManyToManyRingBuffer
.
ManyToManyRingBuffer = ManyToOneRingBuffer + OneToManyRingBuffer
import gc.garcol.libcore.OneToManyRingBuffer;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.ControlledMessageHandler;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import java.nio.ByteBuffer;
import java.util.UUID;
/**
* ManyToManyRingBuffer = pipeline(ManyToOneRingBuffer -> OneToManyRingBuffer)
*
* @author thaivc
* @since 2024
*/
@Slf4j
@Accessors(fluent = true)
@RequiredArgsConstructor
public class ManyToManyRingBuffer
{
private final ManyToOneRingBuffer inboundRingBuffer;
@Getter
private final OneToManyRingBuffer oneToManyRingBuffer;
private final ByteBuffer cachedBuffer = ByteBuffer.allocate(1 << 10);
public boolean publishMessage(int messageType, UUID sender, byte[] message)
{
int claimIndex = inboundRingBuffer.tryClaim(messageType, message.length + Long.BYTES * 2);
if (claimIndex <= 0)
{
return false;
}
inboundRingBuffer.buffer().putLong(claimIndex, sender.getMostSignificantBits());
inboundRingBuffer.buffer().putLong(claimIndex + Long.BYTES, sender.getLeastSignificantBits());
inboundRingBuffer.buffer().putBytes(claimIndex + Long.BYTES * 2, message);
inboundRingBuffer.commit(claimIndex);
return true;
}
public void transfer()
{
inboundRingBuffer.controlledRead((int msgTypeId, MutableDirectBuffer buffer, int index, int length) -> {
cachedBuffer.clear();
buffer.getBytes(index, cachedBuffer, length);
cachedBuffer.position(length);
cachedBuffer.flip();
boolean success = oneToManyRingBuffer.write(msgTypeId, cachedBuffer);
return success ? ControlledMessageHandler.Action.CONTINUE : ControlledMessageHandler.Action.ABORT;
});
}
}
Gradle kotlin
implementation("io.github.gc-garcol:cafe-ringbuffer:1.0.0")
Apache maven
<dependency>
<groupId>io.github.gc-garcol</groupId>
<artifactId>cafe-ringbuffer</artifactId>
<version>1.0.1</version>
</dependency>
add --add-opens java.base/java.nio=ALL-UNNAMED
as a JVM argument
A Simple Example (A Simple Example (to have the best performance, should reuse the ByteBuffer
in publish
and consume
))
- Initialize the RingBuffer with a capacity of 1024 and a total of 2 consumers.
OneToManyRingBuffer oneToManyRingBuffer = new OneToManyRingBuffer(10, 2);
- Publish a message to the
RingBuffer
.
ByteBuffer messageBufferWriter = ByteBuffer.allocate(1 << 10);
ByteBufferUtil.put(messageBufferWriter, 0, "hello world!!".getBytes());
messageBufferWriter.flip();
oneToManyRingBuffer.write(1, messageBufferWriter);
- Consume messages in the first consumer on a dedicated thread.
ByteBuffer messageBufferReader = ByteBuffer.allocate(1 << 10);
MessageHandler handler = (msgTypeId, buffer, index, length) -> {
System.out.println("msgTypeId: " + msgTypeId);
System.out.println("index: " + index);
System.out.println("length: " + length);
messageBufferReader.clear();
buffer.getBytes(index, messageBufferReader, 0, length);
messageBufferReader.position(length);
messageBufferReader.flip();
byte[] messageBytes = new byte[length];
messageBufferReader.get(messageBytes);
System.out.println("message: " + new String(messageBytes));
Assertions.assertEquals(message, new String(messageBytes), "Message not match");
return true;
};
oneToManyRingBuffer.read(0, handler);
- Consume a message from the second consumer on a separate thread.
ByteBuffer messageBufferReader = ByteBuffer.allocate(1 << 10);
MessageHandler handler = (msgTypeId, buffer, index, length) -> {
System.out.println("msgTypeId: " + msgTypeId);
System.out.println("index: " + index);
System.out.println("length: " + length);
messageBufferReader.clear();
buffer.getBytes(index, messageBufferReader, 0, length);
messageBufferReader.position(length);
messageBufferReader.flip();
byte[] messageBytes = new byte[length];
messageBufferReader.get(messageBytes);
System.out.println("message: " + new String(messageBytes));
Assertions.assertEquals(message, new String(messageBytes), "Message not match");
return true;
};
oneToManyRingBuffer.read(1, handler);
False sharing
:Happens before guarantee
: https://jenkov.com/tutorials/java-concurrency/java-happens-before-guarantee.htmlRing buffer
: https://aeron.io/docs/agrona/concurrent/#ring-buffers