Skip to content

Commit

Permalink
refactor resource service emitting content ids on stream completion
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorrit Poelen committed Dec 26, 2024
1 parent d1d8d0e commit 4419639
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.globalbioticinteractions.elton.cmd;

import bio.guoda.preston.HashType;
import bio.guoda.preston.Hasher;
import bio.guoda.preston.RefNodeFactory;
import bio.guoda.preston.cmd.ActivityContext;
import bio.guoda.preston.process.ActivityUtil;
import bio.guoda.preston.process.StatementsEmitter;
import org.apache.commons.rdf.api.IRI;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public class DigestEmittingInputStream extends DigestInputStream {
final AtomicBoolean isEOF;
final AtomicBoolean hasLogged;
final URI resourceLocation;
private final MessageDigest md;
private final URI resource;
private final ActivityContext ctx;
private final StatementsEmitter activityEmitter;
private final HashType hashType;

public DigestEmittingInputStream(InputStream retrieve,
MessageDigest md,
URI resource,
ActivityContext ctx,
StatementsEmitter activityEmitter,
HashType hashType) {
super(retrieve, md);
this.md = md;
this.resource = resource;
isEOF = new AtomicBoolean(false);
hasLogged = new AtomicBoolean(false);
resourceLocation = resource;
this.ctx = ctx;
this.activityEmitter = activityEmitter;
this.hashType = hashType;
}

public int read() throws IOException {
return setEOFIfEncountered(super.read());
}

public int read(byte[] var1, int var2, int var3) throws IOException {
return setEOFIfEncountered(super.read(var1, var2, var3));
}

private int setEOFIfEncountered(int read) {
if (read == -1) {
isEOF.set(true);
}
return read;
}

public void close() throws IOException {
this.in.close();

if (isEOF.get() && !hasLogged.get()) {
IRI object = Hasher.toHashIRI(this.md, this.hashType);
ActivityUtil.emitDownloadActivity(
RefNodeFactory.toIRI(this.resourceLocation),
object,
this.activityEmitter,
Optional.of(this.ctx.getActivity()));
hasLogged.set(true);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class LoggingResourceService implements ResourceService {
Expand Down Expand Up @@ -64,7 +62,14 @@ private InputStream logVersion(URI uri, InputStream retrieve) throws IOException
final URI resource = local instanceof Dataset
? getLocationInDataset(uri, (Dataset) local) : uri;

return new DigestLoggingInputStream(retrieve, md, resource);
return new DigestEmittingInputStream(
retrieve,
md,
resource,
ctx,
activityEmitter,
hashType
);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("algorithm [" + hashType.getAlgorithm() + "] not supported", e);
}
Expand Down Expand Up @@ -101,51 +106,4 @@ private URI getLocationInDataset(URI uri, Dataset dataset) throws IOException {
return resourceLocation;
}

public class DigestLoggingInputStream extends DigestInputStream {
final AtomicBoolean isEOF;
final AtomicBoolean hasLogged;
final URI resourceLocation;
private final MessageDigest md;
private final URI resource;

public DigestLoggingInputStream(InputStream retrieve, MessageDigest md, URI resource) {
super(retrieve, md);
this.md = md;
this.resource = resource;
isEOF = new AtomicBoolean(false);
hasLogged = new AtomicBoolean(false);
resourceLocation = resource;
}

public int read() throws IOException {
return setEOFIfEncountered(super.read());
}

public int read(byte[] var1, int var2, int var3) throws IOException {
return setEOFIfEncountered(super.read(var1, var2, var3));
}

private int setEOFIfEncountered(int read) {
if (read == -1) {
isEOF.set(true);
}
return read;
}

public void close() throws IOException {
this.in.close();

if (isEOF.get() && !hasLogged.get()) {
IRI object = Hasher.toHashIRI(md, hashType);
ActivityUtil.emitDownloadActivity(
RefNodeFactory.toIRI(resourceLocation),
object,
activityEmitter,
Optional.of(ctx.getActivity()));
hasLogged.set(true);
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public ResourceServiceListening(
@Override
public InputStream retrieve(URI uri) throws IOException {


IRI accessId = activityIdFactory.get();
activityListener.onStarted(ctx.getActivity(), accessId, RefNodeFactory.toIRI(uri));
InputStream is = service.retrieve(uri);
Expand Down

0 comments on commit 4419639

Please sign in to comment.