Skip to content

Commit

Permalink
feat: cover the case of resource is very small and final syncThread i…
Browse files Browse the repository at this point in the history
…s not visible on completion thread
  • Loading branch information
Jacksgong committed Apr 27, 2018
1 parent fc4c5d1 commit 8795ac1
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,13 @@ public void cancel() {
noMoreStreamList.add(streamMap.keyAt(i));
}

if (syncFuture != null && !syncFuture.isDone() && runSyncThread != null) {
if (syncFuture != null && !syncFuture.isDone()) {
inspectValidPath();
OkDownload.with().processFileStrategy().getFileLock().increaseLock(path);

unparkThread(runSyncThread);

try {
syncFuture.get();
} catch (InterruptedException ignored) {
} catch (ExecutionException ignored) {
ensureSync(true, -1);
} finally {
OkDownload.with().processFileStrategy().getFileLock().decreaseLock(path);
}
}
} finally {
Expand All @@ -181,19 +178,34 @@ public void cancel() {
}
}

final StreamsState doneState = new StreamsState();

public void done(int blockIndex) throws IOException {
noMoreStreamList.add(blockIndex);

try {
if (syncException != null) throw syncException;

if (syncFuture != null && !syncFuture.isDone() && runSyncThread != null) {
if (syncFuture != null && !syncFuture.isDone()) {
final AtomicLong noSyncLength = noSyncLengthMap.get(blockIndex);
if (noSyncLength != null && noSyncLength.get() > 0) {
inspectStreamState(doneState);
final boolean isNoMoreStream = doneState.isNoMoreStream;

// ensure this block is synced.
parkedRunBlockThreadMap.put(blockIndex, Thread.currentThread());
unparkThread(runSyncThread);
parkThread();
ensureSync(isNoMoreStream, blockIndex);

}
} else {
if (syncFuture == null) {
Util.d(TAG, "OutputStream done but no need to ensure sync, because the "
+ "sync job not run yet. task[" + task.getId()
+ "] block[" + blockIndex + "]");
} else {
Util.d(TAG, "OutputStream done but no need to ensure sync, because the "
+ "syncFuture.isDone[" + syncFuture.isDone() + "] task[" + task.getId()
+ "] block[" + blockIndex + "]");

}
}

Expand All @@ -202,6 +214,45 @@ public void done(int blockIndex) throws IOException {
}
}

void ensureSync(boolean isNoMoreStream, int blockIndex) {
// sync job not run yet.
if (syncFuture == null || syncFuture.isDone()) return;

if (!isNoMoreStream) {
parkedRunBlockThreadMap.put(blockIndex, Thread.currentThread());
}

if (runSyncThread != null) {
unparkThread(runSyncThread);
} else {
// wait for runSyncThread is valid.
while (true) {
if (isRunSyncThreadValid()) {
unparkThread(runSyncThread);
break;
} else {
parkThread(25);
}
}
}

if (isNoMoreStream) {
unparkThread(runSyncThread);
try {
syncFuture.get();
} catch (InterruptedException ignored) {
} catch (ExecutionException ignored) {
}
} else {
parkThread();
}
}

// convenient for test
boolean isRunSyncThreadValid() {
return runSyncThread != null;
}

public void inspectComplete(int blockIndex) throws IOException {
final BlockInfo blockInfo = info.getBlock(blockIndex);
if (!Util.isCorrectFull(blockInfo.getCurrentOffset(), blockInfo.getContentLength())) {
Expand Down Expand Up @@ -343,8 +394,6 @@ void runSync() throws IOException {
flushProcess();
nextParkMills = syncBufferIntervalMills;
}

runSyncThread = null;
}

// convenient for test.
Expand All @@ -364,56 +413,51 @@ long now() {
}

void flushProcess() throws IOException {
try {
boolean success;
final int size;
synchronized (noSyncLengthMap) {
// make sure the length of noSyncLengthMap is equal to outputStreamMap
size = noSyncLengthMap.size();
}
boolean success;
final int size;
synchronized (noSyncLengthMap) {
// make sure the length of noSyncLengthMap is equal to outputStreamMap
size = noSyncLengthMap.size();
}

final SparseArray<Long> increaseLengthMap = new SparseArray<>(size);
final SparseArray<Long> increaseLengthMap = new SparseArray<>(size);

try {
for (int i = 0; i < size; i++) {
final int blockIndex = outputStreamMap.keyAt(i);
// because we get no sync length value before flush and sync,
// so the length only possible less than or equal to the real persist
// length.
final long noSyncLength = noSyncLengthMap.get(blockIndex).get();
if (noSyncLength > 0) {
increaseLengthMap.put(blockIndex, noSyncLength);
final DownloadOutputStream outputStream = outputStreamMap
.get(blockIndex);
outputStream.flushAndSync();
}
try {
for (int i = 0; i < size; i++) {
final int blockIndex = outputStreamMap.keyAt(i);
// because we get no sync length value before flush and sync,
// so the length only possible less than or equal to the real persist
// length.
final long noSyncLength = noSyncLengthMap.get(blockIndex).get();
if (noSyncLength > 0) {
increaseLengthMap.put(blockIndex, noSyncLength);
final DownloadOutputStream outputStream = outputStreamMap
.get(blockIndex);
outputStream.flushAndSync();
}
success = true;
} catch (IOException ex) {
Util.w(TAG, "OutputStream flush and sync data to filesystem failed " + ex);
success = false;
}
success = true;
} catch (IOException ex) {
Util.w(TAG, "OutputStream flush and sync data to filesystem failed " + ex);
success = false;
}

if (success) {
final int increaseLengthSize = increaseLengthMap.size();
long allIncreaseLength = 0;
for (int i = 0; i < increaseLengthSize; i++) {
final int blockIndex = increaseLengthMap.keyAt(i);
final long noSyncLength = increaseLengthMap.valueAt(i);
store.onSyncToFilesystemSuccess(info, blockIndex, noSyncLength);
allIncreaseLength += noSyncLength;
noSyncLengthMap.get(blockIndex).addAndGet(-noSyncLength);
Util.d(TAG, "OutputStream sync success (" + task.getId() + ") "
+ "block(" + blockIndex + ") " + " syncLength(" + noSyncLength + ")"
+ " currentOffset(" + info.getBlock(blockIndex).getCurrentOffset()
+ ")");
}
allNoSyncLength.addAndGet(-allIncreaseLength);
lastSyncTimestamp.set(SystemClock.uptimeMillis());
if (success) {
final int increaseLengthSize = increaseLengthMap.size();
long allIncreaseLength = 0;
for (int i = 0; i < increaseLengthSize; i++) {
final int blockIndex = increaseLengthMap.keyAt(i);
final long noSyncLength = increaseLengthMap.valueAt(i);
store.onSyncToFilesystemSuccess(info, blockIndex, noSyncLength);
allIncreaseLength += noSyncLength;
noSyncLengthMap.get(blockIndex).addAndGet(-noSyncLength);
Util.d(TAG, "OutputStream sync success (" + task.getId() + ") "
+ "block(" + blockIndex + ") " + " syncLength(" + noSyncLength + ")"
+ " currentOffset(" + info.getBlock(blockIndex).getCurrentOffset()
+ ")");
}
} finally {
inspectValidPath();
OkDownload.with().processFileStrategy().getFileLock().decreaseLock(path);
allNoSyncLength.addAndGet(-allIncreaseLength);
lastSyncTimestamp.set(SystemClock.uptimeMillis());
}
}

Expand Down
Loading

0 comments on commit 8795ac1

Please sign in to comment.