Skip to content

Commit

Permalink
CachedFieldHelper revised diagnostic and additional unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dtspence committed Jan 23, 2025
1 parent f2ac332 commit 6e7cbc5
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -1,70 +1,65 @@
package datawave.ingest.data.config;

import static java.lang.Thread.NORM_PRIORITY;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class CachedFieldConfigHelper implements FieldConfigHelper {
private final static Logger log = LoggerFactory.getLogger(CachedFieldConfigHelper.class);

private final static float DEFAULT_LRU_LF = 0.75f;
private final static int DEFAULT_DEBUG_STATE_SECS = 30;
private final static int DEFAULT_DIAGNOSTIC_SECS = 30;

private final FieldConfigHelper underlyingHelper;
private final LruCache<String,CachedEntry> resultCache;
private final boolean debugLimitsEnabled;
private final int limit;
private final Set<String> debugFieldUnique;
private final ScheduledExecutorService debugStateExecutor;
private final AtomicLong debugFieldComputes;
private final boolean diagnosticEnabled;
private final Set<String> diagnosticUniqueFields;
private Clock clock;
private boolean limitMessageEmitted;
private long diagnosticFieldCompute;
private long diagnosticEmitIntervalMillis;
private long diagnosticEmitNextMillis;
private boolean diagnosticEmitted;

/**
 * Identifies which per-field attribute lookup a cached result belongs to.
 * Each cached field entry is queried with one of these values.
 */
enum AttributeType {
    INDEXED_FIELD,
    REVERSE_INDEXED_FIELD,
    TOKENIZED_FIELD,
    REVERSE_TOKENIZED_FIELD,
    STORED_FIELD,
    INDEX_ONLY_FIELD
}

/**
 * Source of the current time in epoch milliseconds. Having it as an interface
 * lets the time source be swapped out (see {@code setClock}).
 */
interface Clock {

    /**
     * @return a new {@code Clock} that relies on the default {@link #epochMillis()} implementation
     */
    static Clock defaultClock() {
        return new Clock() {};
    }

    /**
     * @return the current time in milliseconds, read from {@link System#currentTimeMillis()} unless overridden
     */
    default long epochMillis() {
        return System.currentTimeMillis();
    }
}

/**
 * Convenience constructor: delegates to the three-argument constructor, passing {@code false} as the final flag.
 *
 * @param helper
 *            the helper handed through to the three-argument constructor
 * @param limit
 *            the cache limit; the three-argument constructor rejects values below 1 with an IllegalArgumentException
 */
public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
this(helper, limit, false);
}

public CachedFieldConfigHelper(FieldConfigHelper helper, int limit, boolean debugLimitEnabled) {
public CachedFieldConfigHelper(FieldConfigHelper helper, int limit, boolean diagnosticEnabled) {
if (limit < 1) {
throw new IllegalArgumentException("Limit must be a positive integer");
}
this.clock = Clock.defaultClock();
this.underlyingHelper = helper;
this.resultCache = new LruCache<>(limit);
this.limit = limit;
this.debugLimitsEnabled = debugLimitEnabled;
this.debugFieldUnique = new HashSet<>();
this.debugFieldComputes = new AtomicLong();

if (debugLimitEnabled) {
this.debugStateExecutor = Executors.newSingleThreadScheduledExecutor(
// @formatter:off
new ThreadFactoryBuilder()
.setPriority(NORM_PRIORITY)
.setDaemon(true)
.setNameFormat("CachedFieldConfigHelper.DebugState")
.build()
// formatter:off
);
this.debugStateExecutor.scheduleAtFixedRate(this::debugLogState, DEFAULT_DEBUG_STATE_SECS, DEFAULT_DEBUG_STATE_SECS, SECONDS);
} else {
this.debugStateExecutor = null;
}
this.diagnosticEnabled = diagnosticEnabled;
this.diagnosticUniqueFields = new HashSet<>();
this.diagnosticEmitIntervalMillis = SECONDS.toMillis(DEFAULT_DIAGNOSTIC_SECS);
}

@Override
Expand Down Expand Up @@ -99,41 +94,80 @@ public boolean isReverseTokenizedField(String fieldName) {

/**
 * Fetches (or creates) the cache entry for {@code fieldName}, then returns the value for {@code attributeType}
 * obtained through that entry's memoized result. When diagnostics are enabled, each newly created entry bumps the
 * compute counter and records the field name.
 * <p>
 * NOTE(review): CachedEntry and MemoizedResult are declared outside this view; presumably {@code fn} is only invoked
 * when no result has been memoized yet for the attribute type -- confirm against their definitions.
 *
 * @param attributeType
 *            selects which memoized result of the cache entry is used
 * @param fieldName
 *            the cache key
 * @param fn
 *            predicate handed to {@code getResultOrEvaluate}
 * @return the value returned by {@code getResultOrEvaluate}
 */
@VisibleForTesting
boolean getFieldResult(AttributeType attributeType, String fieldName, Predicate<String> fn) {
CachedEntry ce = !debugLimitsEnabled ?
resultCache.computeIfAbsent(fieldName, CachedEntry::new) :
resultCache.computeIfAbsent(fieldName, this::debugCachedEntryCreation);
return ce.get(attributeType).getResultOrEvaluate(fn);
CachedEntry cachedEntry = resultCache.computeIfAbsent(fieldName, (key) -> {
if (diagnosticEnabled) {
diagnosticFieldCompute++;
diagnosticUniqueFields.add(key);
}
return new CachedEntry(key);
});

CachedEntry.MemoizedResult memoizedResult = cachedEntry.get(attributeType);

// when diagnostics are enabled and the clock has passed the next emit time, log the cache counters and
// schedule the next emit one interval later; note "size" is the number of unique fields seen and "uniq" is the set itself
// on any other call, log a single message the first time the cache reports that its limit was exceeded
// the intent is to help adjust the size required for the cache
if (diagnosticEnabled && clock.epochMillis() > diagnosticEmitNextMillis) {
diagnosticEmitted = true;
diagnosticEmitNextMillis = clock.epochMillis() + diagnosticEmitIntervalMillis;
log.info("Field cache LRU [limit={}, computed={}, size={}, uniq={}]", limit, diagnosticFieldCompute, diagnosticUniqueFields.size(),
diagnosticUniqueFields);
} else if (resultCache.hasLimitExceeded() && !limitMessageEmitted) {
log.info("Field cache LRU limit exceeded: [limit={}, field={}]", limit, fieldName);
limitMessageEmitted = true;
}
return memoizedResult.getResultOrEvaluate(fn);
}

/**
 * @return whatever the underlying result cache reports from its own {@code hasLimitExceeded()}
 */
@VisibleForTesting
boolean hasLimitExceeded() {
return resultCache.hasLimitExceeded();
}

private CachedEntry debugCachedEntryCreation(String fieldName) {
debugFieldComputes.incrementAndGet();
debugFieldUnique.add(fieldName);
return new CachedEntry(fieldName);
@VisibleForTesting
Set<String> getCachedFields() {
return resultCache.keySet();
}

private void debugLogState() {
if (resultCache.hasLimitExceeded()) {
log.info("Field cache LRU limit exceeded [limit={}, debug={}, size={}, uniq={}]",
limit, debugFieldComputes.get(), debugFieldUnique.size(), debugLimitsEnabled);
}
@VisibleForTesting
boolean getDiagnosticEmitted() {
return diagnosticEmitted;
}

/**
 * @return the set of field names recorded for diagnostics; this is the internal set itself, not a copy
 */
@VisibleForTesting
Set<String> getDiagnosticUniqueFields() {
return diagnosticUniqueFields;
}

/**
 * @return the current value of the diagnostic compute counter
 */
@VisibleForTesting
long getDiagnosticFieldCompute() {
return diagnosticFieldCompute;
}

/**
 * @return the clock value (epoch milliseconds) that must be passed before the next diagnostic message is logged
 */
@VisibleForTesting
long getDiagnosticEmitNextMillis() {
return diagnosticEmitNextMillis;
}

/**
 * Overrides the interval that is added to the clock reading when the next diagnostic emit time is computed.
 *
 * @param intervalMillis
 *            the new interval, in milliseconds
 */
@VisibleForTesting
void setDiagnosticEmitIntervalMillis(long intervalMillis) {
this.diagnosticEmitIntervalMillis = intervalMillis;
}

/**
 * Replaces the clock consulted when deciding whether a diagnostic message is due.
 *
 * @param clock
 *            the clock to use from now on
 */
@VisibleForTesting
void setClock(Clock clock) {
this.clock = clock;
}

private static class LruCache<K,V> extends LinkedHashMap<K,V> {
private final int maxSize;
private volatile boolean limitExceeded;
private boolean limitExceeded;

LruCache(int maxSize) {
super((int)(maxSize / DEFAULT_LRU_LF) + 1, DEFAULT_LRU_LF, true);
super((int) (maxSize / DEFAULT_LRU_LF) + 1, DEFAULT_LRU_LF, true);
this.maxSize = maxSize;
}

/**
 * @return the current value of the limitExceeded flag
 */
boolean hasLimitExceeded() {
// NOTE(review): this read was labelled thread-safe; that only holds while limitExceeded is declared
// volatile -- confirm against the field declaration before relying on cross-thread visibility
return limitExceeded;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public abstract class BaseIngestHelper extends AbstractIngestHelper implements C

public static final String FIELD_CONFIG_FILE = ".data.category.field.config.file";
public static final String FIELD_CONFIG_CACHE_ENABLED = ".data.category.field.config.cache.enabled";
public static final String FIELD_CONFIG_CACHE_KEY_LIMIT = ".data.category.field.config.cache.limit";
public static final String FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG = ".data.category.field.config.cache.limit.debug";
public static final String FIELD_CONFIG_CACHE_LIMIT = ".data.category.field.config.cache.limit";
public static final String FIELD_CONFIG_CACHE_DIAGNOSTIC = ".data.category.field.config.cache.diagnostic";

private static final Logger log = ThreadConfigurableLogger.getLogger(BaseIngestHelper.class);

Expand Down Expand Up @@ -260,19 +260,19 @@ public void setup(Configuration config) {
final String fieldConfigFile = config.get(this.getType().typeName() + FIELD_CONFIG_FILE);
if (fieldConfigFile != null) {
final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, false);
final boolean fieldConfigCacheLimitDebug = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG, false);
final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT, 100);
final boolean fieldConfigCacheLimitDiagnostic = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_DIAGNOSTIC, false);
final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_LIMIT, 100);
if (log.isDebugEnabled()) {
log.debug("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);
}
log.info("Field config cache enabled: " + fieldConfigCacheEnabled);
if (fieldConfigCacheEnabled) {
log.info("Field config cache limit: " + fieldConfigCacheLimit);
log.info("Field config cache limit debug: " + fieldConfigCacheLimitDebug);
log.info("Field config cache limit diagnostic: " + fieldConfigCacheLimitDiagnostic);
}
fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
if (fieldConfigCacheEnabled) {
fieldConfigHelper = new CachedFieldConfigHelper(fieldConfigHelper, fieldConfigCacheLimit, fieldConfigCacheLimitDebug);
fieldConfigHelper = new CachedFieldConfigHelper(fieldConfigHelper, fieldConfigCacheLimit, fieldConfigCacheLimitDiagnostic);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datawave.ingest.data.config;

import static datawave.ingest.data.config.CachedFieldConfigHelper.AttributeType.INDEXED_FIELD;
import static datawave.ingest.data.config.CachedFieldConfigHelper.AttributeType.STORED_FIELD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -10,6 +12,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -82,36 +85,66 @@ public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
// 3. limit blocks results to return if exceeded
// 4. limit functions across attribute-types

helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField);
helper.getFieldResult(STORED_FIELD, "field1", innerHelper::isStoredField);
assertEquals(1, storedCounter.get(), "field1 should compute result (new field)");
assertEquals(Set.of("field1"), helper.getCachedFields());
assertFalse(helper.hasLimitExceeded());

helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField);
helper.getFieldResult(STORED_FIELD, "field1", innerHelper::isStoredField);
assertEquals(1, storedCounter.get(), "field1 repeated (existing field)");
assertEquals(Set.of("field1"), helper.getCachedFields());
assertFalse(helper.hasLimitExceeded());

helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
helper.getFieldResult(STORED_FIELD, "field2", innerHelper::isStoredField);
assertEquals(2, storedCounter.get(), "field2 should compute result (new field)");
assertEquals(Set.of("field1", "field2"), helper.getCachedFields());
assertFalse(helper.hasLimitExceeded());

helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
helper.getFieldResult(STORED_FIELD, "field2", innerHelper::isStoredField);
assertEquals(2, storedCounter.get(), "field2 repeated (existing)");
assertEquals(Set.of("field1", "field2"), helper.getCachedFields());
assertFalse(helper.hasLimitExceeded());

helper.getFieldResult(CachedFieldConfigHelper.AttributeType.INDEXED_FIELD, "field1", innerHelper::isIndexedField);
helper.getFieldResult(INDEXED_FIELD, "field1", innerHelper::isIndexedField);
assertEquals(1, indexCounter.get(), "field1 should compute result (new attribute)");
assertEquals(Set.of("field1", "field2"), helper.getCachedFields());
assertFalse(helper.hasLimitExceeded());

helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField);
helper.getFieldResult(STORED_FIELD, "field3", innerHelper::isStoredField);
assertEquals(3, storedCounter.get(), "field3 exceeded limit (new field)");
assertEquals(Set.of("field1", "field3"), helper.getCachedFields());
assertTrue(helper.hasLimitExceeded());

helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField);
helper.getFieldResult(STORED_FIELD, "field3", innerHelper::isStoredField);
assertEquals(3, storedCounter.get(), "field3 exceeded limit (existing field)");
assertEquals(Set.of("field1", "field3"), helper.getCachedFields());

// LRU map should evict field #2
// we access field #1 above which has more accesses over field #2
helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField);
helper.getFieldResult(STORED_FIELD, "field2", innerHelper::isStoredField);
assertEquals(Set.of("field2", "field3"), helper.getCachedFields());
assertEquals(4, storedCounter.get(), "field1 exceeded limit (new field/eviction)");
}

@Test
public void testCacheHelperDiagnosticMessage() {
    // a stubbed clock pinned at 1ms; the helper's next-emit time starts at 0,
    // so the very first lookup is already past due and should emit
    CachedFieldConfigHelper.Clock fixedClock = mock(CachedFieldConfigHelper.Clock.class);
    when(fixedClock.epochMillis()).thenReturn(1L);

    // diagnostics enabled, with a 1ms interval so the next emit time becomes 1 + 1 = 2
    FieldConfigHelper delegate = mock(FieldConfigHelper.class);
    CachedFieldConfigHelper cachedHelper = new CachedFieldConfigHelper(delegate, 2, true);
    cachedHelper.setClock(fixedClock);
    cachedHelper.setDiagnosticEmitIntervalMillis(1);

    // nothing has been looked up yet, so no emit has been scheduled
    assertEquals(0, cachedHelper.getDiagnosticEmitNextMillis(), "expected trace interval to be zero");

    // first lookup of a new field: counted once, recorded as unique, and reported
    cachedHelper.isStoredField("field1");
    assertEquals(1, cachedHelper.getDiagnosticFieldCompute());
    assertEquals(Set.of("field1"), cachedHelper.getDiagnosticUniqueFields(), "field1 uniq fields");
    assertTrue(cachedHelper.getDiagnosticEmitted(), "field1 should have caused a trace emit");
    assertEquals(2, cachedHelper.getDiagnosticEmitNextMillis(), "expected trace emit interval to increase");
}
}

0 comments on commit 6e7cbc5

Please sign in to comment.