Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest field configuration helper cache #2614

Draft
wants to merge 11 commits into
base: integration
Choose a base branch
from
15 changes: 15 additions & 0 deletions warehouse/ingest-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,21 @@
<artifactId>javassist</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package datawave.ingest.data.config;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

public class CachedFieldConfigHelper implements FieldConfigHelper {
private final static Logger log = LoggerFactory.getLogger(CachedFieldConfigHelper.class);

private final static float DEFAULT_LRU_LF = 0.75f;
private final static int DEFAULT_DIAGNOSTIC_SECS = 30;

private final FieldConfigHelper underlyingHelper;
private final LruCache<String,CachedEntry> resultCache;
private final int limit;
private final boolean diagnosticEnabled;
private final Set<String> diagnosticUniqueFields;
private Clock clock;
private boolean limitMessageEmitted;
private long diagnosticFieldCompute;
private long diagnosticEmitIntervalMillis;
private long diagnosticEmitNextMillis;
private boolean diagnosticEmitted;

enum AttributeType {
INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEX_ONLY_FIELD
}

interface Clock {
default long epochMillis() {
return System.currentTimeMillis();
}

static Clock defaultClock() {
return new Clock() {};
}
}

public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
this(helper, limit, false);
}

public CachedFieldConfigHelper(FieldConfigHelper helper, int limit, boolean diagnosticEnabled) {
if (limit < 1) {
throw new IllegalArgumentException("Limit must be a positive integer");
}
this.clock = Clock.defaultClock();
this.underlyingHelper = helper;
this.resultCache = new LruCache<>(limit);
this.limit = limit;
this.diagnosticEnabled = diagnosticEnabled;
this.diagnosticUniqueFields = new HashSet<>();
this.diagnosticEmitIntervalMillis = SECONDS.toMillis(DEFAULT_DIAGNOSTIC_SECS);
}

@Override
public boolean isStoredField(String fieldName) {
return getFieldResult(AttributeType.STORED_FIELD, fieldName, underlyingHelper::isStoredField);
}

@Override
public boolean isIndexedField(String fieldName) {
return getFieldResult(AttributeType.INDEXED_FIELD, fieldName, underlyingHelper::isIndexedField);
}

@Override
public boolean isIndexOnlyField(String fieldName) {
return getFieldResult(AttributeType.INDEX_ONLY_FIELD, fieldName, underlyingHelper::isIndexOnlyField);
}

@Override
public boolean isReverseIndexedField(String fieldName) {
return getFieldResult(AttributeType.REVERSE_INDEXED_FIELD, fieldName, underlyingHelper::isReverseIndexedField);
}

@Override
public boolean isTokenizedField(String fieldName) {
return getFieldResult(AttributeType.TOKENIZED_FIELD, fieldName, underlyingHelper::isTokenizedField);
}

@Override
public boolean isReverseTokenizedField(String fieldName) {
return getFieldResult(AttributeType.REVERSE_TOKENIZED_FIELD, fieldName, underlyingHelper::isReverseTokenizedField);
}

@VisibleForTesting
boolean getFieldResult(AttributeType attributeType, String fieldName, Predicate<String> fn) {
CachedEntry cachedEntry = resultCache.computeIfAbsent(fieldName, (key) -> {
if (diagnosticEnabled) {
diagnosticFieldCompute++;
diagnosticUniqueFields.add(key);
}
return new CachedEntry(key);
});

CachedEntry.MemoizedResult memoizedResult = cachedEntry.get(attributeType);

// when diagnostic is enabled - emit a message if the field limit has been exceeded
// the intent is to help adjust the size required for the cache
if (diagnosticEnabled && clock.epochMillis() > diagnosticEmitNextMillis) {
diagnosticEmitted = true;
diagnosticEmitNextMillis = clock.epochMillis() + diagnosticEmitIntervalMillis;
log.info("Field cache LRU [limit={}, computed={}, size={}, uniq={}]", limit, diagnosticFieldCompute, diagnosticUniqueFields.size(),
diagnosticUniqueFields);
} else if (resultCache.hasLimitExceeded() && !limitMessageEmitted) {
log.info("Field cache LRU limit exceeded: [limit={}, field={}]", limit, fieldName);
limitMessageEmitted = true;
}
return memoizedResult.getResultOrEvaluate(fn);
}

@VisibleForTesting
boolean hasLimitExceeded() {
return resultCache.hasLimitExceeded();
}

@VisibleForTesting
Set<String> getCachedFields() {
return resultCache.keySet();
}

@VisibleForTesting
boolean getDiagnosticEmitted() {
return diagnosticEmitted;
}

@VisibleForTesting
Set<String> getDiagnosticUniqueFields() {
return diagnosticUniqueFields;
}

@VisibleForTesting
long getDiagnosticFieldCompute() {
return diagnosticFieldCompute;
}

@VisibleForTesting
long getDiagnosticEmitNextMillis() {
return diagnosticEmitNextMillis;
}

@VisibleForTesting
void setDiagnosticEmitIntervalMillis(long intervalMillis) {
this.diagnosticEmitIntervalMillis = intervalMillis;
}

@VisibleForTesting
void setClock(Clock clock) {
this.clock = clock;
}

private static class LruCache<K,V> extends LinkedHashMap<K,V> {
private final int maxSize;
private boolean limitExceeded;

LruCache(int maxSize) {
super((int) (maxSize / DEFAULT_LRU_LF) + 1, DEFAULT_LRU_LF, true);
this.maxSize = maxSize;
}

boolean hasLimitExceeded() {
return limitExceeded;
}

protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
boolean localLimitExceeded = size() > maxSize;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is size() threadsafe? Wondering if this could return a stale size in a multi-threaded scenario. Not sure how much we care.

Copy link
Collaborator Author

@dtspence dtspence Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can make a comment and/or refactor - the intent was to have the limitExceeded variable be thread-safe for the debug log output.

EDIT: The refactored the implementation no longer uses a thread for the debug/diagnostic log messages.

if (localLimitExceeded) {
limitExceeded = true;
}
return localLimitExceeded;
}
}

private static class CachedEntry {
private final String fieldName;
private final MemoizedResult indexed;
private final MemoizedResult reverseIndexed;
private final MemoizedResult stored;
private final MemoizedResult indexedOnly;
private final MemoizedResult tokenized;
private final MemoizedResult reverseTokenized;

private CachedEntry(String fieldName) {
this.fieldName = fieldName;
this.indexed = new MemoizedResult();
this.reverseIndexed = new MemoizedResult();
this.stored = new MemoizedResult();
this.indexedOnly = new MemoizedResult();
this.tokenized = new MemoizedResult();
this.reverseTokenized = new MemoizedResult();
}

private MemoizedResult get(AttributeType attributeType) {
MemoizedResult result;
switch (attributeType) {
case INDEX_ONLY_FIELD:
result = indexedOnly;
break;
case INDEXED_FIELD:
result = indexed;
break;
case REVERSE_INDEXED_FIELD:
result = reverseIndexed;
break;
case TOKENIZED_FIELD:
result = tokenized;
break;
case REVERSE_TOKENIZED_FIELD:
result = reverseTokenized;
break;
case STORED_FIELD:
result = stored;
break;
default:
throw new IllegalArgumentException("Undefined attribute type: " + attributeType);
}
return result;
}

private class MemoizedResult {
private boolean resultEvaluated;
private boolean result;

private boolean getResultOrEvaluate(Predicate<String> evaluateFn) {
if (!resultEvaluated) {
result = evaluateFn.test(fieldName);
resultEvaluated = true;
}
return result;
}
}
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setup() method is already too long. There are blocks of logic preceded by a comment explaining what's to come. I know this is a bit out of scope but it would help with readability if it were refactored so that each of those commented blocks turn into meaningfully named methods.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import datawave.ingest.config.IngestConfigurationFactory;
import datawave.ingest.data.Type;
import datawave.ingest.data.TypeRegistry;
import datawave.ingest.data.config.CachedFieldConfigHelper;
import datawave.ingest.data.config.DataTypeHelperImpl;
import datawave.ingest.data.config.FieldConfigHelper;
import datawave.ingest.data.config.MarkingsHelper;
Expand Down Expand Up @@ -138,6 +139,9 @@ public abstract class BaseIngestHelper extends AbstractIngestHelper implements C
public static final String FIELD_FAILED_NORMALIZATION_POLICY = ".data.field.normalization.failure.policy";

public static final String FIELD_CONFIG_FILE = ".data.category.field.config.file";
public static final String FIELD_CONFIG_CACHE_ENABLED = ".data.category.field.config.cache.enabled";
public static final String FIELD_CONFIG_CACHE_LIMIT = ".data.category.field.config.cache.limit";
public static final String FIELD_CONFIG_CACHE_DIAGNOSTIC = ".data.category.field.config.cache.diagnostic";

private static final Logger log = ThreadConfigurableLogger.getLogger(BaseIngestHelper.class);

Expand Down Expand Up @@ -255,10 +259,21 @@ public void setup(Configuration config) {
// Load the field helper, which takes precedence over the individual field configurations
final String fieldConfigFile = config.get(this.getType().typeName() + FIELD_CONFIG_FILE);
if (fieldConfigFile != null) {
final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the cache truly is the better option do we really want it to be feature based?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference was to make it feature/opted-into based on the need to select an appropriate cache value and also so the capability could be turned off in the event of unexpected behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's reasonable. I suggest that once the confidence level is high, we remove the conditional logic.

final boolean fieldConfigCacheLimitDiagnostic = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_DIAGNOSTIC, false);
final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_LIMIT, 100);
if (log.isDebugEnabled()) {
log.debug("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);
}
this.fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
log.info("Field config cache enabled: " + fieldConfigCacheEnabled);
if (fieldConfigCacheEnabled) {
log.info("Field config cache limit: " + fieldConfigCacheLimit);
log.info("Field config cache limit diagnostic: " + fieldConfigCacheLimitDiagnostic);
}
fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
if (fieldConfigCacheEnabled) {
fieldConfigHelper = new CachedFieldConfigHelper(fieldConfigHelper, fieldConfigCacheLimit, fieldConfigCacheLimitDiagnostic);
}
}

// Process the indexed fields
Expand Down
Loading
Loading