Skip to content

Commit

Permalink
FINERACT-2081: External event sending make mark as sent async
Browse files Browse the repository at this point in the history
  • Loading branch information
magyari-adam authored and galovics committed Nov 20, 2024
1 parent bb18aad commit 425085f
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ public static class FineractExternalEventsProperties {
private boolean enabled;
private FineractExternalEventsProducerProperties producer;
private int partitionSize;
private int threadPoolCorePoolSize;
private int threadPoolMaxPoolSize;
private int threadPoolQueueCapacity;
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.infrastructure.event.external.config;

import lombok.RequiredArgsConstructor;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@RequiredArgsConstructor
public class EventTaskExecutorConfig {

private final FineractProperties fineractProperties;

@Bean(TaskExecutorConstant.EVENT_MARKS_AS_SENT_EXECUTOR_BEAN_NAME)
public ThreadPoolTaskExecutor sendAsynchronousEventsThreadPool() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(fineractProperties.getEvents().getExternal().getThreadPoolCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(fineractProperties.getEvents().getExternal().getThreadPoolMaxPoolSize());
threadPoolTaskExecutor.setQueueCapacity(fineractProperties.getEvents().getExternal().getThreadPoolQueueCapacity());
threadPoolTaskExecutor.setThreadNamePrefix("external-events-");
threadPoolTaskExecutor.initialize();

return threadPoolTaskExecutor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.infrastructure.event.external.config;

public final class TaskExecutorConstant {

private TaskExecutorConstant() {}

public static final String EVENT_MARKS_AS_SENT_EXECUTOR_BEAN_NAME = "eventMarksAsSentExecutor";
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.domain.FineractContext;
import org.apache.fineract.infrastructure.core.service.DateUtils;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.event.external.config.TaskExecutorConstant;
import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
Expand All @@ -45,9 +50,12 @@
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;

@Slf4j
@RequiredArgsConstructor
Expand All @@ -60,6 +68,9 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
private final MessageFactory messageFactory;
private final ByteBufferConverter byteBufferConverter;
private final ConfigurationDomainService configurationDomainService;
private final TransactionTemplate transactionTemplate;
@Qualifier(TaskExecutorConstant.EVENT_MARKS_AS_SENT_EXECUTOR_BEAN_NAME)
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
Expand Down Expand Up @@ -104,14 +115,31 @@ private void markEventsAsSent(List<Long> eventIds) {
// Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
final int partitionSize = fineractProperties.getEvents().getExternal().getPartitionSize();
List<List<Long>> partitions = Lists.partition(eventIds, partitionSize);
List<Future<?>> tasks = new ArrayList<>();
final FineractContext context = ThreadLocalContextUtil.getContext();
partitions //
.forEach(partitionedEventIds -> {
measure(() -> {
repository.markEventsSent(partitionedEventIds, sentAt);
}, timeTaken -> {
log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
});
tasks.add(threadPoolTaskExecutor.submit(() -> {
ThreadLocalContextUtil.init(context);
transactionTemplate.execute((status) -> {
measure(() -> {
repository.markEventsSent(partitionedEventIds, sentAt);
}, timeTaken -> {
log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
});
return null;
});
}));
});
for (Future<?> task : tasks) {
try {
task.get();
} catch (InterruptedException e) {
log.error("Interrupted while marking events as sent", e);
} catch (ExecutionException e) {
log.error("Exception while marking events as sent", e);
}
}
}

private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) {
Expand Down
3 changes: 3 additions & 0 deletions fineract-provider/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ fineract.remote-job-message-handler.kafka.admin.extra-properties=${FINERACT_REMO

fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
fineract.events.external.partition-size=${FINERACT_EXTERNAL_EVENTS_PARTITION_SIZE:5000}
fineract.events.external.thread-pool-core-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE:2}
fineract.events.external.thread-pool-max-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE:25}
fineract.events.external.thread-pool-queue-capacity=${FINERACT_EVENT_TASK_EXECUTOR_QUEUE_CAPACITY:500}
fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
fineract.events.external.producer.jms.async-send-enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ASYNC_SEND_ENABLED:false}
fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
Expand All @@ -47,11 +48,13 @@
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -61,6 +64,10 @@
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand All @@ -82,6 +89,10 @@ class SendAsynchronousEventsTaskletTest {
private ByteBufferConverter byteBufferConverter;
@Mock
private ConfigurationDomainService configurationDomainService;
@Mock
private TransactionTemplate transactionTemplate;
@Mock
private TransactionStatus transactionStatus;
private SendAsynchronousEventsTasklet underTest;
private RepeatStatus resultStatus;

Expand All @@ -94,8 +105,13 @@ public void setUp() {
ThreadLocalContextUtil
.setBusinessDates(new HashMap<>(Map.of(BusinessDateType.BUSINESS_DATE, LocalDate.now(ZoneId.systemDefault()))));
configureExternalEventsProducerReadBatchSizeProperty();
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(2);
taskExecutor.initialize();
when(transactionTemplate.execute(ArgumentMatchers.any()))
.thenAnswer(invocation -> invocation.<TransactionCallback<Boolean>>getArgument(0).doInTransaction(transactionStatus));
underTest = new SendAsynchronousEventsTasklet(fineractProperties, repository, eventProducer, messageFactory, byteBufferConverter,
configurationDomainService);
configurationDomainService, transactionTemplate, taskExecutor);
}

@AfterEach
Expand All @@ -113,6 +129,9 @@ private void configureExternalEventsProducerReadBatchSizeProperty() {
externalProperties.setPartitionSize(5000);
externalEventsProducerProperties.setJms(externalEventsProducerJMSProperties);
externalProperties.setProducer(externalEventsProducerProperties);
externalProperties.setThreadPoolCorePoolSize(1);
externalProperties.setThreadPoolMaxPoolSize(1);
externalProperties.setThreadPoolQueueCapacity(10);
eventsProperties.setExternal(externalProperties);
when(fineractProperties.getEvents()).thenReturn(eventsProperties);
when(configurationDomainService.retrieveExternalEventBatchSize()).thenReturn(10L);
Expand All @@ -135,7 +154,9 @@ public void givenBatchSize2WhenTaskExecutionThenSend2Events() throws Exception {
resultStatus = underTest.execute(stepContribution, chunkContext);
// then
verify(eventProducer).sendEvents(Mockito.any());
verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
});
assertEquals(RepeatStatus.FINISHED, resultStatus);
}

Expand Down Expand Up @@ -174,7 +195,9 @@ public void givenOneEventWhenEventSentThenEventStatusUpdates() throws Exception
// then
verify(messageFactory).createMessage(Mockito.any());
verify(eventProducer).sendEvents(Mockito.any());
verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
});
assertEquals(RepeatStatus.FINISHED, resultStatus);
}

Expand All @@ -194,7 +217,9 @@ public void testExecuteShouldHandleNullAggregateId() throws Exception {
// then
verify(messageFactory).createMessage(Mockito.any());
verify(eventProducer).sendEvents(Map.of(-1L, List.of(byteMsg)));
verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
verify(repository).markEventsSent(Mockito.eq(events.stream().map(ExternalEventView::getId).toList()), Mockito.any());
});
assertEquals(RepeatStatus.FINISHED, resultStatus);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ fineract.remote-job-message-handler.jms.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HA
fineract.remote-job-message-handler.jms.request-queue-name=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_QUEUE_NAME:JMS-request-queue}
fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
fineract.events.external.partition-size=${FINERACT_EXTERNAL_EVENTS_PARTITION_SIZE:5000}
fineract.events.external.thread-pool-core-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE:2}
fineract.events.external.thread-pool-max-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE:25}
fineract.events.external.thread-pool-queue-capacity=${FINERACT_EVENT_TASK_EXECUTOR_QUEUE_CAPACITY:500}
fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
Expand Down

0 comments on commit 425085f

Please sign in to comment.