From c7311b6608a978ea9bf56cd76dd67107e0c32cd4 Mon Sep 17 00:00:00 2001 From: Sahil Bondre Date: Fri, 29 Dec 2023 17:04:41 +0000 Subject: [PATCH] FFDB-006: Introduce RandomAccessLog with a FileChannel backed implementation + Segment model (#12) --- .../log/FileChannelRandomAccessLog.java | 55 ++++++ .../firefly/log/InvalidRangeException.java | 7 + .../firefly/log/RandomAccessLog.java | 15 ++ .../sahilbondre/firefly/model/Segment.java | 131 +++++++++++++ .../log/FileChannelRandomAccessLogTest.java | 124 +++++++++++++ .../firefly/model/SegmentTest.java | 173 ++++++++++++++++++ src/test/resources/.empty | 0 7 files changed, 505 insertions(+) create mode 100644 src/main/java/com/sahilbondre/firefly/log/FileChannelRandomAccessLog.java create mode 100644 src/main/java/com/sahilbondre/firefly/log/InvalidRangeException.java create mode 100644 src/main/java/com/sahilbondre/firefly/log/RandomAccessLog.java create mode 100644 src/main/java/com/sahilbondre/firefly/model/Segment.java create mode 100644 src/test/java/com/sahilbondre/firefly/log/FileChannelRandomAccessLogTest.java create mode 100644 src/test/java/com/sahilbondre/firefly/model/SegmentTest.java create mode 100644 src/test/resources/.empty diff --git a/src/main/java/com/sahilbondre/firefly/log/FileChannelRandomAccessLog.java b/src/main/java/com/sahilbondre/firefly/log/FileChannelRandomAccessLog.java new file mode 100644 index 0000000..3b21f2c --- /dev/null +++ b/src/main/java/com/sahilbondre/firefly/log/FileChannelRandomAccessLog.java @@ -0,0 +1,55 @@ +package com.sahilbondre.firefly.log; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +public class FileChannelRandomAccessLog implements RandomAccessLog { + + private final String filePath; + private final RandomAccessFile randomAccessFile; + private final FileChannel fileChannel; + + public FileChannelRandomAccessLog(String filePath) throws IOException { + this.filePath = filePath; + this.randomAccessFile = new RandomAccessFile(filePath, "rw"); + this.fileChannel = randomAccessFile.getChannel(); + } + + @Override + public long size() throws IOException { + return fileChannel.size(); + } + + @Override + public String getFilePath() { + return filePath; + } + + @Override + public void append(byte[] message) throws IOException { + fileChannel.position(fileChannel.size()); + ByteBuffer buffer = ByteBuffer.wrap(message); + fileChannel.write(buffer); + } + + @Override + public byte[] read(long offset, long length) throws IOException, InvalidRangeException { + long fileSize = fileChannel.size(); + + if (offset < 0 || offset >= fileSize || length <= 0 || offset + length > fileSize) { + throw new InvalidRangeException("Invalid offset or length"); + } + + fileChannel.position(offset); + ByteBuffer buffer = ByteBuffer.allocate((int) length); + fileChannel.read(buffer); + return buffer.array(); + } + + public void close() throws IOException { + fileChannel.close(); + randomAccessFile.close(); + } +} diff --git a/src/main/java/com/sahilbondre/firefly/log/InvalidRangeException.java b/src/main/java/com/sahilbondre/firefly/log/InvalidRangeException.java new file mode 100644 index 0000000..5fc9bad --- /dev/null +++ b/src/main/java/com/sahilbondre/firefly/log/InvalidRangeException.java @@ -0,0 +1,7 @@ +package com.sahilbondre.firefly.log; + +public class InvalidRangeException extends IllegalArgumentException { + public InvalidRangeException(String message) { + super(message); + } +} diff --git a/src/main/java/com/sahilbondre/firefly/log/RandomAccessLog.java b/src/main/java/com/sahilbondre/firefly/log/RandomAccessLog.java new file mode 100644 index 0000000..d5733e7 --- /dev/null +++ b/src/main/java/com/sahilbondre/firefly/log/RandomAccessLog.java @@ -0,0 +1,15 @@ +package com.sahilbondre.firefly.log; + +import java.io.IOException; + +public interface RandomAccessLog { + long size() throws IOException; + + String getFilePath(); + + void append(byte[] message) throws IOException; + + byte[] read(long offset, long length) throws IOException, InvalidRangeException; + + void close() throws IOException; +} diff --git a/src/main/java/com/sahilbondre/firefly/model/Segment.java b/src/main/java/com/sahilbondre/firefly/model/Segment.java new file mode 100644 index 0000000..13b02c0 --- /dev/null +++ b/src/main/java/com/sahilbondre/firefly/model/Segment.java @@ -0,0 +1,131 @@ +package com.sahilbondre.firefly.model; + +public class Segment { + + private static final int CRC_LENGTH = 2; + private static final int KEY_SIZE_LENGTH = 2; + private static final int VALUE_SIZE_LENGTH = 4; + /** + * Class representing a segment of the log file. + *

+ * Two big decisions here to save on performance: + * 1. We're using byte[] instead of ByteBuffer. + * 2. We're trusting that the byte[] is immutable and hence avoiding copying it. + *

+ *

+ * 2 bytes: CRC + * 2 bytes: Key Size + * 4 bytes: Value Size + * n bytes: Key + * m bytes: Value + *

+ * Note: Value size is four bytes because we're using a 32-bit integer to store the size. + * Int is 32-bit signed, so we can only store 2^31 - 1 bytes in the value. + * Hence, the maximum size of the value is 2,147,483,647 bytes or 2.14 GB. + */ + private final byte[] bytes; + + private Segment(byte[] bytes) { + this.bytes = bytes; + } + + public static Segment fromByteArray(byte[] data) { + return new Segment(data); + } + + public static Segment fromKeyValuePair(byte[] key, byte[] value) { + int keySize = key.length; + int valueSize = value.length; + int totalSize = CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH + keySize + valueSize; + + byte[] segment = new byte[totalSize]; + + // Set key size + segment[2] = (byte) ((keySize >> 8) & 0xFF); + segment[3] = (byte) (keySize & 0xFF); + + // Set value size + segment[4] = (byte) ((valueSize >> 24) & 0xFF); + segment[5] = (byte) ((valueSize >> 16) & 0xFF); + segment[6] = (byte) ((valueSize >> 8) & 0xFF); + segment[7] = (byte) (valueSize & 0xFF); + + System.arraycopy(key, 0, segment, CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH, keySize); + + System.arraycopy(value, 0, segment, CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH + keySize, valueSize); + + byte[] crc = new Segment(segment).crc16(); + segment[0] = crc[0]; + segment[1] = crc[1]; + + return new Segment(segment); + } + + public byte[] getBytes() { + return bytes; + } + + public byte[] getKey() { + int keySize = getKeySize(); + return extractBytes(CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH, keySize); + } + + public byte[] getValue() { + int keySize = getKeySize(); + int valueSize = getValueSize(); + return extractBytes(CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH + keySize, valueSize); + } + + public int getKeySize() { + return ((bytes[2] & 0xff) << 8) | (bytes[3] & 0xff); + } + + public int getValueSize() { + return ((bytes[4] & 0xff) << 24) | ((bytes[5] & 0xff) << 16) | + ((bytes[6] & 0xff) << 8) | (bytes[7] & 0xff); + } + + public byte[] getCrc() { + return extractBytes(0, CRC_LENGTH); + } + + public boolean isChecksumValid() { + byte[] crc = crc16(); + return crc[0] == bytes[0] && crc[1] == bytes[1]; + } + + public boolean isSegmentValid() { + return isChecksumValid() && getKeySize() > 0 && getValueSize() >= 0 + && bytes.length == CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH + getKeySize() + getValueSize(); + } + + private byte[] extractBytes(int offset, int length) { + byte[] result = new byte[length]; + System.arraycopy(bytes, offset, result, 0, length); + return result; + } + + private byte[] crc16(byte[] segment) { + int crc = 0xFFFF; // Initial CRC value + int polynomial = 0x1021; // CRC-16 polynomial + + for (int index = CRC_LENGTH; index < segment.length; index++) { + byte b = segment[index]; + crc ^= (b & 0xFF) << 8; + + for (int i = 0; i < 8; i++) { + if ((crc & 0x8000) != 0) { + crc = (crc << 1) ^ polynomial; + } else { + crc <<= 1; + } + } + } + + return new byte[]{(byte) ((crc >> 8) & 0xFF), (byte) (crc & 0xFF)}; + } + + private byte[] crc16() { + return crc16(bytes); + } +} diff --git a/src/test/java/com/sahilbondre/firefly/log/FileChannelRandomAccessLogTest.java b/src/test/java/com/sahilbondre/firefly/log/FileChannelRandomAccessLogTest.java new file mode 100644 index 0000000..583e814 --- /dev/null +++ b/src/test/java/com/sahilbondre/firefly/log/FileChannelRandomAccessLogTest.java @@ -0,0 +1,124 @@ +package com.sahilbondre.firefly.log; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import static org.junit.jupiter.api.Assertions.*; + +class FileChannelRandomAccessLogTest { + + private static final String TEST_FILE_NAME = "src/test/resources/test.log"; + private static final Path TEST_FILE_PATH = Paths.get(TEST_FILE_NAME); + private FileChannelRandomAccessLog randomAccessLog; + + @BeforeEach + void setUp() throws IOException { + Files.deleteIfExists(TEST_FILE_PATH); + Files.createFile(TEST_FILE_PATH); + randomAccessLog = new FileChannelRandomAccessLog(TEST_FILE_NAME); + } + + @AfterEach + void tearDown() throws IOException { + randomAccessLog.close(); + Files.deleteIfExists(TEST_FILE_PATH); + } + + @Test + void givenEmptyLog_whenGetSize_thenReturnsZero() throws IOException { + // Given + // An empty log + + // When + long size = randomAccessLog.size(); + + // Then + assertEquals(0, size); + } + + + @Test + void givenLogWithContent_whenGetSize_thenReturnsCorrectSize() throws IOException { + // Given + // A log with content + + // When + randomAccessLog.append("Hello".getBytes()); + randomAccessLog.append("World".getBytes()); + + // Then + assertEquals(10, randomAccessLog.size()); + } + + @Test + void givenLog_whenGetFilePath_thenReturnsCorrectPath() { + // Given + // A log instance + + // When + String filePath = randomAccessLog.getFilePath(); + + // Then + assertEquals(TEST_FILE_NAME, filePath); + } + + @Test + void givenLogWithContent_whenAppend_thenAppendsCorrectly() throws IOException { + // Given + // A log with existing content + + // When + randomAccessLog.append("Hello".getBytes()); + randomAccessLog.append("World".getBytes()); + byte[] result = randomAccessLog.read(0, randomAccessLog.size()); + + // Then + assertArrayEquals("HelloWorld".getBytes(), result); + } + + @Test + void givenLogWithContent_whenReadSubset_thenReturnsSubset() throws IOException, InvalidRangeException { + // Given + // A log with existing content + + // When + randomAccessLog.append("The quick brown fox".getBytes()); + byte[] result = randomAccessLog.read(4, 5); + + // Then + assertArrayEquals("quick".getBytes(), result); + } + + @Test + void givenInvalidRange_whenRead_thenThrowsInvalidRangeException() throws IOException { + // Given + randomAccessLog.append("Hello".getBytes()); + // An invalid range for reading + + // When/Then + assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(0, -1)); + assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(-1, 5)); + assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(15, 10)); + assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(2, 10)); + assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(0, 6)); + } + + @Test + void givenLog_whenClose_thenFileIsNotAccessible() throws IOException { + // Given + // An open log + + // When + randomAccessLog.close(); + + // Then + assertTrue(Files.exists(TEST_FILE_PATH)); + assertThrows(IOException.class, () -> randomAccessLog.append("NewContent".getBytes())); + } +} diff --git a/src/test/java/com/sahilbondre/firefly/model/SegmentTest.java b/src/test/java/com/sahilbondre/firefly/model/SegmentTest.java new file mode 100644 index 0000000..effe8a2 --- /dev/null +++ b/src/test/java/com/sahilbondre/firefly/model/SegmentTest.java @@ -0,0 +1,173 @@ +package com.sahilbondre.firefly.model; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class SegmentTest { + + @Test + void givenByteArray_whenCreatingSegment_thenAccessorsReturnCorrectValues() { + // Given + byte[] testData = new byte[]{ + (byte) -83, (byte) 64, + 0x00, 0x05, // Key Size + 0x00, 0x00, 0x00, 0x05, // Value Size + 0x48, 0x65, 0x6C, 0x6C, 0x6F, // Key: "Hello" + 0x57, 0x6F, 0x72, 0x6C, 0x64 // Value: "World" + }; + + // When + Segment segment = Segment.fromByteArray(testData); + + // Then + assertArrayEquals(testData, segment.getBytes()); + assertArrayEquals("Hello".getBytes(), segment.getKey()); + assertArrayEquals("World".getBytes(), segment.getValue()); + assertEquals(5, segment.getKeySize()); + assertEquals(5, segment.getValueSize()); + assertEquals(-83, segment.getCrc()[0]); + assertEquals(64, segment.getCrc()[1]); + assertTrue(segment.isSegmentValid()); + assertTrue(segment.isChecksumValid()); + } + + @Test + void givenCorruptedKeySizeSegment_whenCheckingChecksum_thenIsChecksumValidReturnsFalse() { + // Given + byte[] testData = new byte[]{ + (byte) -83, (byte) 64, + 0x01, 0x45, // Key Size (Bit Flipped) + 0x00, 0x00, 0x00, 0x05, // Value Size + 0x48, 0x65, 0x6C, 0x6C, 0x6F, // Key: "Hello" + 0x57, 0x6F, 0x72, 0x6C, 0x64 // Value: "World" + }; + + // When + Segment corruptedSegment = Segment.fromByteArray(testData); + + // Then + assertFalse(corruptedSegment.isChecksumValid()); + assertFalse(corruptedSegment.isSegmentValid()); + } + + @Test + void givenCorruptedValueSizeSegment_whenCheckingChecksum_thenIsChecksumValidReturnsFalse() { + // Given + byte[] testData = new byte[]{ + (byte) -83, (byte) 64, + 0x00, 0x05, // Key Size + 0x00, 0x00, 0x01, 0x05, // Value Size (Bit Flipped) + 0x48, 0x65, 0x6C, 0x6C, 0x6F, // Key: "Hello" + 0x57, 0x6F, 0x72, 0x6C, 0x64 // Value: "World" + }; + + // When + Segment corruptedSegment = Segment.fromByteArray(testData); + + // Then + assertFalse(corruptedSegment.isChecksumValid()); + assertFalse(corruptedSegment.isSegmentValid()); + } + + @Test + void givenCorruptedKeySegment_whenCheckingChecksum_thenIsChecksumValidReturnsFalse() { + // Given + byte[] testData = new byte[]{ + (byte) -83, (byte) 64, + 0x00, 0x05, // Key Size + 0x00, 0x00, 0x00, 0x05, // Value Size + 0x48, 0x65, 0x6C, 0x6C, 0x6E, // Key: "Hello" (Bit Flipped) + 0x57, 0x6F, 0x72, 0x6C, 0x64 // Value: "World" + }; + + // When + Segment corruptedSegment = Segment.fromByteArray(testData); + + // Then + assertFalse(corruptedSegment.isChecksumValid()); + assertFalse(corruptedSegment.isSegmentValid()); + } + + @Test + void givenCorruptedValueSegment_whenCheckingChecksum_thenIsChecksumValidReturnsFalse() { + // Given + byte[] testData = new byte[]{ + (byte) -83, (byte) 64, + 0x00, 0x05, // Key Size + 0x00, 0x00, 0x00, 0x05, // Value Size + 0x48, 0x65, 0x6C, 0x6C, 0x6F, // Key: "Hello" + 0x57, 0x6F, 0x62, 0x6C, 0x65 // Value: "World" (Bit Flipped) + }; + + // When + Segment corruptedSegment = Segment.fromByteArray(testData); + + // Then + assertFalse(corruptedSegment.isChecksumValid()); + assertFalse(corruptedSegment.isSegmentValid()); + } + + @Test + void givenIncorrectValueLengthSegment_whenCheckingSegmentValid_thenIsSegmentValidReturnsFalse() { + // Given + byte[] testData = new byte[]{ + (byte) -43, (byte) -70, + 0x00, 0x05, // Key Size + 0x00, 0x00, 0x00, 0x06, // Value Size (Incorrect) + 0x48, 0x65, 0x6C, 0x6C, 0x6F, // Key: "Hello" + 0x57, 0x6F, 0x72, 0x6C, 0x64 // Value: "World" + }; + + // When + Segment corruptedSegment = Segment.fromByteArray(testData); + + // Then + assertTrue(corruptedSegment.isChecksumValid()); + assertFalse(corruptedSegment.isSegmentValid()); + } + + @Test + void givenKeyValuePair_whenCreatingSegment_thenAccessorsReturnCorrectValues() { + // Given + byte[] key = "Hello".getBytes(); + byte[] value = "World".getBytes(); + byte[] expectedSegment = new byte[]{ + (byte) -83, (byte) 64, + 0x00, 0x05, // Key Size + 0x00, 0x00, 0x00, 0x05, // Value Size + 0x48, 0x65, 0x6C, 0x6C, 0x6F, // Key: "Hello" + 0x57, 0x6F, 0x72, 0x6C, 0x64 // Value: "World" + }; + + // When + Segment segment = Segment.fromKeyValuePair(key, value); + + // Then + assertArrayEquals("Hello".getBytes(), segment.getKey()); + assertArrayEquals("World".getBytes(), segment.getValue()); + assertEquals(5, segment.getKeySize()); + assertEquals(5, segment.getValueSize()); + assertEquals(-83, segment.getCrc()[0]); + assertEquals(64, segment.getCrc()[1]); + assertTrue(segment.isSegmentValid()); + assertTrue(segment.isChecksumValid()); + assertArrayEquals(expectedSegment, segment.getBytes()); + } + + @Test + void givenKeyAndValue_whenCreatingSegment_thenSegmentIsCreatedWithCorrectSizes() { + // Given + byte[] key = "Hello".getBytes(); + byte[] value = "World".getBytes(); + + // When + Segment segment = Segment.fromKeyValuePair(key, value); + + // Then + assertArrayEquals(key, segment.getKey()); + assertArrayEquals(value, segment.getValue()); + assertEquals(key.length, segment.getKeySize()); + assertEquals(value.length, segment.getValueSize()); + } +} diff --git a/src/test/resources/.empty b/src/test/resources/.empty new file mode 100644 index 0000000..e69de29