Skip to content

Commit

Permalink
Add custom note locator (#234)
Browse files Browse the repository at this point in the history
* Add customizable node locator for ketama.

Add parameter to builder class with default for existing NodeLocator (Continuum).
Add ability to change vnode_ratio in Continuum.

* Add test for custom prefix node locator

* Format files

---------

Co-authored-by: thiagocesarj <76481925+thiagocesarj@users.noreply.github.com>
  • Loading branch information
Kamyki and thiagocesarj authored Feb 28, 2024
1 parent 74630c8 commit 08d7d95
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 16 deletions.
26 changes: 24 additions & 2 deletions folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import com.spotify.folsom.client.tls.SSLEngineFactory;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.AddressAndClient;
import com.spotify.folsom.ketama.Continuum;
import com.spotify.folsom.ketama.KetamaMemcacheClient;
import com.spotify.folsom.ketama.NodeLocator;
import com.spotify.folsom.ketama.ResolvingKetamaClient;
import com.spotify.folsom.reconnect.CatchingReconnectionListener;
import com.spotify.folsom.reconnect.ReconnectingClient;
Expand All @@ -56,12 +58,14 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -126,6 +130,7 @@ public class MemcacheClientBuilder<V> {
private final List<UsernamePasswordPair> passwords = new ArrayList<>();
private boolean skipAuth = false;

private Function<Collection<AddressAndClient>, NodeLocator> nodeLocator = Continuum::new;
private SSLEngineFactory sslEngineFactory = null;

/**
Expand Down Expand Up @@ -571,6 +576,22 @@ public MemcacheClientBuilder<V> withUsernamePassword(
return this;
}

/**
* When using ketama client use provided NodeLocator to find client for given key. NodeLocator
* will be recreated when clients appear and disappear when using dynamic resolver.
*
* <p>This option can be used to change default hashing algorithm or vnode_ratio in consistent
* hashing algorithm.
*
* @param nodeLocator mapper from client collection to NodeLocator
* @return itself
*/
public MemcacheClientBuilder<V> withNodeLocator(
Function<Collection<AddressAndClient>, NodeLocator> nodeLocator) {
this.nodeLocator = nodeLocator;
return this;
}

/**
* Disable authentication validation - only useful for tests against jmemcached which does not
* support binary NOOP
Expand Down Expand Up @@ -673,7 +694,7 @@ protected RawMemcacheClient connectRaw(boolean binary, Authenticator authenticat
aac.add(new AddressAndClient(address, clients.get(i)));
}

client = new KetamaMemcacheClient(aac);
client = new KetamaMemcacheClient(aac, nodeLocator.apply(aac));
} else {
client = clients.get(0);
}
Expand Down Expand Up @@ -705,7 +726,8 @@ private RawMemcacheClient createResolvingClient(
TimeUnit.MILLISECONDS,
input -> createClient(input, binary, authenticator),
shutdownDelay,
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS,
nodeLocator);

client.start();
return client;
Expand Down
13 changes: 9 additions & 4 deletions folsom/src/main/java/com/spotify/folsom/ketama/Continuum.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,29 @@
import java.util.Map.Entry;
import java.util.TreeMap;

public class Continuum {
public class Continuum implements NodeLocator {

private static final int VNODE_RATIO = 100;

private final TreeMap<Integer, RawMemcacheClient> ringOfFire;

public Continuum(final Collection<AddressAndClient> clients) {
this.ringOfFire = buildRing(clients);
this(clients, VNODE_RATIO);
}

public Continuum(final Collection<AddressAndClient> clients, int vnodeRatio) {
this.ringOfFire = buildRing(clients, vnodeRatio);
}

private TreeMap<Integer, RawMemcacheClient> buildRing(
final Collection<AddressAndClient> clients) {
final Collection<AddressAndClient> clients, int vnodeRatio) {

final TreeMap<Integer, RawMemcacheClient> r = new TreeMap<>();
for (final AddressAndClient client : clients) {
final String address = client.getAddress().toString();

byte[] hash = addressToBytes(address);
for (int i = 0; i < VNODE_RATIO; i++) {
for (int i = 0; i < vnodeRatio; i++) {
final HashCode hashCode = Hasher.hash(hash);
hash = hashCode.asBytes();
r.put(hashCode.asInt(), client.getClient());
Expand All @@ -51,6 +55,7 @@ private TreeMap<Integer, RawMemcacheClient> buildRing(
return r;
}

@Override
public RawMemcacheClient findClient(final byte[] key) {
final int keyHash = Hasher.hash(key).asInt();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ private static Collection<RawMemcacheClient> clientsOnly(
return clients;
}

private final Continuum continuum;
private final NodeLocator nodeLocator;

public KetamaMemcacheClient(final Collection<AddressAndClient> clients) {
public KetamaMemcacheClient(final Collection<AddressAndClient> clients, NodeLocator nodeLocator) {
super(clientsOnly(clients));
if (clients.isEmpty()) {
throw new IllegalArgumentException("Can not create ketama client from empty list");
}

this.continuum = new Continuum(clients);
this.nodeLocator = nodeLocator;
}

private RawMemcacheClient getClient(final byte[] key) {
return continuum.findClient(key);
return nodeLocator.findClient(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.spotify.folsom.ketama;

import com.spotify.folsom.RawMemcacheClient;

public interface NodeLocator {

RawMemcacheClient findClient(final byte[] key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class ResolvingKetamaClient extends AbstractRawMemcacheClient {
private volatile RawMemcacheClient currentClient;
private volatile RawMemcacheClient pendingClient = null;
private boolean shutdown = false;
private final Function<Collection<AddressAndClient>, NodeLocator> nodeLocator;

public ResolvingKetamaClient(
Resolver resolver,
Expand All @@ -72,14 +74,16 @@ public ResolvingKetamaClient(
TimeUnit periodUnit,
final Connector connector,
long shutdownDelay,
TimeUnit shutdownUnit) {
TimeUnit shutdownUnit,
Function<Collection<AddressAndClient>, NodeLocator> nodeLocator) {
this.resolver = resolver;
this.connector = connector;
this.shutdownDelay = shutdownDelay;
this.shutdownUnit = shutdownUnit;
this.executor = executor;
this.currentClient = NotConnectedClient.INSTANCE;
this.ttl = TimeUnit.SECONDS.convert(period, periodUnit);
this.nodeLocator = nodeLocator;
}

public void start() {
Expand Down Expand Up @@ -216,7 +220,8 @@ private void setPendingClient(final ImmutableList.Builder<RawMemcacheClient> rem

// This may invalidate an existing pendingClient but should be fine since it doesn't have any
// important state of its own.
final KetamaMemcacheClient newClient = new KetamaMemcacheClient(addressAndClients);
final KetamaMemcacheClient newClient =
new KetamaMemcacheClient(addressAndClients, nodeLocator.apply(addressAndClients));
this.pendingClient = newClient;

newClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.spotify.dns.LookupResult;
import com.spotify.folsom.client.test.FakeRawMemcacheClient;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.Continuum;
import com.spotify.folsom.ketama.ResolvingKetamaClient;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -65,7 +66,8 @@ public void testSimple() throws Exception {
TimeUnit.MILLISECONDS,
connector,
1000,
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS,
Continuum::new);
executor.tick(1000, TimeUnit.SECONDS);

assertFalse(ketamaClient.isConnected());
Expand Down Expand Up @@ -141,7 +143,8 @@ public void testReusingConnections() {
TimeUnit.MILLISECONDS,
connector,
1000,
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS,
Continuum::new);
executor.tick(1000, TimeUnit.SECONDS);

assertFalse(ketamaClient.isConnected());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private void test(
}
}

final KetamaMemcacheClient ketamaMemcacheClient = new KetamaMemcacheClient(clients);
final KetamaMemcacheClient ketamaMemcacheClient =
new KetamaMemcacheClient(clients, new Continuum(clients));
final MemcacheClient<String> memcacheClient = buildClient(ketamaMemcacheClient, binary);

final List<String> requestedKeys = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;

public class ContinuumTest {
@RunWith(MockitoJUnitRunner.class)
public class NodeLocatorTest {

private static final HostAndPort ADDRESS1 = HostAndPort.fromParts("127.0.0.1", 11211);
private static final HostAndPort ADDRESS2 = HostAndPort.fromParts("127.0.0.1", 11212);
Expand Down Expand Up @@ -183,6 +186,39 @@ public void testWrapDisconnected() {
assertSame(CLIENT2, c.findClient(bytes("key1561")));
}

@Test
public void testPrefixNodeLocator() {
final List<AddressAndClient> clients = ImmutableList.of(AAC1, AAC2, AAC3);
final Continuum c = new Continuum(clients);
final NodeLocator nodeLocator =
key -> {
String[] keyParts = new String(key, StandardCharsets.US_ASCII).split("-");
if (keyParts.length > 1) {
return clients.get(Integer.parseInt(keyParts[0]) - 1).getClient();
} else {
return c.findClient(key);
}
};

List<RawMemcacheClient> actual =
Arrays.asList(
nodeLocator.findClient(bytes("1-key1")),
nodeLocator.findClient(bytes("1-key2")),
nodeLocator.findClient(bytes("1-key3")),
nodeLocator.findClient(bytes("2-key1")),
nodeLocator.findClient(bytes("3-key1")),
nodeLocator.findClient(bytes(KEY1)),
nodeLocator.findClient(bytes(KEY3)));

List<RawMemcacheClient> expected =
Arrays.asList(
CLIENT1, CLIENT1, CLIENT1, // keys prefixed with 1
CLIENT2, // keys prefixed with 2
CLIENT3, // keys prefixed with 3
CLIENT1, CLIENT2); // fallback to default NodeLocator
assertEquals(expected, actual);
}

private static byte[] bytes(String key) {
return key.getBytes(StandardCharsets.US_ASCII);
}
Expand Down

0 comments on commit 08d7d95

Please sign in to comment.