diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index 248c6f7d7..32337bbcf 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -59,6 +59,24 @@ public Mono> deleteConsumerGroup(String clusterName, .thenReturn(ResponseEntity.ok().build()); } + @Override + public Mono> deleteConsumerGroupOffsets(String clusterName, + String groupId, + String topicName, + ServerWebExchange exchange) { + var context = AccessContext.builder() + .cluster(clusterName) + .consumerGroupActions(groupId, RESET_OFFSETS) + .topicActions(topicName, TopicAction.VIEW) + .operationName("deleteConsumerGroupOffsets") + .build(); + + return validateAccess(context) + .then(consumerGroupService.deleteConsumerGroupOffset(getCluster(clusterName), groupId, topicName)) + .doOnEach(sig -> audit(context, sig)) + .thenReturn(ResponseEntity.ok().build()); + } + @Override public Mono> getConsumerGroup(String clusterName, String consumerGroupId, diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 452de4a59..27593cd6f 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -209,12 +209,13 @@ private Mono> describeConsumerGroups(ReactiveAdmi } - private Mono> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac, - List groups, - Comparator comparator, - int pageNum, - int perPage, - SortOrderDTO sortOrderDto) { + private Mono> loadDescriptionsByInternalConsumerGroups( + ReactiveAdminClient ac, + List groups, + Comparator comparator, + int pageNum, + int perPage, + SortOrderDTO sortOrderDto) { var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList(); return ac.describeConsumerGroups(groupNames) @@ -247,6 +248,13 @@ public Mono deleteConsumerGroupById(KafkaCluster cluster, .flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId))); } + public Mono deleteConsumerGroupOffset(KafkaCluster cluster, + String groupId, + String topicName) { + return adminClientService.get(cluster) + .flatMap(adminClient -> adminClient.deleteConsumerGroupOffsets(groupId, topicName)); + } + public EnhancedConsumer createConsumer(KafkaCluster cluster) { return createConsumer(cluster, Map.of()); } diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index bb04f5527..651f6d531 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -74,6 +74,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.GroupSubscribedToTopicException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TopicAuthorizationException; @@ -436,6 +437,27 @@ public Mono deleteConsumerGroups(Collection groupIds) { th -> Mono.error(new IllegalEntityStateException("The group is not empty"))); } + public Mono deleteConsumerGroupOffsets(String groupId, String topicName) { + return listConsumerGroupOffsets(List.of(groupId), null) + .flatMap(table -> { + // filter TopicPartitions by topicName + Set partitions = table.row(groupId).keySet().stream() + .filter(tp -> tp.topic().equals(topicName)) + .collect(Collectors.toSet()); + // check if partitions have no committed offsets + return partitions.isEmpty() + ? Mono.error(new NotFoundException("The topic or partition is unknown")) + // call deleteConsumerGroupOffsets + : toMono(client.deleteConsumerGroupOffsets(groupId, partitions).all()); + }) + .onErrorResume(GroupIdNotFoundException.class, + th -> Mono.error(new NotFoundException("The group id does not exist"))) + .onErrorResume(UnknownTopicOrPartitionException.class, + th -> Mono.error(new NotFoundException("The topic or partition is unknown"))) + .onErrorResume(GroupSubscribedToTopicException.class, + th -> Mono.error(new IllegalEntityStateException("The group is not empty"))); + } + public Mono createTopic(String name, int numPartitions, @Nullable Integer replicationFactor, diff --git a/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java b/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java index 5f97317f2..b1bf4baa7 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java @@ -4,6 +4,7 @@ import io.kafbat.ui.model.ConsumerGroupDTO; import io.kafbat.ui.model.ConsumerGroupsPageResponseDTO; +import io.kafbat.ui.producer.KafkaTestProducer; import java.io.Closeable; import java.time.Duration; import java.util.Comparator; @@ -22,6 +23,8 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.web.reactive.server.WebTestClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @Slf4j public class KafkaConsumerGroupTests extends AbstractIntegrationTest { @@ -31,12 +34,76 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest { @Test void shouldNotFoundWhenNoSuchConsumerGroupId() { String groupId = "groupA"; + String topicName = "topicX"; + webTestClient .delete() .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId) .exchange() .expectStatus() .isNotFound(); + + webTestClient + .delete() + .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName) + .exchange() + .expectStatus() + .isNotFound(); + } + + @Test + void shouldNotFoundWhenNoSuchTopic() { + String topicName = createTopicWithRandomName(); + String topicNameUnSubscribed = "topicX"; + + //Create a consumer and subscribe to the topic + String groupId = UUID.randomUUID().toString(); + try (val consumer = createTestConsumerWithGroupId(groupId)) { + consumer.subscribe(List.of(topicName)); + consumer.poll(Duration.ofMillis(100)); + + webTestClient + .delete() + .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, + topicNameUnSubscribed) + .exchange() + .expectStatus() + .isNotFound(); + } + } + + @Test + void shouldOkWhenConsumerGroupIsNotActiveAndPartitionOffsetExists() { + String topicName = createTopicWithRandomName(); + + //Create a consumer and subscribe to the topic + String groupId = UUID.randomUUID().toString(); + + try (KafkaTestProducer producer = KafkaTestProducer.forKafka(kafka)) { + Flux.fromStream( + Stream.of("one", "two", "three", "four") + .map(value -> Mono.fromFuture(producer.send(topicName, value))) + ).blockLast(); + } catch (Throwable e) { + log.error("Error on sending", e); + throw new RuntimeException(e); + } + + try (val consumer = createTestConsumerWithGroupId(groupId)) { + consumer.subscribe(List.of(topicName)); + consumer.poll(Duration.ofMillis(100)); + + //Stop consumers to delete consumer offset from the topic + consumer.pause(consumer.assignment()); + } + + //Delete the consumer offset when it's INACTIVE and check + webTestClient + .delete() + .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName) + .exchange() + .expectStatus() + .isOk(); } @Test diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 7ca62831f..5eede6cef 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -1048,6 +1048,32 @@ paths: 200: description: OK + /api/clusters/{clusterName}/consumer-groups/{id}/topics/{topicName}: + delete: + tags: + - Consumer Groups + summary: delete consumer group offsets + operationId: deleteConsumerGroupOffsets + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: id + in: path + required: true + schema: + type: string + - name: topicName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + /api/clusters/{clusterName}/schemas: post: tags: diff --git a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx index 57b7cb133..21560e9ca 100644 --- a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx +++ b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx @@ -1,8 +1,16 @@ import React from 'react'; -import { ConsumerGroupTopicPartition } from 'generated-sources'; +import { + Action, + ConsumerGroupTopicPartition, + ResourceType, +} from 'generated-sources'; import { Link } from 'react-router-dom'; import { ClusterName } from 'lib/interfaces/cluster'; -import { clusterTopicPath } from 'lib/paths'; +import { ClusterGroupParam, clusterTopicPath } from 'lib/paths'; +import { useDeleteConsumerGroupOffsetsMutation } from 'lib/hooks/api/consumers'; +import useAppParams from 'lib/hooks/useAppParams'; +import { Dropdown } from 'components/common/Dropdown'; +import { ActionDropdownItem } from 'components/common/ActionComponent'; import MessageToggleIcon from 'components/common/Icons/MessageToggleIcon'; import IconButtonWrapper from 'components/common/Icons/IconButtonWrapper'; import { TableKeyLink } from 'components/common/table/Table/TableKeyLink.styled'; @@ -18,6 +26,9 @@ interface Props { const ListItem: React.FC = ({ clusterName, name, consumers }) => { const [isOpen, setIsOpen] = React.useState(false); + const consumerProps = useAppParams(); + const deleteOffsetMutation = + useDeleteConsumerGroupOffsetsMutation(consumerProps); const getTotalconsumerLag = () => { let count = 0; @@ -27,6 +38,11 @@ const ListItem: React.FC = ({ clusterName, name, consumers }) => { return count; }; + const deleteOffsetHandler = (topicName?: string) => { + if (topicName === undefined) return; + deleteOffsetMutation.mutateAsync(topicName); + }; + return ( <> @@ -41,6 +57,22 @@ const ListItem: React.FC = ({ clusterName, name, consumers }) => { {getTotalconsumerLag()} + + + deleteOffsetHandler(name)} + danger + confirm="Are you sure you want to delete offsets from the topic?" + permission={{ + resource: ResourceType.CONSUMER, + action: Action.RESET_OFFSETS, + value: consumerProps.consumerGroupID, + }} + > + Delete offsets + + + {isOpen && } diff --git a/frontend/src/lib/hooks/api/consumers.ts b/frontend/src/lib/hooks/api/consumers.ts index 85ae048a9..f2373e989 100644 --- a/frontend/src/lib/hooks/api/consumers.ts +++ b/frontend/src/lib/hooks/api/consumers.ts @@ -90,3 +90,30 @@ export const useResetConsumerGroupOffsetsMutation = ({ } ); }; + +export const useDeleteConsumerGroupOffsetsMutation = ({ + clusterName, + consumerGroupID, +}: UseConsumerGroupDetailsProps) => { + const queryClient = useQueryClient(); + return useMutation( + (topicName: string) => + api.deleteConsumerGroupOffsets({ + clusterName, + id: consumerGroupID, + topicName, + }), + { + onSuccess: (_, topicName) => { + showSuccessAlert({ + message: `Consumer ${consumerGroupID} group offsets in topic ${topicName} deleted`, + }); + queryClient.invalidateQueries([ + 'clusters', + clusterName, + 'consumerGroups', + ]); + }, + } + ); +};