diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 586c0ce05a1..213bd67d3b5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -154,6 +154,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix Azure Monitor 429 error by causing metricbeat to retry the request again. {pull}38294[38294] - Fix fields not being parsed correctly in postgresql/database {issue}25301[25301] {pull}37720[37720] - rabbitmq/queue - Change the mapping type of `rabbitmq.queue.consumers.utilisation.pct` to `scaled_float` from `long` because the values fall within the range of `[0.0, 1.0]`. Previously, conversion to integer resulted in reporting either `0` or `1`. +- Fix timeout caused by the retrival of which indices are hidden {pull}39165[39165] *Osquerybeat* diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go index 0bf7aa5b532..b77691cad26 100644 --- a/metricbeat/module/elasticsearch/elasticsearch.go +++ b/metricbeat/module/elasticsearch/elasticsearch.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "net/url" - "strconv" "strings" "sync" "time" @@ -368,61 +367,6 @@ func GetXPack(http *helper.HTTP, resetURI string) (XPack, error) { return xpack, err } -type boolStr bool - -func (b *boolStr) UnmarshalJSON(raw []byte) error { - var bs string - err := json.Unmarshal(raw, &bs) - if err != nil { - return err - } - - bv, err := strconv.ParseBool(bs) - if err != nil { - return err - } - - *b = boolStr(bv) - return nil -} - -type IndexSettings struct { - Hidden bool -} - -// GetIndicesSettings returns a map of index names to their settings. -// Note that as of now it is optimized to fetch only the "hidden" index setting to keep the memory -// footprint of this function call as low as possible. -func GetIndicesSettings(http *helper.HTTP, resetURI string) (map[string]IndexSettings, error) { - content, err := fetchPath(http, resetURI, "*/_settings", "filter_path=*.settings.index.hidden&expand_wildcards=all") - - if err != nil { - return nil, fmt.Errorf("could not fetch indices settings: %w", err) - } - - var resp map[string]struct { - Settings struct { - Index struct { - Hidden boolStr `json:"hidden"` - } `json:"index"` - } `json:"settings"` - } - - err = json.Unmarshal(content, &resp) - if err != nil { - return nil, fmt.Errorf("could not parse indices settings response: %w", err) - } - - ret := make(map[string]IndexSettings, len(resp)) - for index, settings := range resp { - ret[index] = IndexSettings{ - Hidden: bool(settings.Settings.Index.Hidden), - } - } - - return ret, nil -} - // IsMLockAllEnabled returns if the given Elasticsearch node has mlockall enabled func IsMLockAllEnabled(http *helper.HTTP, resetURI, nodeID string) (bool, error) { content, err := fetchPath(http, resetURI, "_nodes/"+nodeID, "filter_path=nodes.*.process.mlockall") diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index 3d102a2fd62..4a7ddc978c4 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -21,12 +21,14 @@ package elasticsearch_test import ( "bytes" + "context" "encoding/json" "errors" "fmt" - "io/ioutil" + "io" "math/rand" "net/http" + "os" "strings" "testing" "time" @@ -137,19 +139,11 @@ func TestGetAllIndices(t *testing.T) { name, ok := event.MetricSetFields["name"] require.True(t, ok) - hidden, ok := event.MetricSetFields["hidden"] - require.True(t, ok) - - isHidden, ok := hidden.(bool) - require.True(t, ok) - switch name { case indexVisible: idxVisibleExists = true - require.False(t, isHidden) case indexHidden: idxHiddenExists = true - require.True(t, isHidden) } } @@ -202,7 +196,7 @@ func createIndex(host string, isHidden bool) (string, error) { reqBody := fmt.Sprintf(`{ "settings": { "index.hidden": %v } }`, isHidden) - req, err := http.NewRequest("PUT", fmt.Sprintf("http://%v/%v", host, indexName), strings.NewReader(reqBody)) + req, err := http.NewRequestWithContext(context.Background(), "PUT", fmt.Sprintf("http://%v/%v", host, indexName), strings.NewReader(reqBody)) if err != nil { return "", fmt.Errorf("could not build create index request: %w", err) } @@ -214,7 +208,7 @@ func createIndex(host string, isHidden bool) (string, error) { return "", fmt.Errorf("could not send create index request: %w", err) } defer resp.Body.Close() - respBody, err := ioutil.ReadAll(resp.Body) + respBody, _ := io.ReadAll(resp.Body) if resp.StatusCode != 200 { return "", fmt.Errorf("HTTP error %d: %s, %s", resp.StatusCode, resp.Status, string(respBody)) @@ -242,7 +236,7 @@ func enableTrialLicense(host string, version *version.V) error { enableXPackURL = "/_license/start_trial?acknowledge=true" } - req, err := http.NewRequest("POST", "http://"+host+enableXPackURL, nil) + req, err := http.NewRequestWithContext(context.Background(), "POST", "http://"+host+enableXPackURL, nil) if err != nil { return err } @@ -254,7 +248,7 @@ func enableTrialLicense(host string, version *version.V) error { defer resp.Body.Close() if resp.StatusCode != 200 { - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return err } @@ -273,13 +267,19 @@ func checkTrialLicenseEnabled(host string, version *version.V) (bool, error) { licenseURL = "/_license" } - resp, err := http.Get("http://" + host + licenseURL) + req, err := http.NewRequestWithContext(context.Background(), "GET", "http://"+host+licenseURL, nil) + if err != nil { + return false, err + } + + client := &http.Client{} + resp, err := client.Do(req) if err != nil { return false, err } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return false, err } @@ -302,7 +302,7 @@ func checkTrialLicenseEnabled(host string, version *version.V) (bool, error) { func createMLJob(host string, version *version.V) error { - mlJob, err := ioutil.ReadFile("ml_job/_meta/test/test_job.json") + mlJob, err := os.ReadFile("ml_job/_meta/test/test_job.json") if err != nil { return err } @@ -323,6 +323,7 @@ func createMLJob(host string, version *version.V) error { return fmt.Errorf("error doing PUT request when creating ML job: %w", err) } + defer resp.Body.Close() if resp.StatusCode != 200 { return fmt.Errorf("HTTP error loading ml job %d: %s, %s", resp.StatusCode, resp.Status, string(body)) } @@ -364,13 +365,19 @@ func createCCRStats(host string) error { } func checkCCRStatsExists(host string) (bool, error) { - resp, err := http.Get("http://" + host + "/_ccr/stats") + req, err := http.NewRequestWithContext(context.Background(), "GET", "http://"+host+"/_ccr/stats", nil) + if err != nil { + return false, err + } + + client := &http.Client{} + resp, err := client.Do(req) if err != nil { return false, err } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return false, err } @@ -389,50 +396,58 @@ func checkCCRStatsExists(host string) (bool, error) { } func setupCCRRemote(host string) error { - remoteSettings, err := ioutil.ReadFile("ccr/_meta/test/test_remote_settings.json") + remoteSettings, err := os.ReadFile("ccr/_meta/test/test_remote_settings.json") if err != nil { return err } settingsURL := "/_cluster/settings" - _, _, err = httpPutJSON(host, settingsURL, remoteSettings) + _, resp, err := httpPutJSON(host, settingsURL, remoteSettings) + defer resp.Body.Close() return err } func createCCRLeaderIndex(host string) error { - leaderIndex, err := ioutil.ReadFile("ccr/_meta/test/test_leader_index.json") + leaderIndex, err := os.ReadFile("ccr/_meta/test/test_leader_index.json") if err != nil { return err } indexURL := "/pied_piper" - _, _, err = httpPutJSON(host, indexURL, leaderIndex) + _, resp, err := httpPutJSON(host, indexURL, leaderIndex) + defer resp.Body.Close() return err } func createCCRFollowerIndex(host string) error { - followerIndex, err := ioutil.ReadFile("ccr/_meta/test/test_follower_index.json") + followerIndex, err := os.ReadFile("ccr/_meta/test/test_follower_index.json") if err != nil { return err } followURL := "/rats/_ccr/follow" - _, _, err = httpPutJSON(host, followURL, followerIndex) + _, resp, err := httpPutJSON(host, followURL, followerIndex) + defer resp.Body.Close() return err } func checkExists(url string) bool { - resp, err := http.Get(url) + req, err := http.NewRequestWithContext(context.Background(), "GET", url, nil) + if err != nil { return false } - resp.Body.Close() - // Entry exists - if resp.StatusCode == 200 { - return true + client := &http.Client{} + resp, err := client.Do(req) + + if err != nil { + return false } - return false + defer resp.Body.Close() + + // Entry exists + return resp.StatusCode == 200 } func createEnrichStats(host string) error { @@ -465,82 +480,58 @@ func createEnrichStats(host string) error { } func createEnrichSourceIndex(host string) error { - sourceDoc, err := ioutil.ReadFile("enrich/_meta/test/source_doc.json") + sourceDoc, err := os.ReadFile("enrich/_meta/test/source_doc.json") if err != nil { return err } docURL := "/users/_doc/1?refresh=wait_for" - _, _, err = httpPutJSON(host, docURL, sourceDoc) + _, resp, err := httpPutJSON(host, docURL, sourceDoc) + defer resp.Body.Close() return err } func createEnrichPolicy(host string) error { - policy, err := ioutil.ReadFile("enrich/_meta/test/policy.json") + policy, err := os.ReadFile("enrich/_meta/test/policy.json") if err != nil { return err } policyURL := "/_enrich/policy/users-policy" - _, _, err = httpPutJSON(host, policyURL, policy) + _, resp, err := httpPutJSON(host, policyURL, policy) + defer resp.Body.Close() return err } func executeEnrichPolicy(host string) error { executeURL := "/_enrich/policy/users-policy/_execute" - _, _, err := httpPostJSON(host, executeURL, nil) + _, resp, err := httpPostJSON(host, executeURL, nil) + defer resp.Body.Close() return err } func createEnrichIngestPipeline(host string) error { - pipeline, err := ioutil.ReadFile("enrich/_meta/test/ingest_pipeline.json") + pipeline, err := os.ReadFile("enrich/_meta/test/ingest_pipeline.json") if err != nil { return err } pipelineURL := "/_ingest/pipeline/user_lookup" - _, _, err = httpPutJSON(host, pipelineURL, pipeline) + _, resp, err := httpPutJSON(host, pipelineURL, pipeline) + defer resp.Body.Close() return err } func ingestAndEnrichDoc(host string) error { - targetDoc, err := ioutil.ReadFile("enrich/_meta/test/target_doc.json") + targetDoc, err := os.ReadFile("enrich/_meta/test/target_doc.json") if err != nil { return err } docURL := "/my_index/_doc/my_id?pipeline=user_lookup" - _, _, err = httpPutJSON(host, docURL, targetDoc) - return err -} - -func countIndices(elasticsearchHostPort string) (int, error) { - return countCatItems(elasticsearchHostPort, "indices", "&expand_wildcards=open,hidden") -} - -func countShards(elasticsearchHostPort string) (int, error) { - return countCatItems(elasticsearchHostPort, "shards", "") -} - -func countCatItems(elasticsearchHostPort, catObject, extraParams string) (int, error) { - resp, err := http.Get("http://" + elasticsearchHostPort + "/_cat/" + catObject + "?format=json" + extraParams) - if err != nil { - return 0, err - } + _, resp, err := httpPutJSON(host, docURL, targetDoc) defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, err - } - - var data []mapstr.M - err = json.Unmarshal(body, &data) - if err != nil { - return 0, err - } - - return len(data), nil + return err } func checkSkip(t *testing.T, metricset string, ver *version.V) { @@ -560,13 +551,19 @@ func checkSkip(t *testing.T, metricset string, ver *version.V) { } func getElasticsearchVersion(elasticsearchHostPort string) (*version.V, error) { - resp, err := http.Get("http://" + elasticsearchHostPort + "/") + req, err := http.NewRequestWithContext(context.Background(), "GET", "http://"+elasticsearchHostPort+"/", nil) + if err != nil { + return nil, err + } + + client := &http.Client{} + resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } @@ -594,7 +591,7 @@ func httpPostJSON(host, path string, body []byte) ([]byte, *http.Response, error } func httpSendJSON(host, path, method string, body []byte) ([]byte, *http.Response, error) { - req, err := http.NewRequest(method, "http://"+host+path, bytes.NewReader(body)) + req, err := http.NewRequestWithContext(context.Background(), method, "http://"+host+path, bytes.NewReader(body)) if err != nil { return nil, nil, err } @@ -605,14 +602,12 @@ func httpSendJSON(host, path, method string, body []byte) ([]byte, *http.Respons if err != nil { return nil, nil, err } - defer resp.Body.Close() - body, err = ioutil.ReadAll(resp.Body) + responseBody, err := io.ReadAll(resp.Body) if err != nil { return nil, nil, err } - - return body, resp, nil + return responseBody, resp, nil } type checkSuccessFunction func() (bool, error) @@ -636,7 +631,7 @@ func waitForSuccess(f checkSuccessFunction, retryInterval time.Duration, numAtte } func randString(len int) string { - rand.Seed(time.Now().UnixNano()) + rand := rand.New(rand.NewSource(time.Now().UnixNano())) b := make([]byte, len) aIdx := int('a') diff --git a/metricbeat/module/elasticsearch/index/data.go b/metricbeat/module/elasticsearch/index/data.go index 620cddf93a0..d8bec4939d6 100644 --- a/metricbeat/module/elasticsearch/index/data.go +++ b/metricbeat/module/elasticsearch/index/data.go @@ -42,7 +42,6 @@ type Index struct { Index string `json:"index"` Status string `json:"status"` - Hidden bool `json:"hidden"` Shards shardStats `json:"shards"` } @@ -191,23 +190,14 @@ func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch. return fmt.Errorf("failure parsing Indices Stats Elasticsearch API response: %w", err) } - indicesSettings, err := elasticsearch.GetIndicesSettings(httpClient, httpClient.GetURI()) - if err != nil { - return fmt.Errorf("failure retrieving indices settings from Elasticsearch: %w", err) - } - var errs multierror.Errors - for name, idx := range indicesStats.Indices { + for name := range indicesStats.Indices { event := mb.Event{ ModuleFields: mapstr.M{}, } + idx := indicesStats.Indices[name] idx.Index = name - settings, exists := indicesSettings[name] - if exists { - idx.Hidden = settings.Hidden - } - err = addClusterStateFields(&idx, clusterState) if err != nil { errs = append(errs, fmt.Errorf("failure adding cluster state fields: %w", err))