Skip to content

Commit

Permalink
W-17312191: Use newInitialPositionAtTimestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
peterzxu-crm committed Dec 16, 2024
1 parent 57217f5 commit 32db994
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
Expand Down Expand Up @@ -45,16 +44,11 @@ public class Consumers {
private final PointProcessor pointProcessor;

private final KinesisConfig kinesisConfig;
private final CheckPointMgr<Date> checkPointMgr;

private final ConsumerRules consumerRules;

private final Map<String, KinesisConsumer> consumers;

private final String kinesisConsumerRegion;

private final PointProcessor recoveryPointProcessor;

private final NamespaceCounter namespaceCounter;

private final File indexNameSyncDir;
Expand All @@ -67,23 +61,23 @@ public class Consumers {

private final CloudWatchAsyncClient cloudWatchAsyncClient;

Consumers(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor, File rulesFile,
KinesisConfig kinesisConfig, CheckPointMgr<Date> checkPointMgr, String kinesisConsumerRegion,
private final int kinesisConsumerRetroSeconds;

Consumers(MetricRegistry metricRegistry, PointProcessor pointProcessor, File rulesFile, KinesisConfig kinesisConfig,
NamespaceCounter namespaceCounter, File indexNameSyncDir, String activeProfile,
KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, CloudWatchAsyncClient cloudWatchAsyncClient) {
KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient,
CloudWatchAsyncClient cloudWatchAsyncClient, int kinesisConsumerRetroSeconds) {

this.metricRegistry = metricRegistry;
this.pointProcessor = pointProcessor;
this.recoveryPointProcessor = recoveryPointProcessor;
this.kinesisConfig = kinesisConfig;
this.checkPointMgr = checkPointMgr;
this.kinesisConsumerRegion = kinesisConsumerRegion;
this.namespaceCounter = namespaceCounter;
this.indexNameSyncDir = indexNameSyncDir;
this.activeProfile = activeProfile;
this.kinesisAsyncClient = kinesisAsyncClient;
this.dynamoDbAsyncClient = dynamoDbAsyncClient;
this.cloudWatchAsyncClient = cloudWatchAsyncClient;
this.kinesisConsumerRetroSeconds = kinesisConsumerRetroSeconds;
consumers = new ConcurrentHashMap<>();
consumerRules = new ConsumerRules(rulesFile);
reload();
Expand Down Expand Up @@ -135,6 +129,7 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
/* create new consumers */
// we use the host name to generate the kinesis application name as they are stable for stable set pods.
String hostName = getHostName();
Date kinesisConsumerRetroDate = new Date(System.currentTimeMillis() - kinesisConsumerRetroSeconds * 1000L);
for (String consumerName : newRules) {
log.info(String.format("Creating new consumer with kinesis stream name: %s", consumerName));

Expand Down Expand Up @@ -162,7 +157,7 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules

Counter initRetryCounter = metricRegistry.counter(MetricRegistry.name("kinesis.consumer." + kinesisStreamName + ".initRetryCounter"));
StreamTracker streamTracker = new SingleStreamTracker(kinesisStreamName,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(kinesisConsumerRetroDate));
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamTracker, kinesisApplicationName, kinesisAsyncClient,
dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(),
new KinesisRecordProcessorFactory(metricRegistry, pointProcessor, kinesisConfig, kinesisStreamName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,6 @@ public class cfgCarbonJ
@Value( "${metrics.store.dataDir:data}" )
private String dataDir = null;

@Value( "${kinesis.consumer.region:us-east-1}" )
private String kinesisConsumerRegion = "us-east-1";

@Value( "${kinesis.relay.region:us-east-1}" )
private String kinesisRelayRegion = "us-east-1";

Expand All @@ -191,11 +188,6 @@ public class cfgCarbonJ

@Value( "${kinesis.relay.role:}" ) private String kinesisRelayRole;

/**
* Config server properties
*/
@Value( "${configServer.enabled:false}" ) private boolean configServerEnabled;

@Value( "${configServer.registrationSeconds:30}" ) private int configServerRegistrationSeconds;

@Value( "${configServer.baseUrl:http://localhost:8081}" ) private String configServerBaseUrl;
Expand All @@ -214,6 +206,9 @@ public class cfgCarbonJ
@Value("${spring.profiles.active:prd}")
private String activeProfile;

@Value("${kinesis.consumer.retroSeconds:600}")
private int kinesisConsumerRetroSeconds = 600;

@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
Expand Down Expand Up @@ -431,17 +426,13 @@ Consumer<DataPoints> dataPointSink( @Qualifier( "dataPointSinkRelay" ) Relay r )

@Bean
@ConditionalOnProperty(name = "rocksdb.readonly", havingValue = "false", matchIfMissing = true)
Consumers consumer( PointProcessor pointProcessor,
@Qualifier( "recoveryPointProcessor" ) PointProcessor recoveryPointProcessor,
ScheduledExecutorService s, KinesisConfig kinesisConfig, NamespaceCounter nsCounter )
{
Consumers consumer( PointProcessor pointProcessor, ScheduledExecutorService s, KinesisConfig kinesisConfig, NamespaceCounter nsCounter) {
if ( kinesisConfig.isKinesisConsumerEnabled() )
{
File rulesFile = locateConfigFile( serviceDir, consumerRulesFile );
Consumers consumer = new Consumers( metricRegistry, pointProcessor, recoveryPointProcessor, rulesFile,
kinesisConfig, checkPointMgr, kinesisConsumerRegion,
Consumers consumer = new Consumers( metricRegistry, pointProcessor, rulesFile, kinesisConfig,
nsCounter, dataDir == null ? null : new File(dataDir, "index-name-sync"), activeProfile,
kinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient);
kinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient, kinesisConsumerRetroSeconds);
s.scheduleWithFixedDelay( consumer::reload, 15, 30, TimeUnit.SECONDS );
if (syncSecondaryDb) {
s.scheduleWithFixedDelay( consumer::syncNamespaces, 60, 60, TimeUnit.SECONDS );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void run() {
// log.info("Message sent.ShardId is " + putRecordResult.getShardId());
} else {
metricsDropped.mark();
log.error("Couldn't process record " + putRecordRequest + ". Skipping the record.");
log.error("Couldn't process record {}. Skipping the record.", putRecordRequest);
}
} catch(Throwable e) {
log.error(e.getMessage(),e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private DataPointsInfo getNextRecord(Map<ShardInfo, RecordTrackingInfo> shardToT

DataPoints dataPoints = codec.decode(record.data().asByteArray());

DataPointsInfo dataPointsInfo = new DataPointsInfo(dataPoints, shardInfo, record.approximateArrivalTimestamp().getEpochSecond());
DataPointsInfo dataPointsInfo = new DataPointsInfo(dataPoints, shardInfo, record.approximateArrivalTimestamp().toEpochMilli());

if (log.isDebugEnabled())
{
Expand Down

0 comments on commit 32db994

Please sign in to comment.