Ingest field configuration helper cache #2614
base: integration
@@ -0,0 +1,208 @@
package datawave.ingest.data.config;

import static java.lang.Thread.NORM_PRIORITY;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class CachedFieldConfigHelper implements FieldConfigHelper {
    private final static Logger log = LoggerFactory.getLogger(CachedFieldConfigHelper.class);

    private final static float DEFAULT_LRU_LF = 0.75f;
    private final static int DEFAULT_DEBUG_STATE_SECS = 30;

    private final FieldConfigHelper underlyingHelper;
    private final LruCache<String,CachedEntry> resultCache;
    private final boolean debugLimitsEnabled;
    private final int limit;
    private final Set<String> debugFieldUnique;
    private final ScheduledExecutorService debugStateExecutor;
    private final AtomicLong debugFieldComputes;

    enum AttributeType {
        INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEX_ONLY_FIELD
    }

    public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
        this(helper, limit, false);
    }

    public CachedFieldConfigHelper(FieldConfigHelper helper, int limit, boolean debugLimitEnabled) {
        if (limit < 1) {
            throw new IllegalArgumentException("Limit must be a positive integer");
        }
        this.underlyingHelper = helper;
        this.resultCache = new LruCache<>(limit);
        this.limit = limit;
        this.debugLimitsEnabled = debugLimitEnabled;
        this.debugFieldUnique = new HashSet<>();
        this.debugFieldComputes = new AtomicLong();

        if (debugLimitEnabled) {
            this.debugStateExecutor = Executors.newSingleThreadScheduledExecutor(
                            // @formatter:off
                            new ThreadFactoryBuilder()
                                            .setPriority(NORM_PRIORITY)
                                            .setDaemon(true)
                                            .setNameFormat("CachedFieldConfigHelper.DebugState")
                                            .build()
                            // @formatter:on
            );
            this.debugStateExecutor.scheduleAtFixedRate(this::debugLogState, DEFAULT_DEBUG_STATE_SECS, DEFAULT_DEBUG_STATE_SECS, SECONDS);
        } else {
            this.debugStateExecutor = null;
        }
    }

    @Override
    public boolean isStoredField(String fieldName) {
        return getFieldResult(AttributeType.STORED_FIELD, fieldName, underlyingHelper::isStoredField);
    }

    @Override
    public boolean isIndexedField(String fieldName) {
        return getFieldResult(AttributeType.INDEXED_FIELD, fieldName, underlyingHelper::isIndexedField);
    }

    @Override
    public boolean isIndexOnlyField(String fieldName) {
        return getFieldResult(AttributeType.INDEX_ONLY_FIELD, fieldName, underlyingHelper::isIndexOnlyField);
    }

    @Override
    public boolean isReverseIndexedField(String fieldName) {
        return getFieldResult(AttributeType.REVERSE_INDEXED_FIELD, fieldName, underlyingHelper::isReverseIndexedField);
    }

    @Override
    public boolean isTokenizedField(String fieldName) {
        return getFieldResult(AttributeType.TOKENIZED_FIELD, fieldName, underlyingHelper::isTokenizedField);
    }

    @Override
    public boolean isReverseTokenizedField(String fieldName) {
        return getFieldResult(AttributeType.REVERSE_TOKENIZED_FIELD, fieldName, underlyingHelper::isReverseTokenizedField);
    }

    @VisibleForTesting
    boolean getFieldResult(AttributeType attributeType, String fieldName, Predicate<String> fn) {
        CachedEntry ce = !debugLimitsEnabled ?
                        resultCache.computeIfAbsent(fieldName, CachedEntry::new) :
                        resultCache.computeIfAbsent(fieldName, this::debugCachedEntryCreation);
        return ce.get(attributeType).getResultOrEvaluate(fn);
    }

    @VisibleForTesting
    boolean hasLimitExceeded() {
        return resultCache.hasLimitExceeded();
    }

    private CachedEntry debugCachedEntryCreation(String fieldName) {
        debugFieldComputes.incrementAndGet();
        debugFieldUnique.add(fieldName);
        return new CachedEntry(fieldName);
    }

    private void debugLogState() {
        if (resultCache.hasLimitExceeded()) {
log.info("Field cache LRU limit exceeded [limit={}, debug={}, size={}, uniq={}]", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this feature is "debugLogState" I was expecting this to be debug level logging. Not a deal breaker but I'd vote to make it debug. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can update - I set the level this way so it would show up in logs without having to also adjust logger/levels. |
||
limit, debugFieldComputes.get(), debugFieldUnique.size(), debugLimitsEnabled); | ||
} | ||
} | ||
|
||
    private static class LruCache<K,V> extends LinkedHashMap<K,V> {
        private final int maxSize;
        private volatile boolean limitExceeded;

        LruCache(int maxSize) {
            super((int) (maxSize / DEFAULT_LRU_LF) + 1, DEFAULT_LRU_LF, true);
            this.maxSize = maxSize;
        }

        boolean hasLimitExceeded() {
            // thread-safe
            return limitExceeded;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
            boolean localLimitExceeded = size() > maxSize;

            if (localLimitExceeded) {
                limitExceeded = true;
            }
            return localLimitExceeded;
        }
    }

Author response: I can make a comment and/or refactor - the intent was to have the limitExceeded variable be thread-safe for the debug log output. EDIT: The refactored implementation no longer uses a thread for the debug/diagnostic log messages.

    private static class CachedEntry {
        private final String fieldName;
        private final MemoizedResult indexed;
        private final MemoizedResult reverseIndexed;
        private final MemoizedResult stored;
        private final MemoizedResult indexedOnly;
        private final MemoizedResult tokenized;
        private final MemoizedResult reverseTokenized;

        private CachedEntry(String fieldName) {
            this.fieldName = fieldName;
            this.indexed = new MemoizedResult();
            this.reverseIndexed = new MemoizedResult();
            this.stored = new MemoizedResult();
            this.indexedOnly = new MemoizedResult();
            this.tokenized = new MemoizedResult();
            this.reverseTokenized = new MemoizedResult();
        }

        private MemoizedResult get(AttributeType attributeType) {
            MemoizedResult result;
            switch (attributeType) {
                case INDEX_ONLY_FIELD:
                    result = indexedOnly;
                    break;
                case INDEXED_FIELD:
                    result = indexed;
                    break;
                case REVERSE_INDEXED_FIELD:
                    result = reverseIndexed;
                    break;
                case TOKENIZED_FIELD:
                    result = tokenized;
                    break;
                case REVERSE_TOKENIZED_FIELD:
                    result = reverseTokenized;
                    break;
                case STORED_FIELD:
                    result = stored;
                    break;
                default:
                    throw new IllegalArgumentException("Undefined attribute type: " + attributeType);
            }
            return result;
        }

        private class MemoizedResult {
            private boolean resultEvaluated;
            private boolean result;

            private boolean getResultOrEvaluate(Predicate<String> evaluateFn) {
                if (!resultEvaluated) {
                    result = evaluateFn.test(fieldName);
                    resultEvaluated = true;
                }
                return result;
            }
        }
    }
}
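A minimal usage sketch of the new decorator, based only on the constructor and interface shown above; the wrapping method name, the limit of 500, and the disabled debug flag are illustrative values, not taken from the PR:

    // Illustrative sketch only: decorate an existing FieldConfigHelper with the cache.
    // The limit of 500 and the disabled debug flag are made-up example values.
    static FieldConfigHelper buildCachedHelper(FieldConfigHelper delegate) {
        // Each (field, attribute) pair is evaluated against the delegate at most once
        // while the field remains in the LRU cache; repeat lookups are memoized.
        return new CachedFieldConfigHelper(delegate, 500, false);
    }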
@@ -33,6 +33,7 @@
import datawave.ingest.config.IngestConfigurationFactory;
import datawave.ingest.data.Type;
import datawave.ingest.data.TypeRegistry;
import datawave.ingest.data.config.CachedFieldConfigHelper;
import datawave.ingest.data.config.DataTypeHelperImpl;
import datawave.ingest.data.config.FieldConfigHelper;
import datawave.ingest.data.config.MarkingsHelper;

@@ -138,6 +139,9 @@ public abstract class BaseIngestHelper extends AbstractIngestHelper implements C
    public static final String FIELD_FAILED_NORMALIZATION_POLICY = ".data.field.normalization.failure.policy";

    public static final String FIELD_CONFIG_FILE = ".data.category.field.config.file";
    public static final String FIELD_CONFIG_CACHE_ENABLED = ".data.category.field.config.cache.enabled";
    public static final String FIELD_CONFIG_CACHE_KEY_LIMIT = ".data.category.field.config.cache.limit";
    public static final String FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG = ".data.category.field.config.cache.limit.debug";

    private static final Logger log = ThreadConfigurableLogger.getLogger(BaseIngestHelper.class);

@@ -255,10 +259,19 @@ public void setup(Configuration config) {
        // Load the field helper, which takes precedence over the individual field configurations
        final String fieldConfigFile = config.get(this.getType().typeName() + FIELD_CONFIG_FILE);
        if (fieldConfigFile != null) {
            if (log.isDebugEnabled()) {
                log.debug("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);

Review comment: Should probably restore this.
Author response: It was elevated to an info - however, it is logged twice - there is another location within
Review comment: With the new diagnostic code, if it makes better sense to have it as info then I'm ok with it.

            final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, false);

Review comment: If the cache truly is the better option, do we really want it to be feature based?
Author response: My preference was to make it feature/opt-in, based on the need to select an appropriate cache value and also so the capability could be turned off in the event of unexpected behavior.
Review comment: That's reasonable. I suggest that once the confidence level is high, we remove the conditional logic.

            final boolean fieldConfigCacheLimitDebug = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG, false);
            final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT, 100);

Review comment: How was the default of 100 chosen?
Author response: I used a value that I felt was a reasonable/minimal default and also fit with the sample datasets. I can change it to something different, would a higher value at
Review comment: I don't have a recommendation on size. I was just curious.

            log.info("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);
            log.info("Field config cache enabled: " + fieldConfigCacheEnabled);
            if (fieldConfigCacheEnabled) {
                log.info("Field config cache limit: " + fieldConfigCacheLimit);
                log.info("Field config cache limit debug: " + fieldConfigCacheLimitDebug);
            }
            fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
            if (fieldConfigCacheEnabled) {
                fieldConfigHelper = new CachedFieldConfigHelper(fieldConfigHelper, fieldConfigCacheLimit, fieldConfigCacheLimitDebug);
            }
            this.fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
        }

        // Process the indexed fields

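As a rough illustration of how these new properties might be wired up for an ingest job, here is a hedged sketch; the data type name "mycsv", the field-config file name, and the chosen values are hypothetical, while the property suffixes are the FIELD_CONFIG_* constants added above:

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical wiring for a data type named "mycsv"; only the property suffixes
    // (the FIELD_CONFIG_* constants) come from this PR.
    static Configuration exampleFieldCacheConfig() {
        Configuration conf = new Configuration();
        conf.set("mycsv" + BaseIngestHelper.FIELD_CONFIG_FILE, "mycsv-field-config.xml");
        conf.setBoolean("mycsv" + BaseIngestHelper.FIELD_CONFIG_CACHE_ENABLED, true);
        conf.setInt("mycsv" + BaseIngestHelper.FIELD_CONFIG_CACHE_KEY_LIMIT, 100);
        conf.setBoolean("mycsv" + BaseIngestHelper.FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG, false);
        return conf;
    }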
@@ -0,0 +1,117 @@
package datawave.ingest.data.config;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

public class CachedFieldConfigHelperTest {
    @Test
    public void testCachingBehaviorWillCallBaseMethods() {
        String fieldName = "test";
        FieldConfigHelper mockHelper = mock(FieldConfigHelper.class);
        FieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1);

        cachedHelper.isIndexOnlyField(fieldName);
        verify(mockHelper).isIndexOnlyField(eq(fieldName));

        cachedHelper.isIndexedField(fieldName);
        verify(mockHelper).isIndexedField(eq(fieldName));

        cachedHelper.isTokenizedField(fieldName);
        verify(mockHelper).isTokenizedField(eq(fieldName));

        cachedHelper.isStoredField(fieldName);
        verify(mockHelper).isStoredField(eq(fieldName));

        cachedHelper.isReverseIndexedField(fieldName);
        verify(mockHelper).isReverseIndexedField(eq(fieldName));

        cachedHelper.isReverseTokenizedField(fieldName);
        verify(mockHelper).isReverseTokenizedField(eq(fieldName));
    }

    @ParameterizedTest
    @ValueSource(ints = {-1, 0})
    public void testConstructorWithNonPositiveLimitWillThrow(int limit) {
        assertThrows(IllegalArgumentException.class, () -> new CachedFieldConfigHelper(mock(FieldConfigHelper.class), limit));
    }

    @SuppressWarnings("ClassEscapesDefinedScope")
    @ParameterizedTest
    @EnumSource(CachedFieldConfigHelper.AttributeType.class)
    public void testAttributeTypesDoNotThrow(CachedFieldConfigHelper.AttributeType attributeType) {
        String fieldName = "test";
        FieldConfigHelper mockHelper = mock(FieldConfigHelper.class);
        CachedFieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1);
        cachedHelper.getFieldResult(attributeType, fieldName, (f) -> true);
    }

    @Test
    public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
        AtomicLong storedCounter = new AtomicLong();
        AtomicLong indexCounter = new AtomicLong();
        FieldConfigHelper innerHelper = mock(FieldConfigHelper.class);
        CachedFieldConfigHelper helper = new CachedFieldConfigHelper(innerHelper, 2, true);

        when(innerHelper.isStoredField(any())).then((a) -> {
            storedCounter.incrementAndGet();
            return true;
        });

        when(innerHelper.isIndexedField(any())).then((a) -> {
            indexCounter.incrementAndGet();
            return true;
        });

        // following ensures that:
        // 1. fields are computed, where appropriate per attribute-type
        // 2. limit allows cache results to return
        // 3. limit blocks results to return if exceeded
        // 4. limit functions across attribute-types

        helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField);
        assertEquals(1, storedCounter.get(), "field1 should compute result (new field)");
        assertFalse(helper.hasLimitExceeded());

        helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField);
        assertEquals(1, storedCounter.get(), "field1 repeated (existing field)");
        assertFalse(helper.hasLimitExceeded());

        helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
        assertEquals(2, storedCounter.get(), "field2 should compute result (new field)");
        assertFalse(helper.hasLimitExceeded());

        helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
        assertEquals(2, storedCounter.get(), "field2 repeated (existing)");
        assertFalse(helper.hasLimitExceeded());

        helper.getFieldResult(CachedFieldConfigHelper.AttributeType.INDEXED_FIELD, "field1", innerHelper::isIndexedField);
        assertEquals(1, indexCounter.get(), "field1 should compute result (new attribute)");
        assertFalse(helper.hasLimitExceeded());

        helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField);
        assertEquals(3, storedCounter.get(), "field3 exceeded limit (new field)");
        assertTrue(helper.hasLimitExceeded());

        helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField);
        assertEquals(3, storedCounter.get(), "field3 exceeded limit (existing field)");

        // LRU map should evict field #2
        // we accessed field #1 above, so it is more recently used than field #2
        helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
        assertEquals(4, storedCounter.get(), "field2 exceeded limit (new field/eviction)");
    }
}
Review comment: Should this debug-related code be left in?

Author response: I can update the name to be something different and/or refactor. The intent was to trace internals to view and help properly size the cache. It was meant to be conditionally enabled, so as not to impact normal execution. I evaluated a way to link to Hadoop metrics, but I didn't immediately see a straightforward path to connect. In respect to the name, would generally changing this to an optional view-state semantic be better, or is there another convention that would apply better?
EDIT: We could also update the tracing/state logic to all be inline (i.e. remove threading), which when enabled would optionally log out the same information. That would simplify the maintenance/logic.
EDIT #2: Refactored the logic to no longer have a background thread to log the message.

Review comment: The diagnostic nomenclature works for me.
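The threadless refactor mentioned in EDIT #2 is not part of this diff, so the following is only a hedged sketch of what inline limit diagnostics could look like; it reuses the field names from CachedFieldConfigHelper above, but the every-1000-computes rate limiting is an assumption, not the actual change:

    // Hypothetical sketch: emit the limit diagnostics from the cache-miss path itself
    // instead of from a scheduled background thread.
    private CachedEntry debugCachedEntryCreation(String fieldName) {
        long computes = debugFieldComputes.incrementAndGet();
        debugFieldUnique.add(fieldName);
        // Log only occasionally (here: every 1000 computes) once the LRU limit has been
        // exceeded, so the diagnostics do not flood the ingest logs.
        if (resultCache.hasLimitExceeded() && computes % 1000 == 0) {
            log.info("Field cache LRU limit exceeded [limit={}, computes={}, uniqueFields={}]",
                            limit, computes, debugFieldUnique.size());
        }
        return new CachedEntry(fieldName);
    }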