diff --git a/README.md b/README.md index 3ca852b..a41a51f 100755 --- a/README.md +++ b/README.md @@ -195,6 +195,18 @@ convert \ --db.batchSize 1000 \ --output.saveMode Overwrite ``` +To run stats job via spark-submit: + +```bash +spark-submit --name "RDF2X ClinicalTrials.gov" --class com.merck.rdf2x.main.Main --master 'local' \ +--driver-memory 2g \ +--packages postgresql:postgresql:9.1-901-1.jdbc4,org.eclipse.rdf4j:rdf4j-runtime:2.1.4,org.apache.jena:jena-core:3.1.1,org.apache.jena:jena-elephas-io:3.1.1,org.apache.jena:jena-elephas-mapreduce:0.9.0,com.beust:jcommander:1.58,com.databricks:spark-csv_2.10:1.5.0,org.elasticsearch:elasticsearch-spark_2.10:2.4.4,org.jgrapht:jgrapht-core:1.0.1 \ +rdf2x-0.1.jar \ +stats \ +--input.file bio2rdf-clinicaltrials.nq \ +--input.batchSize 1000000 \ +--stat SUBJECT_URI_COUNT +``` ## Running on YARN @@ -529,4 +541,4 @@ Additionally, the column is added to the EAV set, which means that all the colum ## Blank nodes -Not implemented yet. Triples with blank nodes are ignored. \ No newline at end of file +Not implemented yet. Triples with blank nodes are ignored. diff --git a/pom.xml b/pom.xml index 84205ed..9974f66 100755 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.merck.open rdf2x jar - 0.1 + 0.1.1 rdf2x http://maven.apache.org diff --git a/src/main/java/com/merck/rdf2x/jobs/stats/StatsConfig.java b/src/main/java/com/merck/rdf2x/jobs/stats/StatsConfig.java index ae50120..bb8f67d 100755 --- a/src/main/java/com/merck/rdf2x/jobs/stats/StatsConfig.java +++ b/src/main/java/com/merck/rdf2x/jobs/stats/StatsConfig.java @@ -60,6 +60,16 @@ public enum Stat { /** * Count number of occurrences of each subject URI */ - SUBJECT_URI_COUNT + SUBJECT_URI_COUNT, + + /** + * Count number of occurrences of each predicate URI + */ + PREDICATE_URI_COUNT, + + /** + * Count number of occurrences of each object URI + */ + OBJECT_URI_COUNT } } diff --git a/src/main/java/com/merck/rdf2x/jobs/stats/StatsJob.java b/src/main/java/com/merck/rdf2x/jobs/stats/StatsJob.java index de3626d..7c1a859 100755 --- a/src/main/java/com/merck/rdf2x/jobs/stats/StatsJob.java +++ b/src/main/java/com/merck/rdf2x/jobs/stats/StatsJob.java @@ -30,7 +30,7 @@ import javax.naming.ConfigurationException; import java.util.List; -import static com.merck.rdf2x.jobs.stats.StatsConfig.Stat.SUBJECT_URI_COUNT; +import static com.merck.rdf2x.jobs.stats.StatsConfig.Stat.*; /** * StatsJob computes various stats on RDF datasets. @@ -48,6 +48,21 @@ public class StatsJob implements Runnable { */ private final JavaSparkContext sc; + /** + * Subject URI count + */ + private Long subjectURICount; + + /** + * Predicate URI count + */ + private Long predicateURICount; + + /** + * Object URI count + */ + private Long objectURICount; + public StatsJob(StatsConfig config, JavaSparkContext sc) throws ConfigurationException { this.config = config; this.sc = sc; @@ -80,12 +95,51 @@ public void run() { counts.sortByKey(false).take(100).forEach(uriCount -> { log.info(uriCount.toString()); }); + subjectURICount = counts.count(); + stats.remove(SUBJECT_URI_COUNT); + } + + if(stats.contains(PREDICATE_URI_COUNT)) { log.info("----------------------------"); - log.info("Total Distinct Subject URIs: {}", counts.count()); + log.info("Predicate URI stats:"); + JavaPairRDD counts = QuadCounter.countByPredicateURI(quads).mapToPair(Tuple2::swap); + counts.sortByKey(false).take(100).forEach(uriCount -> { + log.info(uriCount.toString()); + }); + predicateURICount = counts.count(); + stats.remove(PREDICATE_URI_COUNT); + } + + if(stats.contains(OBJECT_URI_COUNT)) { log.info("----------------------------"); - stats.remove(SUBJECT_URI_COUNT); + log.info("Object URI stats:"); + JavaPairRDD counts = QuadCounter.getObjectURI(quads).mapToPair(Tuple2::swap); + counts.sortByKey(false).take(100).forEach( uriCount -> { + log.info(uriCount.toString()); + }); + objectURICount = counts.count(); + stats.remove(OBJECT_URI_COUNT); } + printStats(); + + } + + /** + * Prints stats for distinct counts of {@link Quad} properties i.e. subject, predicate and object + */ + private void printStats() { + log.info("---------------------------------"); + if(subjectURICount != null) { + log.info("Total Distinct Subject URIs: {}", subjectURICount); + } + if(predicateURICount != null) { + log.info("Total Distinct Predicate URIs: {}", predicateURICount); + } + if(objectURICount != null) { + log.info("Total Distinct Object URIs: {}", objectURICount); + } + log.info("---------------------------------"); } } diff --git a/src/main/java/com/merck/rdf2x/stats/QuadCounter.java b/src/main/java/com/merck/rdf2x/stats/QuadCounter.java index 566f466..195ff9f 100755 --- a/src/main/java/com/merck/rdf2x/stats/QuadCounter.java +++ b/src/main/java/com/merck/rdf2x/stats/QuadCounter.java @@ -34,4 +34,18 @@ public static JavaPairRDD countBySubjectURI(JavaRDD quads) { .reduceByKey((a, b) -> a + b); } + public static JavaPairRDD countByPredicateURI(JavaRDD quads) { + return quads + .filter(quad -> quad.getPredicate().isURI()) + .mapToPair(quad -> new Tuple2<>(quad.getPredicate().getURI(), 1L)) + .reduceByKey((a, b) -> a + b); + } + + public static JavaPairRDD getObjectURI(JavaRDD quads) { + return quads + .filter(quad -> quad.getObject().isURI()) + .mapToPair(quad -> new Tuple2<>(quad.getObject().getURI(), 1L)) + .reduceByKey((a, b) -> a + b); + } + }