Skip to content

Commit

Permalink
reverts ingest library changes
Browse files Browse the repository at this point in the history
  • Loading branch information
georgebanasios committed Dec 5, 2024
1 parent 46a3e92 commit be74852
Show file tree
Hide file tree
Showing 14 changed files with 786 additions and 947 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package com.microsoft.azure.kusto.data;

import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.lang.invoke.MethodHandles;
import java.time.Duration;

public class ExponentialRetry {
public class ExponentialRetry<E1 extends Throwable, E2 extends Throwable> {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final int maxAttempts;
Expand All @@ -34,33 +30,36 @@ public ExponentialRetry(ExponentialRetry other) {
this.maxJitterSecs = other.maxJitterSecs;
}

public Retry retry() {
return Retry.from(retrySignals -> retrySignals.flatMap(retrySignal -> {
// Caller should throw only permanent errors, returning null if a retry is needed
public <T> T execute(KustoCheckedFunction<Integer, T, E1, E2> function) throws E1, E2 {
for (int currentAttempt = 0; currentAttempt < maxAttempts; currentAttempt++) {
log.info("execute: Attempt {}", currentAttempt);

Retry.RetrySignal signalCopy = retrySignal.copy();
long currentAttempt = signalCopy.totalRetries();
log.info("Retry attempt {}.", currentAttempt);

Throwable failure = signalCopy.failure();
if (failure instanceof DataServiceException && ((DataServiceException) failure).isPermanent()) {
log.error("Error is permanent, stopping.", failure);
return Mono.error(failure);
}

if (currentAttempt >= maxAttempts) {
log.info("Max retry attempts reached: {}.", currentAttempt);
return Mono.error(failure);
try {
T result = function.apply(currentAttempt);
if (result != null) {
return result;
}
} catch (Exception e) {
log.error("execute: Error is permanent, stopping", e);
throw e;
}

double currentSleepSecs = sleepBaseSecs * (float) Math.pow(2, currentAttempt);
double jitterSecs = (float) Math.random() * maxJitterSecs;
double sleepMs = (currentSleepSecs + jitterSecs) * 1000;

log.info("Attempt {} failed, trying again after sleep of {} seconds.", currentAttempt, sleepMs / 1000);
log.info("execute: Attempt {} failed, trying again after sleep of {} seconds", currentAttempt, sleepMs / 1000);

try {
Thread.sleep((long) sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("execute: Interrupted while sleeping", e);
}
}

// Each retry can occur on a different thread
return Mono.delay(Duration.ofMillis((long) sleepMs));
}));
return null;
}

}
12 changes: 0 additions & 12 deletions ingest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,5 @@
<version>6.0.0</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-test -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor-test.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactive-streams.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

package com.microsoft.azure.kusto.ingest;

import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import reactor.core.publisher.Mono;

import java.io.Closeable;

Expand All @@ -22,12 +23,13 @@ public interface IngestClient extends Closeable {
* @param fileSourceInfo The specific SourceInfo to be ingested
* @param ingestionProperties Settings used to customize the ingestion operation
* @return {@link IngestionResult} object including the ingestion result
* @throws IngestionClientException An exception originating from a client activity
* @throws IngestionServiceException An exception returned from the service
* @see FileSourceInfo
* @see IngestionProperties
*/
IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties);

Mono<IngestionResult> ingestFromFileAsync(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties);
IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;

/**
* <p>Ingest data from a blob storage into Kusto database.</p>
Expand All @@ -37,12 +39,13 @@ public interface IngestClient extends Closeable {
* @param blobSourceInfo The specific SourceInfo to be ingested
* @param ingestionProperties Settings used to customize the ingestion operation
* @return {@link IngestionResult} object including the ingestion result
* @throws IngestionClientException An exception originating from a client activity
* @throws IngestionServiceException An exception returned from the service
* @see BlobSourceInfo
* @see IngestionProperties
*/
IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties);

Mono<IngestionResult> ingestFromBlobAsync(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties);
IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;

/**
* <p>Ingest data from a Result Set into Kusto database.</p>
Expand All @@ -55,12 +58,13 @@ public interface IngestClient extends Closeable {
* @param resultSetSourceInfo The specific SourceInfo to be ingested
* @param ingestionProperties Settings used to customize the ingestion operation
* @return {@link IngestionResult} object including the ingestion result
* @throws IngestionClientException An exception originating from a client activity
* @throws IngestionServiceException An exception returned from the service
* @see ResultSetSourceInfo
* @see IngestionProperties
*/
IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties);

Mono<IngestionResult> ingestFromResultSetAsync(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties);
IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;

/**
* <p>Ingest data from an input stream, into Kusto database.</p>
Expand All @@ -70,10 +74,11 @@ public interface IngestClient extends Closeable {
* @param streamSourceInfo The specific SourceInfo to be ingested
* @param ingestionProperties Settings used to customize the ingestion operation
* @return {@link IngestionResult} object including the ingestion result
* @throws IngestionClientException An exception originating from a client activity
* @throws IngestionServiceException An exception returned from the service
* @see StreamSourceInfo
* @see IngestionProperties
*/
IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties);

Mono<IngestionResult> ingestFromStreamAsync(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties);
IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.microsoft.azure.kusto.ingest;

import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import org.apache.http.conn.util.InetAddressUtils;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.net.URI;
import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions;
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.source.*;

import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -57,18 +59,16 @@ static boolean isReservedHostname(String rawUri) {
return isLocalFlag || isIPFlag || authority.equalsIgnoreCase("onebox.dev.kusto.windows.net");
}

public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) {
return ingestFromFileAsync(fileSourceInfo, ingestionProperties).block();
}

protected abstract Mono<IngestionResult> ingestFromFileAsyncImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties);
protected abstract IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;

public Mono<IngestionResult> ingestFromFileAsync(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) {
// trace ingestFromFileAsync
return Mono.defer(() -> MonitoredActivity.wrap(
ingestFromFileAsyncImpl(fileSourceInfo,
public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
// trace ingestFromFile
return MonitoredActivity.invoke(
(SupplierTwoExceptions<IngestionResult, IngestionClientException, IngestionServiceException>) () -> ingestFromFileImpl(fileSourceInfo,
ingestionProperties),
getClientType().concat(".ingestFromFile")));
getClientType().concat(".ingestFromFile"));
}

/**
Expand All @@ -79,21 +79,21 @@ public Mono<IngestionResult> ingestFromFileAsync(FileSourceInfo fileSourceInfo,
* @param blobSourceInfo The specific SourceInfo to be ingested
* @param ingestionProperties Settings used to customize the ingestion operation
* @return {@link IngestionResult} object including the ingestion result
* @throws IngestionClientException An exception originating from a client activity
* @throws IngestionServiceException An exception returned from the service
* @see BlobSourceInfo
* @see IngestionProperties
*/
public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) {
return ingestFromBlobAsync(blobSourceInfo, ingestionProperties).block();
}
protected abstract IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;

protected abstract Mono<IngestionResult> ingestFromBlobAsyncImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties);

public Mono<IngestionResult> ingestFromBlobAsync(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) {
public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
// trace ingestFromBlob
return Mono.defer(() -> MonitoredActivity.wrap(
ingestFromBlobAsyncImpl(blobSourceInfo,
return MonitoredActivity.invoke(
(SupplierTwoExceptions<IngestionResult, IngestionClientException, IngestionServiceException>) () -> ingestFromBlobImpl(blobSourceInfo,
ingestionProperties),
getClientType().concat(".ingestFromBlob")));
getClientType().concat(".ingestFromBlob"));
}

/**
Expand All @@ -107,21 +107,21 @@ public Mono<IngestionResult> ingestFromBlobAsync(BlobSourceInfo blobSourceInfo,
* @param resultSetSourceInfo The specific SourceInfo to be ingested
* @param ingestionProperties Settings used to customize the ingestion operation
* @return {@link IngestionResult} object including the ingestion result
* @throws IngestionClientException An exception originating from a client activity
* @throws IngestionServiceException An exception returned from the service
* @see ResultSetSourceInfo
* @see IngestionProperties
*/
public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) {
return ingestFromResultSetAsync(resultSetSourceInfo, ingestionProperties).block();
}
protected abstract IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;

protected abstract Mono<IngestionResult> ingestFromResultSetAsyncImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties);

public Mono<IngestionResult> ingestFromResultSetAsync(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) {
public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
// trace ingestFromResultSet
return Mono.defer(() -> MonitoredActivity.wrap(
ingestFromResultSetAsyncImpl(resultSetSourceInfo,
return MonitoredActivity.invoke(
(SupplierTwoExceptions<IngestionResult, IngestionClientException, IngestionServiceException>) () -> ingestFromResultSetImpl(resultSetSourceInfo,
ingestionProperties),
getClientType().concat(".ingestFromResultSet")));
getClientType().concat(".ingestFromResultSet"));
}

/**
Expand All @@ -132,21 +132,27 @@ public Mono<IngestionResult> ingestFromResultSetAsync(ResultSetSourceInfo result
* @param streamSourceInfo The specific SourceInfo to be ingested
* @param ingestionProperties Settings used to customize the ingestion operation
* @return {@link IngestionResult} object including the ingestion result
* @throws IngestionClientException An exception originating from a client activity
* @throws IngestionServiceException An exception returned from the service
* @see StreamSourceInfo
* @see IngestionProperties
*/
public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) {
return ingestFromStreamAsync(streamSourceInfo, ingestionProperties).block();
}
protected abstract IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException, IOException;

protected abstract Mono<IngestionResult> ingestFromStreamAsyncImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties);

public Mono<IngestionResult> ingestFromStreamAsync(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) {
public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
// trace ingestFromStream
return Mono.defer(() -> MonitoredActivity.wrap(
ingestFromStreamAsyncImpl(streamSourceInfo,
ingestionProperties),
getClientType().concat(".ingestFromStream")));
return MonitoredActivity.invoke(
(SupplierTwoExceptions<IngestionResult, IngestionClientException, IngestionServiceException>) () -> {
try {
return ingestFromStreamImpl(streamSourceInfo,
ingestionProperties);
} catch (IOException e) {
throw new IngestionServiceException(ExceptionsUtils.getMessageEx(e), e);
}
},
getClientType().concat(".ingestFromStream"));
}

protected Map<String, String> getIngestionTraceAttributes(TraceableAttributes sourceInfo, TraceableAttributes ingestionProperties) {
Expand Down
Loading

0 comments on commit be74852

Please sign in to comment.