Skip to content

Commit

Permalink
Feature/most recent unique (#2570)
Browse files Browse the repository at this point in the history
* Refactored the mostRecentUnique functionality to avoid the ivarator paths
* Implementation a file backed sorted map
* Created additional document map for the return cache
  • Loading branch information
ivakegg authored Jan 15, 2025
1 parent 4bb79c7 commit 7f309fd
Show file tree
Hide file tree
Showing 78 changed files with 7,436 additions and 528 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ protected void setup(org.apache.hadoop.mapreduce.Mapper<Key,Value,Key,Value>.Con

QueryLogic<?> logic = (QueryLogic<?>) super.applicationContext.getBean(logicName);
t = logic.getEnrichedTransformer(query);

Assert.notNull(logic.getMarkingFunctions());
Assert.notNull(logic.getResponseObjectFactory());
this.format = SerializationFormat.valueOf(context.getConfiguration().get(RESULT_SERIALIZATION_FORMAT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,23 @@
<bean class="datawave.query.language.functions.jexl.TimeFunction"/>
<bean class="datawave.query.language.functions.jexl.Jexl"/>
<bean class="datawave.query.language.functions.jexl.Options"/>
<bean class="datawave.query.language.functions.jexl.GroupBy"/>
<bean class="datawave.query.language.functions.jexl.Unique"/>
<bean class="datawave.query.language.functions.jexl.UniqueByDay"/>
<bean class="datawave.query.language.functions.jexl.UniqueByHour"/>
<bean class="datawave.query.language.functions.jexl.UniqueByMinute"/>
<bean class="datawave.query.language.functions.jexl.UniqueByMonth"/>
<bean class="datawave.query.language.functions.jexl.UniqueBySecond"/>
<bean class="datawave.query.language.functions.jexl.UniqueByTenthOfHour"/>
<bean class="datawave.query.language.functions.jexl.UniqueByYear"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUnique"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByDay"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByHour"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByMinute"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByMonth"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueBySecond"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByTenthOfHour"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByYear"/>
<bean class="datawave.query.language.functions.jexl.Geowave.Contains"/>
<bean class="datawave.query.language.functions.jexl.Geowave.CoveredBy"/>
<bean class="datawave.query.language.functions.jexl.Geowave.Covers"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,23 @@
<bean class="datawave.query.language.functions.jexl.AtomValuesMatchFunction"/>
<bean class="datawave.query.language.functions.jexl.Options"/>
<bean class="datawave.query.language.functions.jexl.Rename"/>
<bean class="datawave.query.language.functions.jexl.Unique"/>
<bean class="datawave.query.language.functions.jexl.GroupBy"/>
<bean class="datawave.query.language.functions.jexl.Unique"/>
<bean class="datawave.query.language.functions.jexl.UniqueByDay"/>
<bean class="datawave.query.language.functions.jexl.UniqueByHour"/>
<bean class="datawave.query.language.functions.jexl.UniqueByMinute"/>
<bean class="datawave.query.language.functions.jexl.UniqueByMonth"/>
<bean class="datawave.query.language.functions.jexl.UniqueByYear"/>
<bean class="datawave.query.language.functions.jexl.UniqueBySecond"/>
<bean class="datawave.query.language.functions.jexl.UniqueByTenthOfHour"/>
<bean class="datawave.query.language.functions.jexl.UniqueByYear"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUnique"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByDay"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByHour"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByMinute"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByMonth"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueBySecond"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByTenthOfHour"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByYear"/>
<bean class="datawave.query.language.functions.jexl.NoExpansion"/>
<bean class="datawave.query.language.functions.jexl.Compare"/>
<bean class="datawave.query.language.functions.jexl.Geowave.Contains"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,23 @@
<bean class="datawave.query.language.functions.jexl.TimeFunction"/>
<bean class="datawave.query.language.functions.jexl.Jexl"/>
<bean class="datawave.query.language.functions.jexl.Options"/>
<bean class="datawave.query.language.functions.jexl.GroupBy"/>
<bean class="datawave.query.language.functions.jexl.Unique"/>
<bean class="datawave.query.language.functions.jexl.UniqueByDay"/>
<bean class="datawave.query.language.functions.jexl.UniqueByHour"/>
<bean class="datawave.query.language.functions.jexl.UniqueByMinute"/>
<bean class="datawave.query.language.functions.jexl.UniqueByMonth"/>
<bean class="datawave.query.language.functions.jexl.UniqueBySecond"/>
<bean class="datawave.query.language.functions.jexl.UniqueByTenthOfHour"/>
<bean class="datawave.query.language.functions.jexl.UniqueByYear"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUnique"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByDay"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByHour"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByMinute"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByMonth"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueBySecond"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByTenthOfHour"/>
<bean class="datawave.query.language.functions.jexl.MostRecentUniqueByYear"/>
<bean class="datawave.query.language.functions.jexl.Geowave.Contains"/>
<bean class="datawave.query.language.functions.jexl.Geowave.CoveredBy"/>
<bean class="datawave.query.language.functions.jexl.Geowave.Covers"/>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
<version.datawave.metrics-reporter>3.0.0</version.datawave.metrics-reporter>
<version.datawave.query-api>1.0.0</version.datawave.query-api>
<version.datawave.query-metric-api>4.0.7</version.datawave.query-metric-api>
<version.datawave.type-utils>3.0.3</version.datawave.type-utils>
<version.datawave.type-utils>3.1.2</version.datawave.type-utils>
<version.deltaspike>1.9.0</version.deltaspike>
<version.easymock>5.2.0</version.easymock>
<version.eclipse.emf>2.15.0</version.eclipse.emf>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1303,8 +1303,11 @@ protected void setupRowBasedHdfsBackedSet(String row) throws IOException {
this.createdRowDir = false;
}

this.set = new HdfsBackedSortedSet<>(null, hdfsBackedSetBufferSize, ivaratorCacheDirs, row, maxOpenFiles, numRetries, persistOptions,
new FileKeySortedSet.Factory());
// noinspection unchecked
this.set = (HdfsBackedSortedSet<Key>) HdfsBackedSortedSet.builder().withBufferPersistThreshold(hdfsBackedSetBufferSize)
.withIvaratorCacheDirs(ivaratorCacheDirs).withUniqueSubPath(row).withMaxOpenFiles(maxOpenFiles).withNumRetries(numRetries)
.withPersistOptions(persistOptions).withSetFactory(new FileKeySortedSet.Factory()).build();

this.threadSafeSet = Collections.synchronizedSortedSet(this.set);
this.currentRow = row;
this.setControl.takeOwnership(row, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ public class FileSystemCache {

public FileSystemCache(String hdfsSiteConfigs) throws MalformedURLException {
conf = new Configuration();
for (String url : org.apache.commons.lang.StringUtils.split(hdfsSiteConfigs, ',')) {
conf.addResource(new URL(url));
if (hdfsSiteConfigs != null) {
for (String url : org.apache.commons.lang.StringUtils.split(hdfsSiteConfigs, ',')) {
conf.addResource(new URL(url));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public class QueryParameters {

public static final String GROUP_FIELDS_BATCH_SIZE = "group.fields.batch.size";
public static final String UNIQUE_FIELDS = "unique.fields";
public static final String MOST_RECENT_UNIQUE = "most.recent.unique";

/**
* Used to specify fields which are excluded from QueryModel expansion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
Expand Down Expand Up @@ -228,6 +229,20 @@ public boolean isFromIndex() {
return fromIndex;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Attribute)) {
return false;
}
Attribute other = (Attribute) o;
EqualsBuilder equals = new EqualsBuilder().append(this.isMetadataSet(), other.isMetadataSet());
if (this.isMetadataSet()) {
equals.append(this.getMetadata(), other.getMetadata());
}
return equals.isEquals();
}

@Override
public int hashCode() {
HashCodeBuilder hcb = new HashCodeBuilder(145, 11);
hcb.append(this.isMetadataSet());
Expand Down
Loading

0 comments on commit 7f309fd

Please sign in to comment.