Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FFDB-010: Implement compaction API #20

Merged
merged 3 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 123 additions & 43 deletions src/main/java/com/sahilbondre/firefly/FireflyDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FireflyDB {
private static final Map<String, FireflyDB> instances = new HashMap<>();
private static final String NOT_STARTED_ERROR_MESSAGE = "FireflyDB is not started.";
// 4 GB
private static final long MAX_LOG_SIZE = 4 * 1024 * 1024 * 1024L;

private final String folderPath;

private final String fileTablePath;
Expand Down Expand Up @@ -47,47 +53,9 @@ public boolean isStarted() {

public synchronized void start() throws IOException {
if (!isStarted) {
// Create file-table if it doesn't exist
Path path = Paths.get(fileTablePath);

if (Files.exists(path)) {
this.fileTable = SerializedPersistableFileTable.fromFile(fileTablePath);
} else {
this.fileTable = SerializedPersistableFileTable.fromEmpty();
}

// Find all files ending with .log
Files.walkFileTree(Paths.get(folderPath), new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {

String fileName = file.getFileName().toString();
if (fileName.endsWith(".log")) {
String fileNameWithoutExtension = fileName.substring(0, fileName.length() - 4);
if (isNumeric(fileNameWithoutExtension)) {
// Create a RandomAccessLog for each file
RandomAccessLog log = new FileChannelRandomAccessLog(file.toString());
// Add it to the logMap
logMap.put(Integer.parseInt(fileNameWithoutExtension), log);
}
}
return FileVisitResult.CONTINUE;
}
});

// File with the largest number is the active log
int max = logMap.keySet().stream().max(Integer::compareTo).orElse(0);

if (!logMap.containsKey(max)) {
logMap.put(max, new FileChannelRandomAccessLog(folderPath + "/" + max + ".log"));
}

activeLog = logMap.get(max);

// handle the case when there are no logs
logMap.put(max, activeLog);
isStarted = true;
compaction();
}
isStarted = true;
}

public synchronized void stop() throws IOException {
Expand All @@ -104,18 +72,32 @@ public synchronized void stop() throws IOException {

public synchronized void set(byte[] key, byte[] value) throws IOException {
if (!isStarted) {
throw new IllegalStateException("FireflyDB is not started.");
throw new IllegalStateException(NOT_STARTED_ERROR_MESSAGE);
}

// Append to active log
Segment segment = Segment.fromKeyValuePair(key, value);
FilePointer filePointer = activeLog.append(segment.getBytes());
fileTable.put(key, filePointer);

// Check if compaction is needed
if (activeLog.size() > MAX_LOG_SIZE) {
moveToNewActiveLog();
}
}

private void moveToNewActiveLog() throws IOException {
// Create a new log
int nextActiveLogId = activeLog == null ? 1 : activeLog.getLogId() + 1;
RandomAccessLog nextActiveLog = new FileChannelRandomAccessLog(folderPath + "/" + nextActiveLogId + ".log");
// Update logMap
logMap.put(nextActiveLogId, nextActiveLog);
activeLog = nextActiveLog;
}

public byte[] get(byte[] key) throws IOException {
if (!isStarted) {
throw new IllegalStateException("FireflyDB is not started.");
throw new IllegalStateException(NOT_STARTED_ERROR_MESSAGE);
}

// Get file-pointer from file-table
Expand All @@ -125,7 +107,105 @@ public byte[] get(byte[] key) throws IOException {
}

// Read from log
Segment segment = activeLog.readSegment(filePointer.getOffset());
String filename = Paths.get(filePointer.getFileName()).getFileName().toString();
Integer logId = Integer.parseInt(filename.substring(0, filename.length() - 4));
RandomAccessLog log = logMap.get(logId);
Segment segment = log.readSegment(filePointer.getOffset());
return segment.getValue();
}

public synchronized void compaction() throws IOException {
if (!isStarted) {
throw new IllegalStateException(NOT_STARTED_ERROR_MESSAGE);
}

closeAllLogMapsIfOpen();

// Iterate over all log files in descending order
List<RandomAccessLog> logs = getRandomAccessLogsFromDir(folderPath);

if (!logs.isEmpty()) {
// Set the last log as active log
activeLog = logs.get(0);
}

this.fileTable = SerializedPersistableFileTable.fromEmpty();

// Create a new log
moveToNewActiveLog();
// Iterate over all logs
for (RandomAccessLog log : logs) {
// Iterate over all segments in the log
long offset = 0;

while (offset < log.size()) {
Segment segment = log.readSegment(offset);
offset += segment.getBytes().length;
// Append only if the key is not seen before
if (fileTable.get(segment.getKey()) != null) {
continue;
}
// Append to new log
FilePointer filePointer = activeLog.append(segment.getBytes());
fileTable.put(segment.getKey(), filePointer);
}

orphanizeLog(log);
}

// update logmap
logMap.clear();
logMap.put(activeLog.getLogId(), activeLog);
// save file-table
fileTable.saveToDisk(fileTablePath);
}

private void closeAllLogMapsIfOpen() {
for (RandomAccessLog log : logMap.values()) {
try {
log.close();
} catch (IOException ignored) {
// Ignore
}
}
}

private void orphanizeLog(RandomAccessLog log) throws IOException {
log.close();
// rename all stale logs and add underscore before file name
Path oldPath = Paths.get(log.getFilePath());
Path dir = oldPath.getParent();
Path newPath = Paths.get(dir.toString(), "_" + oldPath.getFileName().toString());
Files.move(oldPath, newPath, StandardCopyOption.REPLACE_EXISTING);
}

private List<RandomAccessLog> getRandomAccessLogsFromDir(String dir) throws IOException {
List<RandomAccessLog> logs = new ArrayList<>();
Files.walkFileTree(Paths.get(dir), new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {

String fileName = file.getFileName().toString();
if (fileName.endsWith(".log")) {
String fileNameWithoutExtension = fileName.substring(0, fileName.length() - 4);
if (isNumeric(fileNameWithoutExtension)) {
// Create a RandomAccessLog for each file
RandomAccessLog log = new FileChannelRandomAccessLog(file.toString());
// Add it to the logMap
logs.add(log);
}
}
return FileVisitResult.CONTINUE;
}
});


// Sort the logs in descending order
logs.sort((o1, o2) -> {
int o1Id = Integer.parseInt(o1.getFilePath().substring(o1.getFilePath().lastIndexOf("/") + 1, o1.getFilePath().length() - 4));
int o2Id = Integer.parseInt(o2.getFilePath().substring(o2.getFilePath().lastIndexOf("/") + 1, o2.getFilePath().length() - 4));
return Integer.compare(o2Id, o1Id);
});
return logs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public static SerializedPersistableFileTable fromEmpty() {
}

public static SerializedPersistableFileTable fromFile(String filePath) throws FileNotFoundException, KryoException {
kryo.register(SerializedPersistableFileTable.class);
kryo.register(HashMap.class);
kryo.register(FilePointer.class);
try (Input input = new Input(new FileInputStream(filePath))) {
return kryo.readObject(input, SerializedPersistableFileTable.class);
} catch (KryoException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Paths;

public class FileChannelRandomAccessLog implements RandomAccessLog {

Expand Down Expand Up @@ -91,6 +92,12 @@ public Segment readSegment(long offset) throws IOException, InvalidRangeExceptio
return segment;
}

@Override
public Integer getLogId() {
String fileNameWithoutPath = Paths.get(filePath).getFileName().toString();
return Integer.parseInt(fileNameWithoutPath.substring(0, fileNameWithoutPath.length() - 4));
}

private int byteArrayToInt(byte[] bytes) {
return (bytes[0] << 8) | (bytes[1] & 0xFF);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ public interface RandomAccessLog {
Segment readSegment(long offset) throws IOException, InvalidRangeException;

void close() throws IOException;

Integer getLogId();
}
87 changes: 87 additions & 0 deletions src/test/java/com/sahilbondre/firefly/CompactionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.sahilbondre.firefly;

import com.sahilbondre.firefly.log.FileChannelRandomAccessLog;
import com.sahilbondre.firefly.log.RandomAccessLog;
import com.sahilbondre.firefly.model.Segment;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import static com.sahilbondre.firefly.TestUtils.deleteFolderContentsIfExists;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class CompactionTest {

private static final String TEST_FOLDER = "src/test/resources/test_folder_compaction";
private static final String TEST_LOG_FILE_1 = "1.log";
private static final String TEST_LOG_FILE_2 = "2.log";
private static final String TEST_LOG_FILE_3 = "3.log";

private FireflyDB fireflyDB;

@BeforeEach
void setUp() throws IOException {
deleteFolderContentsIfExists(TEST_FOLDER);
// Create a test folder and log files
Files.createDirectories(Paths.get(TEST_FOLDER));
Files.createFile(Paths.get(TEST_FOLDER, TEST_LOG_FILE_1));
Files.createFile(Paths.get(TEST_FOLDER, TEST_LOG_FILE_2));
Files.createFile(Paths.get(TEST_FOLDER, TEST_LOG_FILE_3));

RandomAccessLog log1 = new FileChannelRandomAccessLog(TEST_FOLDER + "/" + TEST_LOG_FILE_1);
RandomAccessLog log2 = new FileChannelRandomAccessLog(TEST_FOLDER + "/" + TEST_LOG_FILE_2);
RandomAccessLog log3 = new FileChannelRandomAccessLog(TEST_FOLDER + "/" + TEST_LOG_FILE_3);

log1.append(Segment.fromKeyValuePair("key1".getBytes(), "value1".getBytes()).getBytes());
log1.append(Segment.fromKeyValuePair("key2".getBytes(), "value2".getBytes()).getBytes());
log1.append(Segment.fromKeyValuePair("key3".getBytes(), "value3".getBytes()).getBytes());

log2.append(Segment.fromKeyValuePair("key4".getBytes(), "value4".getBytes()).getBytes());
log2.append(Segment.fromKeyValuePair("key1".getBytes(), "value5".getBytes()).getBytes());
log2.append(Segment.fromKeyValuePair("key2".getBytes(), "value6".getBytes()).getBytes());

log3.append(Segment.fromKeyValuePair("key7".getBytes(), "value7".getBytes()).getBytes());
log3.append(Segment.fromKeyValuePair("key8".getBytes(), "value8".getBytes()).getBytes());
log3.append(Segment.fromKeyValuePair("key1".getBytes(), "value9".getBytes()).getBytes());

log1.close();
log2.close();
log3.close();

fireflyDB = FireflyDB.getInstance(TEST_FOLDER);
}

@AfterEach
void tearDown() throws IOException {
fireflyDB.stop();
deleteFolderContentsIfExists(TEST_FOLDER);
}

@Test
void givenMultipleLogFiles_whenCompaction_thenAllFilesRenamedCorrectly() throws IOException {
// Given
// A FireflyDB instance with a folder path
fireflyDB.start();

// When
// Compaction is triggered
fireflyDB.compaction();

// Then
// All log files are processed correctly
assertTrue(Files.exists(Paths.get(TEST_FOLDER, "_1.log")));
assertTrue(Files.exists(Paths.get(TEST_FOLDER, "_2.log")));
assertTrue(Files.exists(Paths.get(TEST_FOLDER, "_3.log")));
assertEquals("value9", new String(fireflyDB.get("key1".getBytes())));
assertEquals("value6", new String(fireflyDB.get("key2".getBytes())));
assertEquals("value3", new String(fireflyDB.get("key3".getBytes())));
assertEquals("value4", new String(fireflyDB.get("key4".getBytes())));
assertEquals("value7", new String(fireflyDB.get("key7".getBytes())));
assertEquals("value8", new String(fireflyDB.get("key8".getBytes())));
}
}
21 changes: 4 additions & 17 deletions src/test/java/com/sahilbondre/firefly/FireflyDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

import static com.sahilbondre.firefly.TestUtils.deleteFolderContentsIfExists;
import static org.junit.jupiter.api.Assertions.*;

class FireflyDBTest {

private static final String TEST_FOLDER = "src/test/resources/test_folder";
private static final String TEST_FOLDER = "src/test/resources/test_folder_simple";
private static final String TEST_LOG_FILE_1 = "1.log";
private static final String TEST_LOG_FILE_2 = "2.log";
private static final String TEST_LOG_FILE_3 = "3.log";
Expand All @@ -23,6 +22,7 @@ class FireflyDBTest {

@BeforeEach
void setUp() throws IOException {
deleteFolderContentsIfExists(TEST_FOLDER);
// Create a test folder and log files
Files.createDirectories(Paths.get(TEST_FOLDER));
Files.createFile(Paths.get(TEST_FOLDER, TEST_LOG_FILE_1));
Expand All @@ -35,20 +35,7 @@ void setUp() throws IOException {
@AfterEach
void tearDown() throws IOException {
fireflyDB.stop();
// Cleanup: Delete the test folder and its contents
try (Stream<Path> pathStream = Files.walk(Paths.get(TEST_FOLDER))) {
pathStream
.sorted((path1, path2) -> -path1.compareTo(path2))
.forEach(path -> {
try {
Files.delete(path);
} catch (IOException e) {
e.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
deleteFolderContentsIfExists(TEST_FOLDER);
}

@Test
Expand Down
Loading
Loading