Skip to content

Commit

Permalink
Adding resolveConcreteIndexNames to DataStreamsActionUtil (elastic#…
Browse files Browse the repository at this point in the history
…106621)

Extract method `resolveConcreteIndexNames` from
`DataStreamsStatsTransportAction` to `DataStreamsActionUtil`, and move
the whole class to the `server` module, so it can be shared and used
across plugins.
  • Loading branch information
ldematte authored Mar 21, 2024
1 parent ff91ce0 commit a960bef
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PointValues;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.datastreams.DataStreamsStatsAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
Expand Down Expand Up @@ -44,7 +45,6 @@
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Stream;

public class DataStreamsStatsTransportAction extends TransportBroadcastByNodeAction<
DataStreamsStatsAction.Request,
Expand Down Expand Up @@ -104,25 +104,12 @@ protected ClusterBlockException checkRequestBlock(

@Override
protected String[] resolveConcreteIndexNames(ClusterState clusterState, DataStreamsStatsAction.Request request) {
List<String> abstractionNames = indexNameExpressionResolver.dataStreamNames(
return DataStreamsActionUtil.resolveConcreteIndexNames(
indexNameExpressionResolver,
clusterState,
request.indicesOptions(),
request.indices()
);
SortedMap<String, IndexAbstraction> indicesLookup = clusterState.getMetadata().getIndicesLookup();

String[] concreteDatastreamIndices = abstractionNames.stream().flatMap(abstractionName -> {
IndexAbstraction indexAbstraction = indicesLookup.get(abstractionName);
assert indexAbstraction != null;
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
DataStream dataStream = (DataStream) indexAbstraction;
List<Index> indices = dataStream.getIndices();
return indices.stream().map(Index::getName);
} else {
return Stream.empty();
}
}).toArray(String[]::new);
return concreteDatastreamIndices;
request.indices(),
request.indicesOptions()
).toArray(String[]::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import java.util.Set;
import java.util.function.Consumer;

import static org.elasticsearch.datastreams.action.DataStreamsActionUtil.getDataStreamNames;
import static org.elasticsearch.action.datastreams.DataStreamsActionUtil.getDataStreamNames;

public class DeleteDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction<DeleteDataStreamAction.Request> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.IndexProperties;
import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.ManagedBy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.datastreams.lifecycle.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
Expand All @@ -19,7 +20,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.datastreams.action.DataStreamsActionUtil;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.datastreams.lifecycle.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -21,7 +22,6 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.datastreams.action.DataStreamsActionUtil;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.datastreams.lifecycle.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
Expand All @@ -19,7 +20,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.datastreams.action.DataStreamsActionUtil;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams.action;
package org.elasticsearch.action.datastreams;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.index.Index;

import java.util.List;
import java.util.SortedMap;
import java.util.stream.Stream;

public class DataStreamsActionUtil {

Expand Down Expand Up @@ -40,4 +45,26 @@ public static IndicesOptions updateIndicesOptions(IndicesOptions indicesOptions)
}
return indicesOptions;
}

public static Stream<String> resolveConcreteIndexNames(
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterState clusterState,
String[] names,
IndicesOptions indicesOptions
) {
List<String> abstractionNames = getDataStreamNames(indexNameExpressionResolver, clusterState, names, indicesOptions);
SortedMap<String, IndexAbstraction> indicesLookup = clusterState.getMetadata().getIndicesLookup();

return abstractionNames.stream().flatMap(abstractionName -> {
IndexAbstraction indexAbstraction = indicesLookup.get(abstractionName);
assert indexAbstraction != null;
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
DataStream dataStream = (DataStream) indexAbstraction;
List<Index> indices = dataStream.getIndices();
return indices.stream().map(Index::getName);
} else {
return Stream.empty();
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.datastreams;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamMetadata;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DataStreamsActionUtilTests extends ESTestCase {

public void testDataStreamsResolveConcreteIndexNames() {

var index1 = new Index("foo1", IndexMetadata.INDEX_UUID_NA_VALUE);
var index3 = new Index("bar", IndexMetadata.INDEX_UUID_NA_VALUE);

var dataStreamIndex1 = new Index(".ds-foo1", IndexMetadata.INDEX_UUID_NA_VALUE);
var dataStreamIndex2 = new Index(".ds-bar2", IndexMetadata.INDEX_UUID_NA_VALUE);
var dataStreamIndex3 = new Index(".ds-foo2", IndexMetadata.INDEX_UUID_NA_VALUE);
var dataStreamIndex4 = new Index(".ds-baz1", IndexMetadata.INDEX_UUID_NA_VALUE);

ClusterState clusterState = ClusterState.builder(new ClusterName("test-cluster"))
.metadata(
Metadata.builder()
.putCustom(
DataStreamMetadata.TYPE,
new DataStreamMetadata(
ImmutableOpenMap.<String, DataStream>builder()
.fPut("fooDs", DataStreamTestHelper.newInstance("fooDs", List.of(dataStreamIndex1)))
.fPut("barDs", DataStreamTestHelper.newInstance("barDs", List.of(dataStreamIndex2)))
.fPut("foo2Ds", DataStreamTestHelper.newInstance("foo2Ds", List.of(dataStreamIndex3)))
.fPut("bazDs", DataStreamTestHelper.newInstance("bazDs", List.of(dataStreamIndex4)))
.build(),
ImmutableOpenMap.of()
)
)
.indices(
createLocalOnlyIndicesMetadata(
index1,
index3,
dataStreamIndex1,
dataStreamIndex2,
dataStreamIndex3,
dataStreamIndex4
)
)
.build()
)
.build();

var query = new String[] { "foo*", "baz*" };
var indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(indexNameExpressionResolver.dataStreamNames(any(), any(), eq(query))).thenReturn(List.of("fooDs", "foo2Ds", "bazDs"));

var resolved = DataStreamsActionUtil.resolveConcreteIndexNames(
indexNameExpressionResolver,
clusterState,
query,
IndicesOptions.builder().wildcardOptions(IndicesOptions.WildcardOptions.builder().includeHidden(true)).build()
).toList();

assertThat(resolved, containsInAnyOrder(".ds-foo1", ".ds-foo2", ".ds-baz1"));
}

private Map<String, IndexMetadata> createLocalOnlyIndicesMetadata(Index... indices) {
return Arrays.stream(indices)
.map(
index1 -> Map.entry(
index1.getName(),
IndexMetadata.builder(index1.getName())
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()))
.numberOfReplicas(0)
.numberOfShards(1)
.build()
)
)
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}

}

0 comments on commit a960bef

Please sign in to comment.