Skip to content

Commit

Permalink
Reworked index service
Browse files Browse the repository at this point in the history
  • Loading branch information
EinsamHauer committed Feb 18, 2022
1 parent efa371e commit 12a821a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>net.iponweb.disthene</groupId>
<artifactId>disthene</artifactId>
<packaging>jar</packaging>
<version>1.0.12</version>
<version>1.0.13</version>
<name>disthene</name>
<url>http://maven.apache.org</url>
<dependencies>
Expand Down
50 changes: 21 additions & 29 deletions src/main/java/net/iponweb/disthene/service/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,26 @@

import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
* @author Andrei Ivanov
*/
@Listener(references= References.Strong)
@Listener(references = References.Strong)
public class IndexService {
private static final String SCHEDULER_NAME = "distheneIndexCacheExpire";

private static final Logger logger = Logger.getLogger(IndexService.class);

private IndexConfiguration indexConfiguration;
private TransportClient client;
private IndexThread indexThread;
private final IndexConfiguration indexConfiguration;
private final TransportClient client;
private final IndexThread indexThread;

// tenant -> path -> dummy
private ConcurrentMap<String, ConcurrentMap<String, AtomicLong>> cache = new ConcurrentHashMap<>();
private Queue<Metric> metrics = new ConcurrentLinkedQueue<>();

private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory(SCHEDULER_NAME));
private ConcurrentMap<String, ConcurrentMap<String, Long>> cache = new ConcurrentHashMap<>();
private final BlockingQueue<Metric> metrics = new LinkedBlockingQueue<>();

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory(SCHEDULER_NAME));

public IndexService(IndexConfiguration indexConfiguration, MBassador<DistheneEvent> bus) {
this.indexConfiguration = indexConfiguration;
Expand Down Expand Up @@ -76,10 +73,10 @@ public void run() {
}
}

private ConcurrentMap<String, AtomicLong> getTenantPaths(String tenant) {
ConcurrentMap<String, AtomicLong> tenantPaths = cache.get(tenant);
private ConcurrentMap<String, Long> getTenantPaths(String tenant) {
ConcurrentMap<String, Long> tenantPaths = cache.get(tenant);
if (tenantPaths == null) {
ConcurrentMap<String, AtomicLong> newTenantPaths = new ConcurrentHashMap<>();
ConcurrentMap<String, Long> newTenantPaths = new ConcurrentHashMap<>();
tenantPaths = cache.putIfAbsent(tenant, newTenantPaths);
if (tenantPaths == null) {
tenantPaths = newTenantPaths;
Expand All @@ -89,28 +86,24 @@ private ConcurrentMap<String, AtomicLong> getTenantPaths(String tenant) {
return tenantPaths;
}

@Handler(rejectSubtypes = false)
@SuppressWarnings("unused")
@Handler()
public void handle(MetricStoreEvent metricStoreEvent) {
if (indexConfiguration.isCache()) {
handleWithCache(metricStoreEvent.getMetric());
} else {
//noinspection ResultOfMethodCallIgnored
metrics.offer(metricStoreEvent.getMetric());
}
}

private void handleWithCache(Metric metric) {
ConcurrentMap<String, AtomicLong> tenantPaths = getTenantPaths(metric.getTenant());
AtomicLong lastSeen = tenantPaths.get(metric.getPath());
ConcurrentMap<String, Long> tenantPaths = getTenantPaths(metric.getTenant());
Long lastSeen = tenantPaths.put(metric.getPath(), System.currentTimeMillis() / 1000L);

if (lastSeen == null) {
lastSeen = tenantPaths.putIfAbsent(metric.getPath(), new AtomicLong(System.currentTimeMillis() / 1000L));
if (lastSeen == null) {
metrics.offer(metric);
} else {
lastSeen.getAndSet(System.currentTimeMillis() / 1000L);
}
} else {
lastSeen.getAndSet(System.currentTimeMillis() / 1000L);
//noinspection ResultOfMethodCallIgnored
metrics.offer(metric);
}
}

Expand All @@ -121,14 +114,13 @@ private synchronized void expireCache() {
int pathsRemoved = 0;
int pathsTotal = 0;

for(ConcurrentMap<String, AtomicLong> tenantMap : cache.values()) {
for(Iterator<Map.Entry<String, AtomicLong>> iterator = tenantMap.entrySet().iterator(); iterator.hasNext();) {
Map.Entry<String, AtomicLong> entry = iterator.next();
if (entry.getValue().get() < currentTimestamp - indexConfiguration.getExpire()) {
for (ConcurrentMap<String, Long> tenantMap : cache.values()) {
for (Iterator<Map.Entry<String, Long>> iterator = tenantMap.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, Long> entry = iterator.next();
if (entry.getValue() < currentTimestamp - indexConfiguration.getExpire()) {
iterator.remove();
pathsRemoved++;
}

pathsTotal++;
}
}
Expand Down
44 changes: 21 additions & 23 deletions src/main/java/net/iponweb/disthene/service/index/IndexThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;

/**
* @author Andrei Ivanov
Expand All @@ -27,18 +27,18 @@ public class IndexThread extends Thread {

protected volatile boolean shutdown = false;

private TransportClient client;
protected Queue<Metric> metrics;
private String index;
private String type;
private int batchSize;
private int flushInterval;
private final TransportClient client;
protected BlockingQueue<Metric> metrics;
private final String index;
private final String type;
private final int batchSize;
private final int flushInterval;
private long lastFlushTimestamp = System.currentTimeMillis() / 1000L;

private MetricMultiGetRequestBuilder request;
private BulkProcessor bulkProcessor;
private final BulkProcessor bulkProcessor;

public IndexThread(String name, TransportClient client, Queue<Metric> metrics, String index, String type, int batchSize, int flushInterval) {
public IndexThread(String name, TransportClient client, BlockingQueue<Metric> metrics, String index, String type, int batchSize, int flushInterval) {
super(name);
this.client = client;
this.metrics = metrics;
Expand Down Expand Up @@ -76,19 +76,17 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
public void run() {
while (!shutdown) {
try {
Metric metric = metrics.poll();
if (metric != null) {
addToBatch(metric);
} else {
Thread.sleep(100);
}
Metric metric = metrics.take();
addToBatch(metric);
} catch (Exception e) {
logger.error("Encountered error in busy loop: ", e);
if (!shutdown) logger.error("Encountered error in busy loop: ", e);
}
}

if (request.size() > 0) {
try {
flush();
} catch (Exception e) {
logger.error("Encountered error in busy loop: ", e);
}
}

Expand All @@ -102,6 +100,8 @@ private void addToBatch(Metric metric) {
}

private void flush() {
if (request.size() <= 0) return;

MultiGetResponse multiGetItemResponse = request.execute().actionGet();

for(MultiGetItemResponse response : multiGetItemResponse.getResponses()) {
Expand All @@ -120,7 +120,7 @@ private void flush() {
}
sb.append(parts[i]);
try {
bulkProcessor.add(new IndexRequest(index, type, metric.getTenant() + "_" + sb.toString()).source(
bulkProcessor.add(new IndexRequest(index, type, metric.getTenant() + "_" + sb).source(
XContentFactory.jsonBuilder().startObject()
.field("tenant", metric.getTenant())
.field("path", sb.toString())
Expand All @@ -133,9 +133,7 @@ private void flush() {
}

}

}

}

request = new MetricMultiGetRequestBuilder(client, index, type);
Expand All @@ -145,10 +143,10 @@ public void shutdown() {
shutdown = true;
}

private class MetricMultiGetRequestBuilder extends MultiGetRequestBuilder {
private static class MetricMultiGetRequestBuilder extends MultiGetRequestBuilder {

private String index;
private String type;
private final String index;
private final String type;
Map<String, Metric> metrics = new HashMap<>();


Expand Down

0 comments on commit 12a821a

Please sign in to comment.