Feature/composite timestamp (#2187)
* The composite timestamp allows us to encode two values in the timestamp used in Accumulo keys.
  The event date occupies the low-order (right-most) 46 bits. The remaining 17 bits (excluding the
  sign bit) encode the age-off delta, i.e. the number of days after the event date on which age-off
  should be based. If the timestamp is negative, the complement is taken first and then the two
  values are extracted. (A sketch follows this list.)
* Added composite timestamp tests
* Added get and set timestamp methods on the record container.
* Refined the composite timestamp to check for bounds (#2268)
* Incorporated timestamp concept into the RawRecordContainer (#2297)
* Overrode getDate to remain compatible with extending classes
* Updated sharded ingest to appropriately use getTimestamp vs getDate
* Updated index table to use composite timestamp
* Updated edge table to use composite timestamp
* Added an explicit INVALID_TIMESTAMP instead of using Long.MIN_VALUE to determine if the date is not set
* Updated the global index uid aggregator to not aggregate across different timestamps
* Removed the "timestampIgnored" flag on the global index uid aggregator as we never turn that off and we don't want to
* Added test to ensure that keys with different timestamps do not get aggregated
* Updated ageoff day delta calculation to be based on the difference between the start-of-day values for the event and age off dates.
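
A minimal sketch of this encoding, using the CompositeTimestamp helpers that appear in the diffs
below (getCompositeTimeStamp, getAgeOffDate, and MILLIS_PER_DAY are visible in this commit;
getEventDate is assumed here as the counterpart accessor):

    import datawave.util.CompositeTimestamp;

    public class CompositeTimestampSketch {
        public static void main(String[] args) {
            long eventDate = System.currentTimeMillis();
            // Age off 30 days after the event. The delta is stored in whole days,
            // computed from the start-of-day values of the two dates.
            long ageOffDate = eventDate + 30 * CompositeTimestamp.MILLIS_PER_DAY;

            // Pack both dates into one long usable as an Accumulo key timestamp:
            // event date in the low-order 46 bits, day delta in the high 17 bits.
            long ts = CompositeTimestamp.getCompositeTimeStamp(eventDate, ageOffDate);

            // Age-off filters unpack the age-off side of the timestamp...
            long ageOff = CompositeTimestamp.getAgeOffDate(ts); // ~ eventDate + 30 days
            // ...while date-oriented logic reads the event side (assumed accessor).
            long event = CompositeTimestamp.getEventDate(ts);
        }
    }

For a plain (non-composite) timestamp the day delta is zero, so the age-off date and the event
date coincide and existing behavior is unchanged.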

---------

Co-authored-by: hlgp <hlgp@users.noreply.github.com>
Co-authored-by: Ivan Bella <347158+ivakegg@users.noreply.github.com>
3 people authored Dec 2, 2024
1 parent 5ac2f44 commit 6cae3b3
Showing 55 changed files with 1,097 additions and 192 deletions.
@@ -184,7 +184,7 @@ function datawaveIngestWikipedia() {
[ ! -f "${wikipediaRawFile}" ] && error "File not found: ${wikipediaRawFile}" && return 1

local wikipediaHdfsFile="${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/$( basename ${wikipediaRawFile} )"
-local putFileCommand="hdfs dfs -copyFromLocal ${wikipediaRawFile} ${wikipediaHdfsFile}"
+local putFileCommand="hdfs dfs -copyFromLocal -f ${wikipediaRawFile} ${wikipediaHdfsFile}"

local inputFormat="datawave.ingest.wikipedia.WikipediaEventInputFormat"
local jobCommand="${DW_DATAWAVE_INGEST_HOME}/bin/ingest/live-ingest.sh ${wikipediaHdfsFile} ${DW_DATAWAVE_INGEST_NUM_SHARDS} -inputFormat ${inputFormat} -data.name.override=wikipedia ${extraOpts}"
@@ -211,7 +211,7 @@ function datawaveIngestCsv() {
[ ! -f "${csvRawFile}" ] && error "File not found: ${csvRawFile}" && return 1

local csvHdfsFile="${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/$( basename ${csvRawFile} )"
-local putFileCommand="hdfs dfs -copyFromLocal ${csvRawFile} ${csvHdfsFile}"
+local putFileCommand="hdfs dfs -copyFromLocal -f ${csvRawFile} ${csvHdfsFile}"

local inputFormat="datawave.ingest.csv.mr.input.CSVFileInputFormat"
local jobCommand="${DW_DATAWAVE_INGEST_HOME}/bin/ingest/live-ingest.sh ${csvHdfsFile} ${DW_DATAWAVE_INGEST_NUM_SHARDS} -inputFormat ${inputFormat} -data.name.override=mycsv ${extraOpts}"
@@ -232,7 +232,7 @@ function datawaveIngestJson() {
[ ! -f "${jsonRawFile}" ] && error "File not found: ${jsonRawFile}" && return 1

local jsonHdfsFile="${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/$( basename ${jsonRawFile} )"
-local putFileCommand="hdfs dfs -copyFromLocal ${jsonRawFile} ${jsonHdfsFile}"
+local putFileCommand="hdfs dfs -copyFromLocal -f ${jsonRawFile} ${jsonHdfsFile}"

local inputFormat="datawave.ingest.json.mr.input.JsonInputFormat"
local jobCommand="${DW_DATAWAVE_INGEST_HOME}/bin/ingest/live-ingest.sh ${jsonHdfsFile} ${DW_DATAWAVE_INGEST_NUM_SHARDS} -inputFormat ${inputFormat} -data.name.override=myjson ${extraOpts}"
@@ -347,3 +347,10 @@ function datawaveIngestTarballName() {
local dwVersion="$(getDataWaveVersion)"
echo "$( basename "${DW_DATAWAVE_INGEST_TARBALL/-\*-/-$dwVersion-}" )"
}

+function datawaveIngestExamples() {
+  datawaveIngestWikipedia ${DW_DATAWAVE_INGEST_TEST_FILE_WIKI}
+  datawaveIngestJson ${DW_DATAWAVE_INGEST_TEST_FILE_JSON}
+  datawaveIngestCsv ${DW_DATAWAVE_INGEST_TEST_FILE_CSV}
+}

@@ -159,13 +159,6 @@ function initializeDatawaveTables() {
fi
}

-function ingestExampleData() {
-  # Ingest some canned, example data files
-  datawaveIngestWikipedia "${DW_DATAWAVE_INGEST_TEST_FILE_WIKI}"
-  datawaveIngestJson "${DW_DATAWAVE_INGEST_TEST_FILE_JSON}"
-  datawaveIngestCsv "${DW_DATAWAVE_INGEST_TEST_FILE_CSV}"
-}


initializeDatawaveTables

@@ -186,4 +179,4 @@ info "See \$DW_CLOUD_HOME/bin/services/datawave/bootstrap-ingest.sh to view/edit

# Ingest raw data examples, if appropriate...

[ "${DW_REDEPLOY_IN_PROGRESS}" != true ] && [ "${DW_DATAWAVE_INGEST_TEST_SKIP}" == false ] && ingestExampleData
[ "${DW_REDEPLOY_IN_PROGRESS}" != true ] && [ "${DW_DATAWAVE_INGEST_TEST_SKIP}" == false ] && datawaveIngestExamples
6 changes: 3 additions & 3 deletions contrib/datawave-quickstart/bin/services/hadoop/bootstrap.sh
@@ -45,8 +45,8 @@ dfs.replication 1"

DW_HADOOP_MR_HEAPDUMP_DIR="${DW_CLOUD_DATA}/heapdumps"
# mapred-site.xml (Format: <property-name><space><property-value>{<newline>})
-DW_HADOOP_MAPRED_SITE_CONF="mapreduce.jobhistory.address http://${DW_BIND_HOST}:8020
-mapreduce.jobhistory.webapp.address http://${DW_BIND_HOST}:8021
+DW_HADOOP_MAPRED_SITE_CONF="mapreduce.jobhistory.address ${DW_BIND_HOST}:8020
+mapreduce.jobhistory.webapp.address ${DW_BIND_HOST}:8021
mapreduce.jobhistory.intermediate-done-dir ${DW_HADOOP_MR_INTER_DIR}
mapreduce.jobhistory.done-dir ${DW_HADOOP_MR_DONE_DIR}
mapreduce.map.memory.mb 2048
@@ -72,7 +72,7 @@ yarn.nodemanager.pmem-check-enabled false
yarn.nodemanager.vmem-check-enabled false
yarn.nodemanager.resource.memory-mb 6144
yarn.app.mapreduce.am.resource.mb 1024
-yarn.log.server.url http://localhost:8070/jobhistory/logs"
+yarn.log.server.url http://localhost:8021/jobhistory/logs"

# capacity-scheduler.xml (Format: <property-name><space><property-value>{<newline>})
DW_HADOOP_CAPACITY_SCHEDULER_CONF="yarn.scheduler.capacity.maximum-applications 10000
@@ -35,6 +35,8 @@
import datawave.ingest.util.cache.watch.FileRuleCacheValue;
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
+import datawave.iterators.filter.ageoff.FilterRule;
+import datawave.util.CompositeTimestamp;

/**
* This class provides a subclass of the {@code org.apache.accumulo.core.iterators.Filter} class and implements the {@code Option Describer} interface. It
@@ -167,7 +169,7 @@ public boolean accept(Key k, Value v) {
return true;

// short circuit check
-long timeStamp = k.getTimestamp();
+long timeStamp = CompositeTimestamp.getAgeOffDate(k.getTimestamp());
if (timeStamp > this.shortCircuitDateMillis)
return true;

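The same substitution recurs in every age-off filter in this commit: wherever a cutoff was
compared against k.getTimestamp(), it is now compared against the extracted age-off date. A
minimal sketch of the recurring check (illustrative class and method names, not the actual
filter code):

    import org.apache.accumulo.core.data.Key;

    import datawave.util.CompositeTimestamp;

    final class AgeOffCheckSketch {
        // Keep (accept) an entry whose age-off date is newer than the cutoff.
        // With a plain timestamp this matches the old k.getTimestamp() check;
        // with a composite one, an old event date paired with a future age-off
        // date is still kept.
        static boolean accept(Key k, long cutOffMillis) {
            return CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > cutOffMillis;
        }
    }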
@@ -13,6 +13,7 @@
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterOptions;
+import datawave.util.CompositeTimestamp;

/**
* This class provides an abstract base class to be extended to filter based on matching a REGEX to the {@code String} object that represents some portion of a
@@ -69,7 +70,7 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {
String keyField = getKeyField(k, v);
Matcher matcher = pattern.matcher(keyField);
if (matcher.find()) {
-long timeStamp = k.getTimestamp();
+long timeStamp = CompositeTimestamp.getAgeOffDate(k.getTimestamp());
dtFlag = timeStamp > period.getCutOffMilliseconds();
if (log.isTraceEnabled()) {
log.trace("timeStamp = " + timeStamp);
@@ -10,6 +10,7 @@
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterOptions;
+import datawave.util.CompositeTimestamp;
import datawave.util.StringUtils;

/**
@@ -69,7 +70,7 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {
dtFlag = true;
} else {
if (hasToken(k, v, patternBytes)) {
-long timeStamp = k.getTimestamp();
+long timeStamp = CompositeTimestamp.getAgeOffDate(k.getTimestamp());
dtFlag = timeStamp > period.getCutOffMilliseconds();
if (log.isTraceEnabled()) {
log.trace("timeStamp = " + timeStamp);
@@ -10,6 +10,7 @@
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterOptions;
+import datawave.util.CompositeTimestamp;

/**
* TokenizingAgeoffFilter cuts a field into tokens (splitting at a specified set of delimiters), and makes ageoff decisions based on whether or not any of the
@@ -119,7 +120,7 @@ public boolean accept(AgeOffPeriod period, Key k, Value V) {
cutoffTimestamp -= calculatedTTL;
}
ruleApplied = true;
-return k.getTimestamp() > cutoffTimestamp;
+return CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > cutoffTimestamp;
}

@Override
@@ -16,6 +16,7 @@
import com.google.common.collect.Sets;

import datawave.iterators.filter.AgeOffConfigParams;
+import datawave.util.CompositeTimestamp;

/**
* Data type age off filter. Traverses through indexed tables
@@ -209,11 +210,11 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {
if (dataTypeCutoff == null) {
if (defaultCutoffTime >= 0) {
ruleApplied = true;
-accept = k.getTimestamp() > defaultCutoffTime;
+accept = CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > defaultCutoffTime;
}
} else {
ruleApplied = true;
-accept = k.getTimestamp() > dataTypeCutoff;
+accept = CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > dataTypeCutoff;
}
// after age-off is applied, check: if we are accepting this KeyValue and this is a Scan on a dataType which only accepts on timestamp,
// only continue to accept the KeyValue if the timestamp for the dataType matches what is configured
@@ -17,6 +17,7 @@

import datawave.iterators.filter.AgeOffConfigParams;
import datawave.iterators.filter.ColumnVisibilityOrFilter;
+import datawave.util.CompositeTimestamp;

/**
* Field age off filter. Traverses through indexed tables and non-indexed tables. Example follows. Note that any field TTL will follow the same units specified
@@ -208,7 +209,7 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {
Long dataTypeCutoff = (fieldTimes.containsKey(field)) ? fieldTimes.get(field) : null;
if (dataTypeCutoff != null) {
ruleApplied = true;
-return k.getTimestamp() > dataTypeCutoff;
+return CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > dataTypeCutoff;
}

return true;
@@ -6,6 +6,7 @@
import org.apache.log4j.Logger;

import datawave.iterators.filter.AgeOffConfigParams;
+import datawave.util.CompositeTimestamp;

/**
* Data type age off filter. Traverses through indexed tables
@@ -59,7 +60,11 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {

// this rule determines whether to accept / deny (ageoff) a K/V
// based solely on whether a timestamp is before (older than) the cutoff for aging off
-return k.getTimestamp() > period.getCutOffMilliseconds();
+if (CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > period.getCutOffMilliseconds()) {
+    return true;
+} else {
+    return false;
+}
}

/**
@@ -11,6 +11,7 @@
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterOptions;
+import datawave.util.CompositeTimestamp;
import datawave.util.StringUtils;

/**
@@ -108,7 +109,7 @@ public boolean accept(AgeOffPeriod ageOffPeriod, Key k, Value V) {
cutOff -= timeToLive;
}
this.filterRuleApplied = true;
-return k.getTimestamp() > cutOff;
+return CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > cutOff;
}
}
return true;
@@ -28,6 +28,7 @@
import datawave.iterators.filter.ageoff.ConfigurableIteratorEnvironment;
import datawave.iterators.filter.ageoff.FilterOptions;
import datawave.query.iterator.SortedListKeyValueIterator;
+import datawave.util.CompositeTimestamp;

public class ConfigurableAgeOffFilterTest {

@@ -138,6 +139,9 @@ public void testAcceptKeyValue_TtlSet() throws Exception {
// copy configs to actual filter we are testing
filter.initialize(wrapper);

+long tomorrow = System.currentTimeMillis() + CompositeTimestamp.MILLIS_PER_DAY;
+long compositeTS = CompositeTimestamp.getCompositeTimeStamp(daysAgo(365), tomorrow);

// brand new key should be good
assertThat(filter.accept(new Key(), VALUE), is(true));
// first five will hit the ttl short circuit
@@ -155,6 +159,8 @@ public void testAcceptKeyValue_TtlSet() throws Exception {
assertThat(filter.accept(getKey("foo", daysAgo(8)), VALUE), is(true));
// this is really old and matches so should not be accepted
assertThat(filter.accept(getKey("foo", daysAgo(365)), VALUE), is(false));
+// this is really old and matches, but has a future age off date, so should be accepted
+assertThat(filter.accept(getKey("foo", compositeTS), VALUE), is(true));

}

@@ -14,6 +14,7 @@

import datawave.iterators.filter.AgeOffConfigParams;
import datawave.iterators.filter.AgeOffTtlUnits;
+import datawave.util.CompositeTimestamp;

public class FieldAgeOffFilterTest {
private static final String VISIBILITY_PATTERN = "MY_VIS";
@@ -84,6 +85,7 @@ public void testIndexTrueUsesDefaultWhenFieldLacksTtl() {
Key key = new Key("1234", "field_z\\x00my-uuid", "field_z\u0000value", VISIBILITY_PATTERN, tenSecondsAgo);
Assert.assertFalse(ageOffFilter.accept(filterOptions.getAgeOffPeriod(System.currentTimeMillis()), key, new Value()));
Assert.assertTrue(ageOffFilter.isFilterRuleApplied());

key = new Key("1234", "field_y", "field_y\u0000value", VISIBILITY_PATTERN, tenSecondsAgo);
Assert.assertFalse(ageOffFilter.accept(filterOptions.getAgeOffPeriod(System.currentTimeMillis()), key, new Value()));
Assert.assertTrue(ageOffFilter.isFilterRuleApplied());
@@ -236,6 +238,39 @@ public void testIgnoresDocument() {
Assert.assertFalse(ageOffFilter.isFilterRuleApplied());
}

+@Test
+public void testCompositeTimestamp() {
+EditableAccumuloConfiguration conf = new EditableAccumuloConfiguration(DefaultConfiguration.getInstance());
+conf.put("table.custom.isindextable", "true");
+iterEnv.setConf(conf);
+
+long tenSecondsAgo = System.currentTimeMillis() - (10L * ONE_SEC);
+long tomorrow = System.currentTimeMillis() + CompositeTimestamp.MILLIS_PER_DAY;
+
+long compositeTS = CompositeTimestamp.getCompositeTimeStamp(tenSecondsAgo, tomorrow);
+
+FieldAgeOffFilter ageOffFilter = new FieldAgeOffFilter();
+FilterOptions filterOptions = createFilterOptionsWithPattern();
+// set the default to 5 seconds
+filterOptions.setTTL(5L);
+filterOptions.setTTLUnits(AgeOffTtlUnits.SECONDS);
+// set up a ttl for field_y only; other fields fall back to the default
+filterOptions.setOption("fields", "field_y");
+filterOptions.setOption("field_y.ttl", "2"); // 2 seconds
+ageOffFilter.init(filterOptions, iterEnv);
+
+// age off date allows this to accept
+Key key = new Key("1234", "field_y", "field_y\u0000value", VISIBILITY_PATTERN, compositeTS);
+Assert.assertTrue(ageOffFilter.accept(filterOptions.getAgeOffPeriod(System.currentTimeMillis()), key, new Value()));
+Assert.assertTrue(ageOffFilter.isFilterRuleApplied());
+
+// vanilla date does not
+key = new Key("1234", "field_y", "field_y\u0000value", VISIBILITY_PATTERN, tenSecondsAgo);
+Assert.assertFalse(ageOffFilter.accept(filterOptions.getAgeOffPeriod(System.currentTimeMillis()), key, new Value()));
+Assert.assertTrue(ageOffFilter.isFilterRuleApplied());
+
+}

@Test
public void testKeepsMatchBeforeTtl() {
long oneSecondAgo = System.currentTimeMillis() - (1 * ONE_SEC);
