From 92ce39cd9ea66143a07073c76b8d5502f077ed26 Mon Sep 17 00:00:00 2001 From: p-eye Date: Tue, 17 Sep 2024 16:44:58 +0900 Subject: [PATCH 1/8] feat: add a dropdown button to the topic table on the Consumer Group page for deleting offsets. --- .../main/resources/swagger/kafbat-ui-api.yaml | 26 ++++++++++++++++ .../ConsumerGroups/Details/ListItem.tsx | 31 +++++++++++++++++-- frontend/src/lib/hooks/api/consumers.ts | 27 ++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 7ca62831f..5eede6cef 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -1048,6 +1048,32 @@ paths: 200: description: OK + /api/clusters/{clusterName}/consumer-groups/{id}/topics/{topicName}: + delete: + tags: + - Consumer Groups + summary: delete consumer group offsets + operationId: deleteConsumerGroupOffsets + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: id + in: path + required: true + schema: + type: string + - name: topicName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + /api/clusters/{clusterName}/schemas: post: tags: diff --git a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx index 57b7cb133..953cc28cf 100644 --- a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx +++ b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx @@ -1,8 +1,12 @@ import React from 'react'; -import { ConsumerGroupTopicPartition } from 'generated-sources'; +import { Action, ConsumerGroupTopicPartition, ResourceType } from 'generated-sources'; import { Link } from 'react-router-dom'; import { ClusterName } from 'lib/interfaces/cluster'; -import { clusterTopicPath } from 'lib/paths'; +import { ClusterGroupParam, clusterTopicPath } from 'lib/paths'; +import { useDeleteConsumerGroupOffsetsMutation } from 'lib/hooks/api/consumers'; +import useAppParams from 'lib/hooks/useAppParams'; +import { Dropdown } from 'components/common/Dropdown'; +import { ActionDropdownItem } from 'components/common/ActionComponent'; import MessageToggleIcon from 'components/common/Icons/MessageToggleIcon'; import IconButtonWrapper from 'components/common/Icons/IconButtonWrapper'; import { TableKeyLink } from 'components/common/table/Table/TableKeyLink.styled'; @@ -18,6 +22,8 @@ interface Props { const ListItem: React.FC = ({ clusterName, name, consumers }) => { const [isOpen, setIsOpen] = React.useState(false); + const consumerProps = useAppParams(); + const deleteOffsetMutation = useDeleteConsumerGroupOffsetsMutation(consumerProps); const getTotalconsumerLag = () => { let count = 0; @@ -27,6 +33,11 @@ const ListItem: React.FC = ({ clusterName, name, consumers }) => { return count; }; + const deleteOffsetHandler = (topicName?: string) => { + if (topicName === undefined) return; + deleteOffsetMutation.mutateAsync(topicName); + }; + return ( <> @@ -41,6 +52,22 @@ const ListItem: React.FC = ({ clusterName, name, consumers }) => { {getTotalconsumerLag()} + + + deleteOffsetHandler(name)} + danger + confirm="Are you sure you want to delete offsets from the topic?" + permission={{ + resource: ResourceType.CONSUMER, + action: Action.RESET_OFFSETS, + value: consumerProps.consumerGroupID, + }} + > + Delete offsets + + + {isOpen && } diff --git a/frontend/src/lib/hooks/api/consumers.ts b/frontend/src/lib/hooks/api/consumers.ts index 85ae048a9..f2373e989 100644 --- a/frontend/src/lib/hooks/api/consumers.ts +++ b/frontend/src/lib/hooks/api/consumers.ts @@ -90,3 +90,30 @@ export const useResetConsumerGroupOffsetsMutation = ({ } ); }; + +export const useDeleteConsumerGroupOffsetsMutation = ({ + clusterName, + consumerGroupID, +}: UseConsumerGroupDetailsProps) => { + const queryClient = useQueryClient(); + return useMutation( + (topicName: string) => + api.deleteConsumerGroupOffsets({ + clusterName, + id: consumerGroupID, + topicName, + }), + { + onSuccess: (_, topicName) => { + showSuccessAlert({ + message: `Consumer ${consumerGroupID} group offsets in topic ${topicName} deleted`, + }); + queryClient.invalidateQueries([ + 'clusters', + clusterName, + 'consumerGroups', + ]); + }, + } + ); +}; From 5ec7b532397370dbf009b1811e27f41e893ea1b2 Mon Sep 17 00:00:00 2001 From: p-eye Date: Tue, 17 Sep 2024 16:46:11 +0900 Subject: [PATCH 2/8] feat: implement deleteConsumerGroupOffsets --- .../controller/ConsumerGroupsController.java | 18 ++++++++++++++++++ .../ui/service/ConsumerGroupService.java | 8 ++++++++ .../ui/service/ReactiveAdminClient.java | 19 +++++++++++++++++++ 3 files changed, 45 insertions(+) diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index 248c6f7d7..b54ba3321 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -16,12 +16,14 @@ import io.kafbat.ui.model.PartitionOffsetDTO; import io.kafbat.ui.model.SortOrderDTO; import io.kafbat.ui.model.rbac.AccessContext; +import io.kafbat.ui.model.rbac.permission.ConnectAction; import io.kafbat.ui.model.rbac.permission.TopicAction; import io.kafbat.ui.service.ConsumerGroupService; import io.kafbat.ui.service.OffsetsResetService; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -59,6 +61,22 @@ public Mono> deleteConsumerGroup(String clusterName, .thenReturn(ResponseEntity.ok().build()); } + @Override + public Mono> deleteConsumerGroupOffsets(String clusterName, String groupId, String topicName, + ServerWebExchange exchange) { + var context = AccessContext.builder() + .cluster(clusterName) + .consumerGroupActions(groupId, RESET_OFFSETS) + .topicActions(topicName, TopicAction.VIEW) + .operationName("deleteConsumerGroupOffsets") + .build(); + + return validateAccess(context) + .then(consumerGroupService.deleteConsumerGroupOffset(getCluster(clusterName), groupId, topicName)) + .doOnEach(sig -> audit(context, sig)) + .thenReturn(ResponseEntity.ok().build()); + } + @Override public Mono> getConsumerGroup(String clusterName, String consumerGroupId, diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 452de4a59..c7a3b6d3d 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.function.ToIntFunction; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -247,6 +248,13 @@ public Mono deleteConsumerGroupById(KafkaCluster cluster, .flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId))); } + public Mono deleteConsumerGroupOffset(KafkaCluster cluster, + String groupId, + String topicName) { + return adminClientService.get(cluster) + .flatMap(adminClient -> adminClient.deleteConsumerGroupOffsets(groupId, topicName)); + } + public EnhancedConsumer createConsumer(KafkaCluster cluster) { return createConsumer(cluster, Map.of()); } diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index bb04f5527..f16c07f9b 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -49,6 +49,7 @@ import org.apache.kafka.clients.admin.DescribeClusterOptions; import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.clients.admin.DescribeConfigsOptions; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListTopicsOptions; @@ -74,6 +75,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.GroupSubscribedToTopicException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TopicAuthorizationException; @@ -436,6 +438,23 @@ public Mono deleteConsumerGroups(Collection groupIds) { th -> Mono.error(new IllegalEntityStateException("The group is not empty"))); } + public Mono deleteConsumerGroupOffsets(String groupId, String topicName) { + return listConsumerGroupOffsets(List.of(groupId), null) + .flatMap(table -> { + // filter TopicPartitions by topicName + Set partitions = table.row(groupId).keySet().stream() + .filter(tp -> tp.topic().equals(topicName)) + .collect(Collectors.toSet()); + return toMono(client.deleteConsumerGroupOffsets(groupId, partitions).all()); + }) + .onErrorResume(GroupIdNotFoundException.class, + th -> Mono.error(new NotFoundException("The group id does not exist"))) + .onErrorResume(UnknownTopicOrPartitionException.class, + th -> Mono.error(new NotFoundException("The topic or partition is unknown"))) + .onErrorResume(GroupSubscribedToTopicException.class, + th -> Mono.error(new IllegalEntityStateException("The group is not empty"))); + } + public Mono createTopic(String name, int numPartitions, @Nullable Integer replicationFactor, From 9ee820df55204bf26b30827d89990ae99e009dd8 Mon Sep 17 00:00:00 2001 From: p-eye Date: Tue, 17 Sep 2024 16:48:29 +0900 Subject: [PATCH 3/8] fix: Throw NotFoundException when TopicPartition set is empty --- .../main/java/io/kafbat/ui/service/ReactiveAdminClient.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index f16c07f9b..779e29a04 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -445,7 +445,11 @@ public Mono deleteConsumerGroupOffsets(String groupId, String topicName) { Set partitions = table.row(groupId).keySet().stream() .filter(tp -> tp.topic().equals(topicName)) .collect(Collectors.toSet()); - return toMono(client.deleteConsumerGroupOffsets(groupId, partitions).all()); + // check if partitions have no committed offsets + return partitions.isEmpty() + ? Mono.error(new NotFoundException("The topic or partition is unknown")) + // call deleteConsumerGroupOffsets + : toMono(client.deleteConsumerGroupOffsets(groupId, partitions).all()); }) .onErrorResume(GroupIdNotFoundException.class, th -> Mono.error(new NotFoundException("The group id does not exist"))) From 5a57bdf5ccef24217672c108baed7344f7819874 Mon Sep 17 00:00:00 2001 From: p-eye Date: Tue, 17 Sep 2024 17:06:55 +0900 Subject: [PATCH 4/8] tests: add test cases for deleting consumer group offsets --- .../io/kafbat/ui/KafkaConsumerGroupTests.java | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java b/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java index 5f97317f2..3fa600381 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java @@ -11,6 +11,7 @@ import java.util.Properties; import java.util.UUID; import java.util.stream.Stream; +import io.kafbat.ui.producer.KafkaTestProducer; import lombok.extern.slf4j.Slf4j; import lombok.val; import org.apache.commons.lang3.RandomStringUtils; @@ -22,6 +23,8 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.web.reactive.server.WebTestClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @Slf4j public class KafkaConsumerGroupTests extends AbstractIntegrationTest { @@ -31,12 +34,76 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest { @Test void shouldNotFoundWhenNoSuchConsumerGroupId() { String groupId = "groupA"; + String topicName = "topicX"; + webTestClient .delete() .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId) .exchange() .expectStatus() .isNotFound(); + + webTestClient + .delete() + .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName) + .exchange() + .expectStatus() + .isNotFound(); + } + + @Test + void shouldNotFoundWhenNoSuchTopic() { + String topicName = createTopicWithRandomName(); + String topicNameUnSubscribed = "topicX"; + + //Create a consumer and subscribe to the topic + String groupId = UUID.randomUUID().toString(); + try (val consumer = createTestConsumerWithGroupId(groupId)) { + consumer.subscribe(List.of(topicName)); + consumer.poll(Duration.ofMillis(100)); + + webTestClient + .delete() + .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, + topicNameUnSubscribed) + .exchange() + .expectStatus() + .isNotFound(); + } + } + + @Test + void shouldOkWhenConsumerGroupIsNotActiveAndPartitionOffsetExists() { + String topicName = createTopicWithRandomName(); + + //Create a consumer and subscribe to the topic + String groupId = UUID.randomUUID().toString(); + + try (KafkaTestProducer producer = KafkaTestProducer.forKafka(kafka)) { + Flux.fromStream( + Stream.of("one", "two", "three", "four") + .map(value -> Mono.fromFuture(producer.send(topicName, value))) + ).blockLast(); + } catch (Throwable e) { + log.error("Error on sending", e); + throw new RuntimeException(e); + } + + try (val consumer = createTestConsumerWithGroupId(groupId)) { + consumer.subscribe(List.of(topicName)); + consumer.poll(Duration.ofMillis(100)); + + //Stop consumers to delete consumer offset from the topic + consumer.pause(consumer.assignment()); + } + + //Delete the consumer offset when it's INACTIVE and check + webTestClient + .delete() + .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName) + .exchange() + .expectStatus() + .isOk(); } @Test From 8e7e24c4e27a5d278b28b7ce7c84e156e1e25bb7 Mon Sep 17 00:00:00 2001 From: p-eye Date: Tue, 17 Sep 2024 17:23:17 +0900 Subject: [PATCH 5/8] style: resolve lint issues --- .../test/java/io/kafbat/ui/KafkaConsumerGroupTests.java | 2 +- .../src/components/ConsumerGroups/Details/ListItem.tsx | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java b/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java index 3fa600381..b1bf4baa7 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java @@ -4,6 +4,7 @@ import io.kafbat.ui.model.ConsumerGroupDTO; import io.kafbat.ui.model.ConsumerGroupsPageResponseDTO; +import io.kafbat.ui.producer.KafkaTestProducer; import java.io.Closeable; import java.time.Duration; import java.util.Comparator; @@ -11,7 +12,6 @@ import java.util.Properties; import java.util.UUID; import java.util.stream.Stream; -import io.kafbat.ui.producer.KafkaTestProducer; import lombok.extern.slf4j.Slf4j; import lombok.val; import org.apache.commons.lang3.RandomStringUtils; diff --git a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx index 953cc28cf..21560e9ca 100644 --- a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx +++ b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx @@ -1,5 +1,9 @@ import React from 'react'; -import { Action, ConsumerGroupTopicPartition, ResourceType } from 'generated-sources'; +import { + Action, + ConsumerGroupTopicPartition, + ResourceType, +} from 'generated-sources'; import { Link } from 'react-router-dom'; import { ClusterName } from 'lib/interfaces/cluster'; import { ClusterGroupParam, clusterTopicPath } from 'lib/paths'; @@ -23,7 +27,8 @@ interface Props { const ListItem: React.FC = ({ clusterName, name, consumers }) => { const [isOpen, setIsOpen] = React.useState(false); const consumerProps = useAppParams(); - const deleteOffsetMutation = useDeleteConsumerGroupOffsetsMutation(consumerProps); + const deleteOffsetMutation = + useDeleteConsumerGroupOffsetsMutation(consumerProps); const getTotalconsumerLag = () => { let count = 0; From cf7b2e511c992a17bb944e302e652d60c06933d0 Mon Sep 17 00:00:00 2001 From: p-eye Date: Tue, 8 Oct 2024 07:22:01 +0900 Subject: [PATCH 6/8] chore: remove unused imports --- .../java/io/kafbat/ui/controller/ConsumerGroupsController.java | 2 -- .../main/java/io/kafbat/ui/service/ConsumerGroupService.java | 1 - api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java | 1 - 3 files changed, 4 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index b54ba3321..2697c2912 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -16,14 +16,12 @@ import io.kafbat.ui.model.PartitionOffsetDTO; import io.kafbat.ui.model.SortOrderDTO; import io.kafbat.ui.model.rbac.AccessContext; -import io.kafbat.ui.model.rbac.permission.ConnectAction; import io.kafbat.ui.model.rbac.permission.TopicAction; import io.kafbat.ui.service.ConsumerGroupService; import io.kafbat.ui.service.OffsetsResetService; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; -import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index c7a3b6d3d..ca4b13fcd 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.function.ToIntFunction; import java.util.stream.Collectors; import java.util.stream.Stream; diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 779e29a04..651f6d531 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -49,7 +49,6 @@ import org.apache.kafka.clients.admin.DescribeClusterOptions; import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.clients.admin.DescribeConfigsOptions; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListTopicsOptions; From 23cd882976f98558e3e6fbfc01044036c4b06043 Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Wed, 9 Oct 2024 11:07:23 +0300 Subject: [PATCH 7/8] Fix checkstyle --- .../ui/controller/ConsumerGroupsController.java | 4 +++- .../io/kafbat/ui/service/ConsumerGroupService.java | 12 ++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index 2697c2912..32337bbcf 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -60,7 +60,9 @@ public Mono> deleteConsumerGroup(String clusterName, } @Override - public Mono> deleteConsumerGroupOffsets(String clusterName, String groupId, String topicName, + public Mono> deleteConsumerGroupOffsets(String clusterName, + String groupId, + String topicName, ServerWebExchange exchange) { var context = AccessContext.builder() .cluster(clusterName) diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index ca4b13fcd..939fc3e25 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -210,11 +210,11 @@ private Mono> describeConsumerGroups(ReactiveAdmi private Mono> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac, - List groups, - Comparator comparator, - int pageNum, - int perPage, - SortOrderDTO sortOrderDto) { + List groups, + Comparator comparator, + int pageNum, + int perPage, + SortOrderDTO sortOrderDto) { var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList(); return ac.describeConsumerGroups(groupNames) @@ -248,7 +248,7 @@ public Mono deleteConsumerGroupById(KafkaCluster cluster, } public Mono deleteConsumerGroupOffset(KafkaCluster cluster, - String groupId, + String groupId, String topicName) { return adminClientService.get(cluster) .flatMap(adminClient -> adminClient.deleteConsumerGroupOffsets(groupId, topicName)); From b6b0775ebb40b6dac622fe434923d491950b17bf Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Wed, 9 Oct 2024 11:10:52 +0300 Subject: [PATCH 8/8] Fix checkstyle --- .../io/kafbat/ui/service/ConsumerGroupService.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 939fc3e25..27593cd6f 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -209,12 +209,13 @@ private Mono> describeConsumerGroups(ReactiveAdmi } - private Mono> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac, - List groups, - Comparator comparator, - int pageNum, - int perPage, - SortOrderDTO sortOrderDto) { + private Mono> loadDescriptionsByInternalConsumerGroups( + ReactiveAdminClient ac, + List groups, + Comparator comparator, + int pageNum, + int perPage, + SortOrderDTO sortOrderDto) { var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList(); return ac.describeConsumerGroups(groupNames)