From 971cb22874b7f6cd0a3a32a2a61df876d3bf3c22 Mon Sep 17 00:00:00 2001 From: Jakob Hahn Date: Wed, 11 Oct 2023 10:08:12 +0200 Subject: [PATCH] opensearchapi: add reindex rethrottle function to rootClient Signed-off-by: Jakob Hahn --- .../api_reindex_rethrottle-params.go | 57 ++++++++ opensearchapi/api_reindex_rethrottle.go | 133 ++++++++++++++++++ opensearchapi/api_reindex_rethrottle_test.go | 103 ++++++++++++++ 3 files changed, 293 insertions(+) create mode 100644 opensearchapi/api_reindex_rethrottle-params.go create mode 100644 opensearchapi/api_reindex_rethrottle.go create mode 100644 opensearchapi/api_reindex_rethrottle_test.go diff --git a/opensearchapi/api_reindex_rethrottle-params.go b/opensearchapi/api_reindex_rethrottle-params.go new file mode 100644 index 000000000..502f3af11 --- /dev/null +++ b/opensearchapi/api_reindex_rethrottle-params.go @@ -0,0 +1,57 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. +// +// Modifications Copyright OpenSearch Contributors. See +// GitHub history for details. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opensearchapi + +import ( + "strconv" +) + +// ReindexRethrottleParams represents possible parameters for the ReindexRethrottleReq +type ReindexRethrottleParams struct { + RequestsPerSecond *int + + Pretty bool + Human bool + ErrorTrace bool +} + +func (r ReindexRethrottleParams) get() map[string]string { + params := make(map[string]string) + + if r.RequestsPerSecond != nil { + params["requests_per_second"] = strconv.FormatInt(int64(*r.RequestsPerSecond), 10) + } + + if r.Pretty { + params["pretty"] = "true" + } + + if r.Human { + params["human"] = "true" + } + + if r.ErrorTrace { + params["error_trace"] = "true" + } + + return params +} diff --git a/opensearchapi/api_reindex_rethrottle.go b/opensearchapi/api_reindex_rethrottle.go new file mode 100644 index 000000000..69ce00e05 --- /dev/null +++ b/opensearchapi/api_reindex_rethrottle.go @@ -0,0 +1,133 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. +// +// Modifications Copyright OpenSearch Contributors. See +// GitHub history for details. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opensearchapi + +import ( + "context" + "fmt" + "net/http" + + "github.com/opensearch-project/opensearch-go/v2" +) + +// ReindexRethrottle executes a / request with the optional ReindexRethrottleReq +func (c Client) ReindexRethrottle(ctx context.Context, req ReindexRethrottleReq) (*ReindexRethrottleResp, error) { + var ( + data ReindexRethrottleResp + err error + ) + if data.response, err = c.do(ctx, req, &data); err != nil { + return &data, err + } + + return &data, nil +} + +// ReindexRethrottleReq represents possible options for the / request +type ReindexRethrottleReq struct { + TaskID string + + Header http.Header + Params ReindexRethrottleParams +} + +// GetRequest returns the *http.Request that gets executed by the client +func (r ReindexRethrottleReq) GetRequest() (*http.Request, error) { + return opensearch.BuildRequest( + "POST", + fmt.Sprintf("/_reindex/%s/_rethrottle", r.TaskID), + nil, + r.Params.get(), + r.Header, + ) +} + +// ReindexRethrottleResp represents the returned struct of the / response +type ReindexRethrottleResp struct { + Nodes map[string]struct { + Name string `json:"name"` + TransportAddress string `json:"transport_address"` + Host string `json:"host"` + IP string `json:"ip"` + Roles []string `json:"roles"` + Attributes struct { + ShardIndexingPressureEnabled string `json:"shard_indexing_pressure_enabled"` + } `json:"attributes"` + Tasks map[string]struct { + Node string `json:"node"` + ID int `json:"id"` + Type string `json:"type"` + Action string `json:"action"` + Status struct { + Total int `json:"total"` + Updated int `json:"updated"` + Created int `json:"created"` + Deleted int `json:"deleted"` + Batches int `json:"batches"` + VersionConflicts int `json:"version_conflicts"` + Noops int `json:"noops"` + Retries struct { + Bulk int `json:"bulk"` + Search int `json:"search"` + } `json:"retries"` + ThrottledMillis int `json:"throttled_millis"` + RequestsPerSecond float64 `json:"requests_per_second"` + ThrottledUntilMillis int `json:"throttled_until_millis"` + } `json:"status"` + Description string `json:"description"` + StartTimeInMillis int64 `json:"start_time_in_millis"` + RunningTimeInNanos int `json:"running_time_in_nanos"` + Cancellable bool `json:"cancellable"` + Cancelled bool `json:"cancelled"` + Headers struct { + } `json:"headers"` + ResourceStats struct { + Average struct { + CPUTimeInNanos int `json:"cpu_time_in_nanos"` + MemoryInBytes int `json:"memory_in_bytes"` + } `json:"average"` + Total struct { + CPUTimeInNanos int `json:"cpu_time_in_nanos"` + MemoryInBytes int `json:"memory_in_bytes"` + } `json:"total"` + Min struct { + CPUTimeInNanos int `json:"cpu_time_in_nanos"` + MemoryInBytes int `json:"memory_in_bytes"` + } `json:"min"` + Max struct { + CPUTimeInNanos int `json:"cpu_time_in_nanos"` + MemoryInBytes int `json:"memory_in_bytes"` + } `json:"max"` + ThreadInfo struct { + ThreadExecutions int `json:"thread_executions"` + ActiveThreads int `json:"active_threads"` + } `json:"thread_info"` + } `json:"resource_stats"` + } `json:"tasks"` + } `json:"nodes"` + response *opensearch.Response +} + +// Inspect returns the Inspect type containing the raw *opensearch.Reponse +func (r ReindexRethrottleResp) Inspect() Inspect { + return Inspect{Response: r.response} +} diff --git a/opensearchapi/api_reindex_rethrottle_test.go b/opensearchapi/api_reindex_rethrottle_test.go new file mode 100644 index 000000000..97cb1af15 --- /dev/null +++ b/opensearchapi/api_reindex_rethrottle_test.go @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. +// +// Modifications Copyright OpenSearch Contributors. See +// GitHub history for details. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//go:build integration + +package opensearchapi_test + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/opensearch-project/opensearch-go/v2/opensearchapi" + osapitest "github.com/opensearch-project/opensearch-go/v2/opensearchapi/internal/test" + "github.com/opensearch-project/opensearch-go/v2/opensearchutil" +) + +func TestReindexRethrottle(t *testing.T) { + client, err := opensearchapi.NewDefaultClient() + require.Nil(t, err) + + sourceIndex := "test-reindex-rethrottle-source" + destIndex := "test-reindex-rethrottle-dest" + t.Cleanup(func() { + client.Indices.Delete(nil, opensearchapi.IndicesDeleteReq{Indices: []string{sourceIndex, destIndex}}) + }) + + bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ + Index: sourceIndex, + Client: client, + ErrorTrace: true, + Human: true, + Pretty: true, + }) + for i := 1; i <= 10000; i++ { + err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{ + Index: sourceIndex, + Action: "index", + DocumentID: strconv.Itoa(i), + Body: strings.NewReader(`{"title":"bar"}`), + }) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + } + if err := bi.Close(context.Background()); err != nil { + t.Errorf("Unexpected error: %s", err) + } + + reindex, err := client.Reindex( + nil, + opensearchapi.ReindexReq{ + Body: strings.NewReader(fmt.Sprintf(`{"source":{"index":"%s"},"dest":{"index":"%s"}}`, sourceIndex, destIndex)), + Params: opensearchapi.ReindexParams{WaitForCompletion: opensearchapi.ToPointer(false)}, + }, + ) + require.Nil(t, err) + t.Run("with request", func(t *testing.T) { + resp, err := client.ReindexRethrottle( + nil, + opensearchapi.ReindexRethrottleReq{ + TaskID: reindex.Task, + Params: opensearchapi.ReindexRethrottleParams{RequestsPerSecond: opensearchapi.ToPointer(40)}, + }, + ) + require.Nil(t, err) + assert.NotEmpty(t, resp) + osapitest.CompareRawJSONwithParsedJSON(t, resp, resp.Inspect().Response) + }) + + t.Run("inspect", func(t *testing.T) { + failingClient, err := osapitest.CreateFailingClient() + require.Nil(t, err) + + res, err := failingClient.ReindexRethrottle(nil, opensearchapi.ReindexRethrottleReq{}) + assert.NotNil(t, err) + assert.NotNil(t, res) + osapitest.VerifyInspect(t, res.Inspect()) + }) +}