Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: make sure address is removed on …
Browse files Browse the repository at this point in the history
…server closure (#35903)

Previously the server address registry was not updated when the server was cancelled,
leaving invalid state in the registry and preventing reconfiguration of servers.
Ensure that the server address entry is deleted from the pool when the server is
cancelled.

In addition the pool lock was not being released in the case of a failed
TLS consistency check. The change here without fixing that resulted in a
deadlock. This is also fixed.
  • Loading branch information
efd6 authored Jun 27, 2023
1 parent 4b371f9 commit 62979b5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Improve error reporting and fix IPv6 handling of TCP and UDP metric collection. {pull}35772[35772]
- Fix CEL input JSON marshalling of nested objects. {issue}35763[35763] {pull}35774[35774]
- Fix metric collection in GCPPubSub input. {pull}35773[35773]
- Fix end point deregistration in http_endpoint input. {issue}35899[35899] {pull}35903[35903]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e
if ok {
err = checkTLSConsistency(e.addr, s.tls, e.config.TLS)
if err != nil {
p.mu.Unlock()
return err
}

Expand Down Expand Up @@ -160,6 +161,9 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e
log.Infof("Starting HTTP server on %s with %s end point", srv.Addr, pattern)
err = s.srv.ListenAndServe()
}
p.mu.Lock()
delete(p.servers, e.addr)
p.mu.Unlock()
s.setErr(err)
s.cancel()
return err
Expand Down
23 changes: 20 additions & 3 deletions x-pack/filebeat/input/http_endpoint/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ func TestServerPool(t *testing.T) {
if err != nil {
t.Fatalf("failed to post event #%d: %v", i, err)
}
body := dump(resp.Body)
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected response status code: %s (%d)\nresp: %s",
resp.Status, resp.StatusCode, dump(resp.Body))
resp.Status, resp.StatusCode, body)
}
}
cancel()
Expand All @@ -265,6 +266,22 @@ func TestServerPool(t *testing.T) {
if !cmp.Equal(got, test.want) {
t.Errorf("unexpected result:\n--- got\n--- want\n%s", cmp.Diff(got, test.want))
}

// Try to re-register the same addresses.
ctx, cancel = newCtx("server_pool_test", test.name)
for _, cfg := range test.cfgs {
cfg := cfg
wg.Add(1)
go func() {
defer wg.Done()
err := servers.serve(ctx, cfg, &pub)
if err != nil && err != http.ErrServerClosed && test.wantErr == nil {
t.Errorf("failed to re-register %v: %v", cfg.addr, err)
}
}()
}
cancel()
wg.Wait()
})
}
}
Expand All @@ -290,9 +307,9 @@ func newCtx(log, id string) (_ v2.Context, cancel func()) {
}, cancel
}

func dump(r io.ReadCloser) string {
func dump(r io.ReadCloser) []byte {
defer r.Close()
var buf bytes.Buffer
io.Copy(&buf, r)
return buf.String()
return buf.Bytes()
}

0 comments on commit 62979b5

Please sign in to comment.