Skip to content

Commit

Permalink
Merge pull request #10 from pushkarmoi/pugupta/retries
Browse files Browse the repository at this point in the history
retry endpoint and use subdomain
  • Loading branch information
pushkarmoi authored Feb 3, 2024
2 parents 2ce2005 + 8a4d1fb commit f353e2f
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static void main(String[] args) {
private final boolean _initWrites;

private final int _maxRuns;
private final int _numWarmups;
private final int _threads;
private final double _maxRate;
private final Duration _benchmarkDuration;
Expand All @@ -111,6 +112,7 @@ public MultiThreadedRunner(DBClient dbClient, BenchmarkParams params) {
_kvGenerator = new KVGeneratorV2(_numSamples);

_maxRuns = params.getIntegerParameter("max_runs").orElse(Integer.MAX_VALUE);
_numWarmups = params.getIntegerParameter("warmups").orElse(1);
_threads = params.getIntegerParameter("threads").orElse(1);
_maxRate = params.getIntegerParameter("max_qps").orElse(Integer.MAX_VALUE);
_benchmarkDuration =
Expand All @@ -132,10 +134,12 @@ public void run() {
public void execGetBenchmark() {
// single warmup iteration
Instant start = Instant.now();
singleThreadedGetBenchmark(
new Histogram("warmup"), RateLimiter.create(Double.MAX_VALUE), Duration.ofDays(365), 1);
for (int i = 0; i < _numWarmups; i++) {
singleThreadedGetBenchmark(
new Histogram("warmup"), RateLimiter.create(Double.MAX_VALUE), Duration.ofDays(365), 1);
}
LOGGER.info(
"execGetBenchmark warmup done in: {} sec",
"execGetBenchmark warmup(s) done in: {} sec",
Duration.between(start, Instant.now()).toSeconds());

// instrumented run
Expand Down
84 changes: 52 additions & 32 deletions src/main/java/io/inlined/clients/DefaultInlineKVWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.inlineio.schemas.Services.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.protobuf.StatusProto;
import io.grpc.StatusRuntimeException;
import java.time.Instant;
import java.util.Collection;
import java.util.Objects;
Expand All @@ -25,7 +25,6 @@ public DefaultInlineKVWriter(ClientOptions clientOptions) {

@Override
public void startupWriter() {
// TODO: stub creation- use dns
ManagedChannelBuilder<?> channelBuilder =
ManagedChannelBuilder.forAddress(
IKVConstants.IKV_GATEWAY_GRPC_URL, IKVConstants.IKV_GATEWAY_GRPC_PORT)
Expand Down Expand Up @@ -57,18 +56,25 @@ public void upsertFieldValues(IKVDocument document) {
.setUserStoreContextInitializer(_userStoreCtxInitializer)
.build();

try {
// make grpc call
Status ignored = _stub.upsertFieldValues(request);
} catch (Throwable thrown) {
// propagate errors
com.google.rpc.Status errorStatus = StatusProto.fromThrowable(thrown);
if (errorStatus != null) {
throw new RuntimeException(
"upsertFieldValues failed with error: "
+ MoreObjects.firstNonNull(errorStatus.getMessage(), "unknown"));
StatusRuntimeException maybeException = null;
for (int retry = 0; retry < 3; retry++) {
try {
// make grpc call
Status ignored = _stub.upsertFieldValues(request);
return;
} catch (StatusRuntimeException e) {
maybeException = e;

// retry only when servers are unavailable
if (e.getStatus().getCode() != io.grpc.Status.Code.UNAVAILABLE) {
break;
}
}
}

throw new RuntimeException(
"upsertFieldValues failed with error: "
+ MoreObjects.firstNonNull(maybeException.getMessage(), "unknown"));
}

@Override
Expand All @@ -93,18 +99,25 @@ public void deleteFieldValues(IKVDocument documentId, Collection<String> fieldsT
.setUserStoreContextInitializer(_userStoreCtxInitializer)
.build();

try {
// make grpc call
Status _ignored = _stub.deleteFieldValues(request);
} catch (Throwable thrown) {
// propagate errors
com.google.rpc.Status errorStatus = StatusProto.fromThrowable(thrown);
if (errorStatus != null) {
throw new RuntimeException(
"deleteFieldValues failed with error: "
+ MoreObjects.firstNonNull(errorStatus.getMessage(), "unknown"));
StatusRuntimeException maybeException = null;
for (int retry = 0; retry < 3; retry++) {
try {
// make grpc call
Status ignored = _stub.deleteFieldValues(request);
return;
} catch (StatusRuntimeException e) {
maybeException = e;

// retry only when servers are unavailable
if (e.getStatus().getCode() != io.grpc.Status.Code.UNAVAILABLE) {
break;
}
}
}

throw new RuntimeException(
"deleteFieldValues failed with error: "
+ MoreObjects.firstNonNull(maybeException.getMessage(), "unknown"));
}

@Override
Expand All @@ -125,17 +138,24 @@ public void deleteDocument(IKVDocument documentId) {
.setUserStoreContextInitializer(_userStoreCtxInitializer)
.build();

try {
// make grpc call
Status _ignored = _stub.deleteDocument(request);
} catch (Throwable thrown) {
// propagate errors
com.google.rpc.Status errorStatus = StatusProto.fromThrowable(thrown);
if (errorStatus != null) {
throw new RuntimeException(
"deleteDocument failed with error: "
+ MoreObjects.firstNonNull(errorStatus.getMessage(), "unknown"));
StatusRuntimeException maybeException = null;
for (int retry = 0; retry < 3; retry++) {
try {
// make grpc call
Status ignored = _stub.deleteDocument(request);
return;
} catch (StatusRuntimeException e) {
maybeException = e;

// retry only when servers are unavailable
if (e.getStatus().getCode() != io.grpc.Status.Code.UNAVAILABLE) {
break;
}
}
}

throw new RuntimeException(
"deleteDocument failed with error: "
+ MoreObjects.firstNonNull(maybeException.getMessage(), "unknown"));
}
}
3 changes: 1 addition & 2 deletions src/main/java/io/inlined/clients/IKVConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

/** Config key strings. */
public class IKVConstants {
public static final String IKV_GATEWAY_GRPC_URL =
"gateway-writer-alb-1-1639339774.us-west-2.elb.amazonaws.com";
public static final String IKV_GATEWAY_GRPC_URL = "gateway.inlined.io";
public static final int IKV_GATEWAY_GRPC_PORT = 443;

public static final String S3_BASE_INDEX_BUCKET = "ikv-base-index-v1";
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<Configuration>
<Appenders>
<Console name="Console">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%p][%c] %m%n"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%p][%C{1}] %m%n"/>
</Console>
</Appenders>
<Loggers>
Expand Down
18 changes: 9 additions & 9 deletions src/test/java/io/inlined/NearlineIntegrationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,29 @@ public void upsertAndRead() throws InterruptedException {
new ClientOptions.Builder()
.withMountDirectory("/tmp/NearlineIntegrationTests")
.withStoreName("testing-store")
.withAccountId(System.getenv("IKV_ACCOUNT_ID"))
.withAccountPassKey(System.getenv("IKV_ACCOUNT_PASSKEY"))
.withAccountId("foo")
.withAccountPassKey("bar")
.useStringPrimaryKey()
.build();

IKVClientFactory factory = new IKVClientFactory();

/*InlineKVWriter writer = factory.createNewWriterInstance(clientOptions);
InlineKVWriter writer = factory.createNewWriterInstance(clientOptions);

writer.startupWriter();

IKVDocument document =
new IKVDocument.Builder()
.putStringField("userid", "id_2") // primary key
.putIntField("age", 25)
.putLongField("ageAsLong", 25)
.putFloatField("ageAsFloat", 25.2f)
.putDoubleField("ageAsDouble", 25.2)
.putStringField("firstname", "Alice")
.putIntField("age", 25)
.putLongField("ageAsLong", 25)
.putFloatField("ageAsFloat", 25.2f)
.putDoubleField("ageAsDouble", 25.2)
.putStringField("firstname", "Alice")
.build();
writer.upsertFieldValues(document);

Thread.sleep(1000);*/
Thread.sleep(1000);

InlineKVReader reader = factory.createNewReaderInstance(clientOptions);
reader.startupReader();
Expand Down

0 comments on commit f353e2f

Please sign in to comment.