Skip to content

Commit

Permalink
opensearchapi: add reindex rethrottle function to rootClient
Browse files Browse the repository at this point in the history
Signed-off-by: Jakob Hahn <jakob.hahn@hetzner.com>
  • Loading branch information
Jakob3xD committed Oct 11, 2023
1 parent 8f03d18 commit d14a96f
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 0 deletions.
57 changes: 57 additions & 0 deletions opensearchapi/api_reindex_rethrottle-params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// SPDX-License-Identifier: Apache-2.0
//
// The OpenSearch Contributors require contributions made to
// this file be licensed under the Apache-2.0 license or a
// compatible open source license.
//
// Modifications Copyright OpenSearch Contributors. See
// GitHub history for details.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opensearchapi

import (
"strconv"
)

// ReindexRethrottleParams represents possible parameters for the ReindexRethrottleReq
type ReindexRethrottleParams struct {
RequestsPerSecond *int

Pretty bool
Human bool
ErrorTrace bool
}

func (r ReindexRethrottleParams) get() map[string]string {
params := make(map[string]string)

if r.RequestsPerSecond != nil {
params["requests_per_second"] = strconv.FormatInt(int64(*r.RequestsPerSecond), 10)
}

if r.Pretty {
params["pretty"] = "true"
}

if r.Human {
params["human"] = "true"
}

if r.ErrorTrace {
params["error_trace"] = "true"
}

return params
}
131 changes: 131 additions & 0 deletions opensearchapi/api_reindex_rethrottle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// SPDX-License-Identifier: Apache-2.0
//
// The OpenSearch Contributors require contributions made to
// this file be licensed under the Apache-2.0 license or a
// compatible open source license.
//
// Modifications Copyright OpenSearch Contributors. See
// GitHub history for details.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opensearchapi

import (
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/opensearch-project/opensearch-go/v2"
)

// ReindexRethrottle executes a / request with the optional ReindexRethrottleReq
func (c Client) ReindexRethrottle(ctx context.Context, req ReindexRethrottleReq) (*ReindexRethrottleResp, error) {
var (
data ReindexRethrottleResp
err error
)
if data.response, err = c.do(ctx, req, &data); err != nil {
return &data, err
}

return &data, nil
}

// ReindexRethrottleReq represents possible options for the / request
type ReindexRethrottleReq struct {
TaskID string

Header http.Header
Params ReindexRethrottleParams
}

// GetRequest returns the *http.Request that gets executed by the client
func (r ReindexRethrottleReq) GetRequest() (*http.Request, error) {
return opensearch.BuildRequest(
"POST",
fmt.Sprintf("/_reindex/%s/_rethrottle", r.TaskID),
nil,
r.Params.get(),
r.Header,
)
}

// ReindexRethrottleResp represents the returned struct of the / response
type ReindexRethrottleResp struct {
Nodes map[string]struct {
Name string `json:"name"`
TransportAddress string `json:"transport_address"`
Host string `json:"host"`
IP string `json:"ip"`
Roles []string `json:"roles"`
Attributes map[string]string `json:"attributes"`
Tasks map[string]struct {
Node string `json:"node"`
ID int `json:"id"`
Type string `json:"type"`
Action string `json:"action"`
Status struct {
Total int `json:"total"`
Updated int `json:"updated"`
Created int `json:"created"`
Deleted int `json:"deleted"`
Batches int `json:"batches"`
VersionConflicts int `json:"version_conflicts"`
Noops int `json:"noops"`
Retries struct {
Bulk int `json:"bulk"`
Search int `json:"search"`
} `json:"retries"`
ThrottledMillis int `json:"throttled_millis"`
RequestsPerSecond float64 `json:"requests_per_second"`
ThrottledUntilMillis int `json:"throttled_until_millis"`
} `json:"status"`
Description string `json:"description"`
StartTimeInMillis int64 `json:"start_time_in_millis"`
RunningTimeInNanos int `json:"running_time_in_nanos"`
Cancellable bool `json:"cancellable"`
Cancelled bool `json:"cancelled"`
Headers json.RawMessage `json:"headers"`
ResourceStats struct {
Average struct {
CPUTimeInNanos int `json:"cpu_time_in_nanos"`
MemoryInBytes int `json:"memory_in_bytes"`
} `json:"average"`
Total struct {
CPUTimeInNanos int `json:"cpu_time_in_nanos"`
MemoryInBytes int `json:"memory_in_bytes"`
} `json:"total"`
Min struct {
CPUTimeInNanos int `json:"cpu_time_in_nanos"`
MemoryInBytes int `json:"memory_in_bytes"`
} `json:"min"`
Max struct {
CPUTimeInNanos int `json:"cpu_time_in_nanos"`
MemoryInBytes int `json:"memory_in_bytes"`
} `json:"max"`
ThreadInfo struct {
ThreadExecutions int `json:"thread_executions"`
ActiveThreads int `json:"active_threads"`
} `json:"thread_info"`
} `json:"resource_stats"`
} `json:"tasks"`
} `json:"nodes"`
response *opensearch.Response
}

// Inspect returns the Inspect type containing the raw *opensearch.Reponse
func (r ReindexRethrottleResp) Inspect() Inspect {
return Inspect{Response: r.response}
}
109 changes: 109 additions & 0 deletions opensearchapi/api_reindex_rethrottle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// SPDX-License-Identifier: Apache-2.0
//
// The OpenSearch Contributors require contributions made to
// this file be licensed under the Apache-2.0 license or a
// compatible open source license.
//
// Modifications Copyright OpenSearch Contributors. See
// GitHub history for details.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//go:build integration

package opensearchapi_test

import (
"context"
"fmt"
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
osapitest "github.com/opensearch-project/opensearch-go/v2/opensearchapi/internal/test"
"github.com/opensearch-project/opensearch-go/v2/opensearchutil"
)

func TestReindexRethrottle(t *testing.T) {
client, err := opensearchapi.NewDefaultClient()
require.Nil(t, err)

sourceIndex := "test-reindex-rethrottle-source"
destIndex := "test-reindex-rethrottle-dest"
t.Cleanup(func() {
client.Indices.Delete(
nil,
opensearchapi.IndicesDeleteReq{
Indices: []string{sourceIndex, destIndex},
Params: opensearchapi.IndicesDeleteParams{IgnoreUnavailable: opensearchapi.ToPointer(true)},
},
)
})

bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Index: sourceIndex,
Client: client,
ErrorTrace: true,
Human: true,
Pretty: true,
})
for i := 1; i <= 10000; i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Index: sourceIndex,
Action: "index",
DocumentID: strconv.Itoa(i),
Body: strings.NewReader(`{"title":"bar"}`),
})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}
if err := bi.Close(context.Background()); err != nil {
t.Errorf("Unexpected error: %s", err)
}

reindex, err := client.Reindex(
nil,
opensearchapi.ReindexReq{
Body: strings.NewReader(fmt.Sprintf(`{"source":{"index":"%s"},"dest":{"index":"%s"}}`, sourceIndex, destIndex)),
Params: opensearchapi.ReindexParams{WaitForCompletion: opensearchapi.ToPointer(false)},
},
)
require.Nil(t, err)
t.Run("with request", func(t *testing.T) {
resp, err := client.ReindexRethrottle(
nil,
opensearchapi.ReindexRethrottleReq{
TaskID: reindex.Task,
Params: opensearchapi.ReindexRethrottleParams{RequestsPerSecond: opensearchapi.ToPointer(40)},
},
)
require.Nil(t, err)
assert.NotEmpty(t, resp)
osapitest.CompareRawJSONwithParsedJSON(t, resp, resp.Inspect().Response)
})

t.Run("inspect", func(t *testing.T) {
failingClient, err := osapitest.CreateFailingClient()
require.Nil(t, err)

res, err := failingClient.ReindexRethrottle(nil, opensearchapi.ReindexRethrottleReq{})
assert.NotNil(t, err)
assert.NotNil(t, res)
osapitest.VerifyInspect(t, res.Inspect())
})
}

0 comments on commit d14a96f

Please sign in to comment.