diff --git a/src/main/java/org/globalbioticinteractions/elton/cmd/DigestEmittingInputStream.java b/src/main/java/org/globalbioticinteractions/elton/cmd/DigestEmittingInputStream.java new file mode 100644 index 0000000..825b503 --- /dev/null +++ b/src/main/java/org/globalbioticinteractions/elton/cmd/DigestEmittingInputStream.java @@ -0,0 +1,75 @@ +package org.globalbioticinteractions.elton.cmd; + +import bio.guoda.preston.HashType; +import bio.guoda.preston.Hasher; +import bio.guoda.preston.RefNodeFactory; +import bio.guoda.preston.cmd.ActivityContext; +import bio.guoda.preston.process.ActivityUtil; +import bio.guoda.preston.process.StatementsEmitter; +import org.apache.commons.rdf.api.IRI; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.security.DigestInputStream; +import java.security.MessageDigest; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +public class DigestEmittingInputStream extends DigestInputStream { + final AtomicBoolean isEOF; + final AtomicBoolean hasLogged; + final URI resourceLocation; + private final MessageDigest md; + private final URI resource; + private final ActivityContext ctx; + private final StatementsEmitter activityEmitter; + private final HashType hashType; + + public DigestEmittingInputStream(InputStream retrieve, + MessageDigest md, + URI resource, + ActivityContext ctx, + StatementsEmitter activityEmitter, + HashType hashType) { + super(retrieve, md); + this.md = md; + this.resource = resource; + isEOF = new AtomicBoolean(false); + hasLogged = new AtomicBoolean(false); + resourceLocation = resource; + this.ctx = ctx; + this.activityEmitter = activityEmitter; + this.hashType = hashType; + } + + public int read() throws IOException { + return setEOFIfEncountered(super.read()); + } + + public int read(byte[] var1, int var2, int var3) throws IOException { + return setEOFIfEncountered(super.read(var1, var2, var3)); + } + + private int setEOFIfEncountered(int read) { + if (read == -1) { + isEOF.set(true); + } + return read; + } + + public void close() throws IOException { + this.in.close(); + + if (isEOF.get() && !hasLogged.get()) { + IRI object = Hasher.toHashIRI(this.md, this.hashType); + ActivityUtil.emitDownloadActivity( + RefNodeFactory.toIRI(this.resourceLocation), + object, + this.activityEmitter, + Optional.of(this.ctx.getActivity())); + hasLogged.set(true); + } + } + +} diff --git a/src/main/java/org/globalbioticinteractions/elton/cmd/LoggingResourceService.java b/src/main/java/org/globalbioticinteractions/elton/cmd/LoggingResourceService.java index c20d742..b4ac95d 100644 --- a/src/main/java/org/globalbioticinteractions/elton/cmd/LoggingResourceService.java +++ b/src/main/java/org/globalbioticinteractions/elton/cmd/LoggingResourceService.java @@ -18,12 +18,10 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; -import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; public class LoggingResourceService implements ResourceService { @@ -64,7 +62,14 @@ private InputStream logVersion(URI uri, InputStream retrieve) throws IOException final URI resource = local instanceof Dataset ? getLocationInDataset(uri, (Dataset) local) : uri; - return new DigestLoggingInputStream(retrieve, md, resource); + return new DigestEmittingInputStream( + retrieve, + md, + resource, + ctx, + activityEmitter, + hashType + ); } catch (NoSuchAlgorithmException e) { throw new RuntimeException("algorithm [" + hashType.getAlgorithm() + "] not supported", e); } @@ -101,51 +106,4 @@ private URI getLocationInDataset(URI uri, Dataset dataset) throws IOException { return resourceLocation; } - public class DigestLoggingInputStream extends DigestInputStream { - final AtomicBoolean isEOF; - final AtomicBoolean hasLogged; - final URI resourceLocation; - private final MessageDigest md; - private final URI resource; - - public DigestLoggingInputStream(InputStream retrieve, MessageDigest md, URI resource) { - super(retrieve, md); - this.md = md; - this.resource = resource; - isEOF = new AtomicBoolean(false); - hasLogged = new AtomicBoolean(false); - resourceLocation = resource; - } - - public int read() throws IOException { - return setEOFIfEncountered(super.read()); - } - - public int read(byte[] var1, int var2, int var3) throws IOException { - return setEOFIfEncountered(super.read(var1, var2, var3)); - } - - private int setEOFIfEncountered(int read) { - if (read == -1) { - isEOF.set(true); - } - return read; - } - - public void close() throws IOException { - this.in.close(); - - if (isEOF.get() && !hasLogged.get()) { - IRI object = Hasher.toHashIRI(md, hashType); - ActivityUtil.emitDownloadActivity( - RefNodeFactory.toIRI(resourceLocation), - object, - activityEmitter, - Optional.of(ctx.getActivity())); - hasLogged.set(true); - } - } - - } - } diff --git a/src/main/java/org/globalbioticinteractions/elton/util/ResourceServiceListening.java b/src/main/java/org/globalbioticinteractions/elton/util/ResourceServiceListening.java index 1f4d94d..978afef 100644 --- a/src/main/java/org/globalbioticinteractions/elton/util/ResourceServiceListening.java +++ b/src/main/java/org/globalbioticinteractions/elton/util/ResourceServiceListening.java @@ -38,7 +38,6 @@ public ResourceServiceListening( @Override public InputStream retrieve(URI uri) throws IOException { - IRI accessId = activityIdFactory.get(); activityListener.onStarted(ctx.getActivity(), accessId, RefNodeFactory.toIRI(uri)); InputStream is = service.retrieve(uri);