Skip to content

Commit

Permalink
Add corruption data handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Artem Labazin authored and Artem Labazin committed Apr 29, 2019
1 parent 70e046e commit e17871e
Show file tree
Hide file tree
Showing 15 changed files with 465 additions and 82 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Add batch write/read (using FileChannel) with customizable batch size;
- Add concurrent access to queue (methods with `synchronized` keyword or based on locks).

## [2.1.0](https://github.com/infobip/popout/releases/tag/2.1.0) - 2019-04-28

### Added

- `AutoCloseable` interface for backend services;
- `FileQueueBuilder`.`corruptionHandler` - a handler for corrupted data from disk;
- `CorruptedDataException` and `ReadingFromDiskException` for different IO reading exceptions.

### Changed

- `BatchedFileQueue` now **clear** its tail on flush, instead of creating new.

## [2.0.4](https://github.com/infobip/popout/releases/tag/2.0.4) - 2019-04-25

### Changed
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Include the dependency to your project's pom.xml file:
<dependency>
<groupId>org.infobip.lib</groupId>
<artifactId>popout</artifactId>
<version>2.0.4</version>
<version>2.1.0</version>
</dependency>
...
</dependencies>
Expand All @@ -55,7 +55,7 @@ Include the dependency to your project's pom.xml file:
or Gradle:

```groovy
compile 'org.infobip.lib:popout:2.0.4'
compile 'org.infobip.lib:popout:2.1.0'
```

### Create a queue
Expand Down Expand Up @@ -102,6 +102,8 @@ Queue<Integer> queue = FileQueue.<Integer>batched()
.handler(myQueueLimitExceededHandler))
// restores from disk or not, during startup. If 'false' - the previous files will be removed
.restoreFromDisk(false)
// handler for corrupted data from disk
.corruptionHandler(new MyCorruptionHandler())
// WAL files configuration
.wal(WalFilesConfig.builder()
// the place where WAL files stores. Default is a queue's folder above
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ limitations under the License.
<parent>
<groupId>org.infobip.lib</groupId>
<artifactId>parent</artifactId>
<version>2.0.4</version>
<version>2.1.0</version>
</parent>

<artifactId>benchmarks</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ limitations under the License.

<groupId>org.infobip.lib</groupId>
<artifactId>parent</artifactId>
<version>2.0.4</version>
<version>2.1.0</version>
<packaging>pom</packaging>

<modules>
Expand Down Expand Up @@ -71,7 +71,7 @@ limitations under the License.
<url>https://github.com/infobip/popout</url>
<connection>scm:git:https://github.com/infobip/popout.git</connection>
<developerConnection>scm:git:https://github.com/infobip/popout.git</developerConnection>
<tag>2.0.4</tag>
<tag>2.1.0</tag>
</scm>

<distributionManagement>
Expand Down
2 changes: 1 addition & 1 deletion popout/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ limitations under the License.
<parent>
<groupId>org.infobip.lib</groupId>
<artifactId>parent</artifactId>
<version>2.0.4</version>
<version>2.1.0</version>
</parent>

<artifactId>popout</artifactId>
Expand Down
39 changes: 39 additions & 0 deletions popout/src/main/java/org/infobip/lib/popout/FileQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractQueue;
import java.util.function.Function;

import org.infobip.lib.popout.Deserializer.DefaultDeserializer;
import org.infobip.lib.popout.Serializer.DefaultSerializer;
import org.infobip.lib.popout.batched.BatchedFileQueueBuilder;
import org.infobip.lib.popout.exception.CorruptedDataException;
import org.infobip.lib.popout.synced.SyncedFileQueueBuilder;

import lombok.Getter;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

/**
Expand Down Expand Up @@ -129,6 +132,8 @@ public abstract static class Builder<SELF extends FileQueue.Builder<SELF, T>, T>

boolean restoreFromDisk = true;

Function<CorruptedDataException, Boolean> corruptionHandler = new DefaultCorruptionHandler();

/**
* Sets the queue's name. It uses in files names patters.
*
Expand Down Expand Up @@ -259,6 +264,30 @@ public SELF restoreFromDisk (boolean value) {
return (SELF) this;
}

/**
* Sets the bad segments reading handler.
* <p>
* In case of reading a bad segment in a compressed file or a whole WAL-file,
* the user can handle the occured exception and return:
* <ul>
* <li>
* {@code true} - skip the record and continue reading the file;
* </li>
* <li>
* {@code false} - stop reading the file, start the next one.
* </li>
* </ul>
* The default behaviour is - log the exception and continue reading.
*
* @param value the new value
*
* @return this queue builder, for chain calls
*/
public SELF corruptionHandler (@NonNull Function<CorruptedDataException, Boolean> value) {
corruptionHandler = value;
return (SELF) this;
}

/**
* Builds a new queue with parameters from the builder.
*
Expand Down Expand Up @@ -346,4 +375,14 @@ protected void validateAndSetDefaults () {
Files.createDirectories(compressedFilesConfig.getFolder());
}
}

@Slf4j
public static class DefaultCorruptionHandler implements Function<CorruptedDataException, Boolean> {

@Override
public Boolean apply (CorruptedDataException exception) {
log.error("Corrupted data error", exception);
return Boolean.TRUE;
}
}
}
151 changes: 108 additions & 43 deletions popout/src/main/java/org/infobip/lib/popout/backend/CompressedFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.Optional.ofNullable;
import static lombok.AccessLevel.PRIVATE;

import java.io.IOException;
Expand All @@ -28,47 +29,72 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.function.Function;

import org.infobip.lib.popout.CompressedFilesConfig;
import org.infobip.lib.popout.FileQueue;
import org.infobip.lib.popout.exception.CorruptedDataException;

import io.appulse.utils.Bytes;
import io.appulse.utils.ReadBytesUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.experimental.FieldDefaults;
import lombok.val;

@FieldDefaults(level = PRIVATE, makeFinal = true)
class CompressedFiles implements Iterable<WalContent> {
class CompressedFiles implements Iterable<WalContent>, AutoCloseable {

private static final byte[] CLEAR = new byte[8192];

FilesManager files;

long maxFileSizeBytes;

Function<CorruptedDataException, Boolean> corruptionHandler;

@Builder
CompressedFiles (@NonNull String queueName,
@NonNull Boolean restoreFromDisk,
@NonNull CompressedFilesConfig config
@NonNull CompressedFilesConfig config,
Boolean restoreFromDisk,
Function<CorruptedDataException, Boolean> corruptionHandler
) {
val restoreFromDiskValue = ofNullable(restoreFromDisk)
.orElse(Boolean.TRUE);

val corruptionHandlerValue = ofNullable(corruptionHandler)
.orElseGet(() -> new FileQueue.DefaultCorruptionHandler());

files = FilesManager.builder()
.folder(config.getFolder())
.prefix(queueName + '-')
.suffix(".compressed")
.build();

if (!restoreFromDisk) {
if (!restoreFromDiskValue) {
files.clear();
}

maxFileSizeBytes = config.getMaxSizeBytes();
this.corruptionHandler = corruptionHandlerValue;
}

@Override
public Iterator<WalContent> iterator () {
return new CompressedFileIteratorManyFiles(files.getFilesFromQueue());
}

@Override
public void close () {
files.close();
}

@SneakyThrows
int peekContentPart (@NonNull Bytes buffer) {
return readContent((channel, header) -> {
int peekContentPart (@NonNull Bytes bytes) {
return readTo(bytes, (channel, header, buffer) -> {
val length = header.getLength();
if (buffer.isWritable(length)) {
val newCapacity = buffer.writerIndex() + length;
Expand All @@ -79,8 +105,8 @@ int peekContentPart (@NonNull Bytes buffer) {
}

@SneakyThrows
int pollContentPart (@NonNull Bytes buffer) {
return readContent((channel, header) -> {
int pollContentPart (@NonNull Bytes bytes) {
return readTo(bytes, (channel, header, buffer) -> {
val length = header.getLength();
if (!buffer.isWritable(length)) {
val newCapacity = buffer.writerIndex() + length;
Expand All @@ -98,37 +124,6 @@ int pollContentPart (@NonNull Bytes buffer) {
});
}

@SneakyThrows
private int readContent (RecordReader reader) {
val header = new RecordHeader();
do {
val file = files.peek();
if (file == null) {
return 0;
}

boolean shouldRemoveFile = false;
try (val channel = FileChannel.open(file, READ, WRITE)) {
header.skipJumps(channel);
if (header.isEnd()) {
shouldRemoveFile = true;
} else {
val readed = reader.read(channel, header);
if (readed == 0 || header.isEnd()) {
shouldRemoveFile = true;
}
if (readed > 0) {
return readed;
}
}
} finally {
if (shouldRemoveFile) {
files.remove(file);
}
}
} while (true);
}

@SneakyThrows
CompressionResult compress (@NonNull Collection<Path> walFiles) {
val result = new CompressionResult(new ArrayList<>(), new ArrayList<>(walFiles));
Expand Down Expand Up @@ -178,9 +173,58 @@ long diskSize () {
return result;
}

@Override
public Iterator<WalContent> iterator () {
return new CompressedFileIteratorManyFiles(files.getFilesFromQueue());
private int readTo (Bytes buffer, RecordReader reader) {
val writerIndex = buffer.writerIndex();
val readerIndex = buffer.readerIndex();

RecordHeader header = new RecordHeader();
do {
Path file = files.peek();
if (file == null) {
return 0;
}

val result = readTo(file, header, buffer, reader);
if (result.isRemoveFile()) {
files.remove(file);
}
if (result.hesReaded()) {
return (int) result.getReaded();
}

buffer.writerIndex(writerIndex);
buffer.readerIndex(readerIndex);
} while (true);
}

@SneakyThrows
private ReadResult readTo (Path file, RecordHeader header, Bytes buffer, RecordReader reader) {
FileChannel channel = null;
try {
channel = FileChannel.open(file, READ, WRITE);

header.skipJumps(channel);
if (header.isEnd()) {
return ReadResult.endOfFile();
}

val readed = reader.read(channel, header, buffer);
boolean shouldRemoveFile = readed == 0 || header.isEnd();
return new ReadResult(readed, shouldRemoveFile);
} catch (Exception ex) {
val offset = channel == null
? 0
: channel.position();

val exception = new CorruptedDataException(file, offset, ex);
return corruptionHandler.apply(exception)
? ReadResult.endOfFile()
: ReadResult.continueReading(offset);
} finally {
if (channel != null) {
channel.close();
}
}
}

@SneakyThrows
Expand All @@ -205,6 +249,27 @@ private long allocate (Path path, long size) {
@FunctionalInterface
interface RecordReader {

int read (FileChannel channel, RecordHeader header) throws IOException;
int read (FileChannel channel, RecordHeader header, Bytes buffer) throws IOException;
}

@Value
@AllArgsConstructor
static class ReadResult {

static ReadResult endOfFile () {
return new ReadResult(0, true);
}

static ReadResult continueReading (long readed) {
return new ReadResult(readed, false);
}

long readed;

boolean removeFile;

boolean hesReaded () {
return readed > 0;
}
}
}
Loading

0 comments on commit e17871e

Please sign in to comment.