Skip to content

Commit

Permalink
Merge pull request #1 from sudhan499/master
Browse files Browse the repository at this point in the history
Add more stats, add cmd for running stat job via spark-submit to README
  • Loading branch information
prihoda authored Oct 14, 2019
2 parents 3b7f23a + 5faf691 commit 8cafe0a
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 6 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@ convert \
--db.batchSize 1000 \
--output.saveMode Overwrite
```
To run the stats job via spark-submit:

```bash
spark-submit --name "RDF2X ClinicalTrials.gov" --class com.merck.rdf2x.main.Main --master 'local' \
--driver-memory 2g \
--packages postgresql:postgresql:9.1-901-1.jdbc4,org.eclipse.rdf4j:rdf4j-runtime:2.1.4,org.apache.jena:jena-core:3.1.1,org.apache.jena:jena-elephas-io:3.1.1,org.apache.jena:jena-elephas-mapreduce:0.9.0,com.beust:jcommander:1.58,com.databricks:spark-csv_2.10:1.5.0,org.elasticsearch:elasticsearch-spark_2.10:2.4.4,org.jgrapht:jgrapht-core:1.0.1 \
rdf2x-0.1.jar \
stats \
--input.file bio2rdf-clinicaltrials.nq \
--input.batchSize 1000000 \
--stat SUBJECT_URI_COUNT
```

## Running on YARN

Expand Down Expand Up @@ -529,4 +541,4 @@ Additionally, the column is added to the EAV set, which means that all the colum

## Blank nodes

Not implemented yet. Triples with blank nodes are ignored.
Not implemented yet. Triples with blank nodes are ignored.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<groupId>com.merck.open</groupId>
<artifactId>rdf2x</artifactId>
<packaging>jar</packaging>
<version>0.1</version>
<version>0.1.1</version>
<name>rdf2x</name>
<url>http://maven.apache.org</url>
<properties>
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/merck/rdf2x/jobs/stats/StatsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ public enum Stat {
/**
* Count number of occurrences of each subject URI
*/
SUBJECT_URI_COUNT
SUBJECT_URI_COUNT,

/**
* Count number of occurrences of each predicate URI
*/
PREDICATE_URI_COUNT,

/**
* Count number of occurrences of each object URI
*/
OBJECT_URI_COUNT
}
}
60 changes: 57 additions & 3 deletions src/main/java/com/merck/rdf2x/jobs/stats/StatsJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import javax.naming.ConfigurationException;
import java.util.List;

import static com.merck.rdf2x.jobs.stats.StatsConfig.Stat.SUBJECT_URI_COUNT;
import static com.merck.rdf2x.jobs.stats.StatsConfig.Stat.*;

/**
* StatsJob computes various stats on RDF datasets.
Expand All @@ -48,6 +48,21 @@ public class StatsJob implements Runnable {
*/
private final JavaSparkContext sc;

/**
* Number of distinct subject URIs; remains null unless the SUBJECT_URI_COUNT stat is computed
*/
private Long subjectURICount;

/**
* Predicate URI count
*/
private Long predicateURICount;

/**
* Object URI count
*/
private Long objectURICount;

public StatsJob(StatsConfig config, JavaSparkContext sc) throws ConfigurationException {
this.config = config;
this.sc = sc;
Expand Down Expand Up @@ -80,12 +95,51 @@ public void run() {
counts.sortByKey(false).take(100).forEach(uriCount -> {
log.info(uriCount.toString());
});
subjectURICount = counts.count();
stats.remove(SUBJECT_URI_COUNT);
}

if(stats.contains(PREDICATE_URI_COUNT)) {
log.info("----------------------------");
log.info("Total Distinct Subject URIs: {}", counts.count());
log.info("Predicate URI stats:");
JavaPairRDD<Long, String> counts = QuadCounter.countByPredicateURI(quads).mapToPair(Tuple2::swap);
counts.sortByKey(false).take(100).forEach(uriCount -> {
log.info(uriCount.toString());
});
predicateURICount = counts.count();
stats.remove(PREDICATE_URI_COUNT);
}

if(stats.contains(OBJECT_URI_COUNT)) {
log.info("----------------------------");
stats.remove(SUBJECT_URI_COUNT);
log.info("Object URI stats:");
JavaPairRDD<Long, String> counts = QuadCounter.getObjectURI(quads).mapToPair(Tuple2::swap);
counts.sortByKey(false).take(100).forEach( uriCount -> {
log.info(uriCount.toString());
});
objectURICount = counts.count();
stats.remove(OBJECT_URI_COUNT);
}

printStats();

}

/**
 * Logs the distinct-URI totals collected for the subject, predicate and object
 * positions of a {@link Quad}. A total that is still {@code null} is left out;
 * the separator lines before and after are always written.
 */
private void printStats() {
    final String separator = "---------------------------------";

    log.info(separator);

    if (subjectURICount != null) {
        log.info("Total Distinct Subject URIs: {}", subjectURICount);
    }

    if (predicateURICount != null) {
        log.info("Total Distinct Predicate URIs: {}", predicateURICount);
    }

    if (objectURICount != null) {
        log.info("Total Distinct Object URIs: {}", objectURICount);
    }

    log.info(separator);
}

}
14 changes: 14 additions & 0 deletions src/main/java/com/merck/rdf2x/stats/QuadCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,18 @@ public static JavaPairRDD<String, Long> countBySubjectURI(JavaRDD<Quad> quads) {
.reduceByKey((a, b) -> a + b);
}

/**
 * Count the number of quads for each predicate URI.
 * Quads whose predicate is not a URI are filtered out before counting.
 *
 * @param quads RDD of quads to count
 * @return pair RDD of (predicate URI, number of quads with that predicate)
 */
public static JavaPairRDD<String, Long> countByPredicateURI(JavaRDD<Quad> quads) {
    // keep only quads that have a URI in the predicate position
    JavaRDD<Quad> uriPredicateQuads = quads.filter(q -> q.getPredicate().isURI());

    // emit (URI, 1) for every remaining quad
    JavaPairRDD<String, Long> singleOccurrences = uriPredicateQuads.mapToPair(q -> {
        String predicateUri = q.getPredicate().getURI();
        return new Tuple2<>(predicateUri, 1L);
    });

    // sum the ones per URI
    return singleOccurrences.reduceByKey(Long::sum);
}

/**
 * Count the number of quads for each object URI.
 * Quads whose object is not a URI are filtered out before counting.
 *
 * @param quads RDD of quads to count
 * @return pair RDD of (object URI, number of quads with that object)
 */
public static JavaPairRDD<String, Long> getObjectURI(JavaRDD<Quad> quads) {
    // keep only quads that have a URI in the object position
    JavaRDD<Quad> uriObjectQuads = quads.filter(q -> q.getObject().isURI());

    // emit (URI, 1) for every remaining quad
    JavaPairRDD<String, Long> singleOccurrences = uriObjectQuads.mapToPair(q -> {
        String objectUri = q.getObject().getURI();
        return new Tuple2<>(objectUri, 1L);
    });

    // sum the ones per URI
    return singleOccurrences.reduceByKey(Long::sum);
}

}

0 comments on commit 8cafe0a

Please sign in to comment.