From bcde30f95ca0904e5da55f30e19bde9df0286cbf Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 22 Jun 2023 15:37:13 +0000 Subject: [PATCH] Fix double fetch (#35843) (#35889) Fixes #35646 by only unpacking project monitors once. This fixes the ever growing temp folder issue and is more efficient to boot. Previously we would call fetch on a monitor source every time it was run, but only cleanup the fetched resource once, when the monitor was unloaded. We now fetch once and cleanup once. This project also fixes the very confusing issue of two files browser/project.go and browser/source/project.go, we have renamed browser/project.go to browser/sourcejob.go which makes reasoning about this change simpler. (cherry picked from commit f7111dc0aa00b5f018955c0aec5040e2ef9b6e32) Co-authored-by: Andrew Cholakian --- CHANGELOG.next.asciidoc | 1 + x-pack/heartbeat/monitors/browser/browser.go | 2 +- .../monitors/browser/source/project.go | 18 ++++ .../monitors/browser/source/project_test.go | 21 ++++ .../browser/{project.go => sourcejob.go} | 95 +++++++++---------- .../{project_test.go => sourcejob_test.go} | 20 ++-- 6 files changed, 97 insertions(+), 60 deletions(-) rename x-pack/heartbeat/monitors/browser/{project.go => sourcejob.go} (52%) rename x-pack/heartbeat/monitors/browser/{project_test.go => sourcejob_test.go} (93%) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 49ec1cbf73b..2d416e2036b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -161,6 +161,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Fix formatting issue with socket trace timeout. {pull}35434[35434] - Update gval version. {pull}35636[35636] - Fix serialization of processors when running diagnostics. {pull}35698[35698] +- Fix temp dir running out of space with project monitors. {issue}35843[35843] *Heartbeat* diff --git a/x-pack/heartbeat/monitors/browser/browser.go b/x-pack/heartbeat/monitors/browser/browser.go index b09d03eea8a..bd269573643 100644 --- a/x-pack/heartbeat/monitors/browser/browser.go +++ b/x-pack/heartbeat/monitors/browser/browser.go @@ -34,7 +34,7 @@ func create(name string, cfg *config.C) (p plugin.Plugin, err error) { return plugin.Plugin{}, fmt.Errorf("script monitors cannot be run as root") } - s, err := NewProject(cfg) + s, err := NewSourceJob(cfg) if err != nil { return plugin.Plugin{}, err } diff --git a/x-pack/heartbeat/monitors/browser/source/project.go b/x-pack/heartbeat/monitors/browser/source/project.go index f97e69d6709..7caf3edcc2e 100644 --- a/x-pack/heartbeat/monitors/browser/source/project.go +++ b/x-pack/heartbeat/monitors/browser/source/project.go @@ -17,6 +17,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "syscall" "github.com/elastic/elastic-agent-libs/logp" @@ -26,6 +27,8 @@ import ( type ProjectSource struct { Content string `config:"content" json:"content"` TargetDirectory string + fetched bool + mtx sync.Mutex } var ErrNoContent = fmt.Errorf("no 'content' value specified for project monitor source") @@ -39,6 +42,14 @@ func (p *ProjectSource) Validate() error { } func (p *ProjectSource) Fetch() error { + // We only need to unzip the source exactly once + p.mtx.Lock() + defer p.mtx.Unlock() + if p.fetched { + logp.L().Debugf("browser project: re-use already unpacked source: %s", p.Workdir()) + return nil + } + decodedBytes, err := base64.StdEncoding.DecodeString(p.Content) if err != nil { return err @@ -60,6 +71,9 @@ func (p *ProjectSource) Fetch() error { if err != nil { return fmt.Errorf("could not make temp dir for unzipping project source: %w", err) } + + logp.L().Debugf("browser project: unpack source: %s", p.Workdir()) + err = os.Chmod(p.TargetDirectory, defaultMod) if err != nil { return fmt.Errorf("failed assigning default mode %s to temp dir: %w", defaultMod, err) @@ -81,6 +95,8 @@ func (p *ProjectSource) Fetch() error { } } + // We've succeeded, mark the fetch as a success + p.fetched = true return nil } @@ -142,6 +158,8 @@ func (p *ProjectSource) Workdir() string { } func (p *ProjectSource) Close() error { + logp.L().Debugf("browser project: close project source: %s", p.Workdir()) + if p.TargetDirectory != "" { return os.RemoveAll(p.TargetDirectory) } diff --git a/x-pack/heartbeat/monitors/browser/source/project_test.go b/x-pack/heartbeat/monitors/browser/source/project_test.go index f30012670b9..09dda4b5146 100644 --- a/x-pack/heartbeat/monitors/browser/source/project_test.go +++ b/x-pack/heartbeat/monitors/browser/source/project_test.go @@ -50,11 +50,29 @@ func TestProjectSource(t *testing.T) { return } require.NoError(t, err) + fetchAndValidate(t, psrc) }) } } +func TestFetchCaching(t *testing.T) { + cfg := mapstr.M{ + "content": "UEsDBBQACAAIAJ27qVQAAAAAAAAAAAAAAAAiAAAAZXhhbXBsZXMvdG9kb3MvYWR2YW5jZWQuam91cm5leS50c5VRPW/CMBDd+RWnLA0Sigt0KqJqpbZTN+iEGKzkIC6JbfkuiBTx3+uEEAGlgi7Rnf38viIESCLkR/FJ6Eis1VIjpanATBKrWFCpOUU/kcCNzG2GJNgkhoRM1lLHmERfpnAay4ipo3JrHMMWmjPYwcKZHILn33zBqIV3ADIjkxdrJ4y251eZJFNJq3b1Hh1XJx+KeKK+8XATpxiv3o07RidI7Ex5OOocTEQixcz6mF66MRgGXkmxMhqkTiA2VcJ6NQsgpZcZAnueoAfhFqxcYs9/ncwJdl0YP9XeY6OJgb3qFDcMYwhejb5jsAUDyYxBaSi9HmCJlfZJ2vCYNCpc1h2d5m8AB/r99cU+GmS/hpwXc4nmrKh/K917yK57VqZe1lU6zM26WvIiY2WbHunWIiusb3IWVBP0/bP9NGinYTC/qcqWLloY9ybjNAy5VbzYdP1sdz3+8FqJleqsP7/ONPjjp++TPgS3eaks/wBQSwcIVYIEHGwBAADRAwAAUEsDBBQACAAIAJ27qVQAAAAAAAAAAAAAAAAZAAAAZXhhbXBsZXMvdG9kb3MvaGVscGVycy50c5VUTYvbMBC9768YRGAVyKb0uktCu9CeektvpRCtM4nFKpKQxt2kwf+9I9lJ5cRb6MWW5+u9eTOW3nsXCE4QCf0M8OCxImhhG9wexCc0KpKuPsSjpRr5FMXTXeVsJDBObT57v+I8WID0aoczaIKZwmIJpzvIFaUwqrFVDcp7MQPFdSqQlxAA9aY0QUqe7xw5mQo8saflZ3uGUpvNdxVfh1DEliHWmuOyGSan9GrXY4hdSW19Q1yswJ9Ika1zi28P5DZOZCZnjp2Pjh5lhr71+YAxSvHFEgZx20UqGVdoWGAXGFo0Zp5sD0YnOXX+uMi71TY3nTh2PYy0HZCaYMsm0umrC2cYuWYpStwWlksgPNBC9CKJ9UDqGDFQAv7GrFb6N/aqD0hEtl9pX9VYvQLViroR5KZqFXmlVEXmyDNJWS0wkT1aiqPD6fZPynIsEznoYDqdG7Q7qqcs2DPKzOVG7EyHhSj25n0Zyw62PJvcwH2vzz1PN3czSrifwHlaZfUbThuMFNzxPyj1GVeE/rHWRr2guaz1e6wu0foSmhPTL3DwiuqFshVDu/D4aPSPjz/FIK1n9dwQOfu3gk7pL9k4jK+M5lk0LBRy9CB7nn2yD+cStfuFQQ5+riK9kJQ3JV9cbCmuh1n6HF3h5LleimS7GkoynWVL5+KWS6h/AFBLBwgvDHpj+wEAAC8FAABQSwECLQMUAAgACACdu6lUVYIEHGwBAADRAwAAIgAAAAAAAAAAACAApIEAAAAAZXhhbXBsZXMvdG9kb3MvYWR2YW5jZWQuam91cm5leS50c1BLAQItAxQACAAIAJ27qVQvDHpj+wEAAC8FAAAZAAAAAAAAAAAAIACkgbwBAABleGFtcGxlcy90b2Rvcy9oZWxwZXJzLnRzUEsFBgAAAAACAAIAlwAAAP4DAAAAAA==", + } + psrc, err := dummyPSource(cfg) + require.NoError(t, err) + defer psrc.Close() + + err = psrc.Fetch() + require.NoError(t, err) + wdir := psrc.Workdir() + err = psrc.Fetch() + require.NoError(t, err) + wdirNext := psrc.Workdir() + require.Equal(t, wdir, wdirNext) +} + func validateFileContents(t *testing.T, dir string) { expected := []string{ "examples/todos/helpers.ts", @@ -73,6 +91,9 @@ func validateFileContents(t *testing.T, dir string) { } func fetchAndValidate(t *testing.T, psrc *ProjectSource) { + defer func() { + _ = psrc.Close() + }() err := psrc.Fetch() require.NoError(t, err) diff --git a/x-pack/heartbeat/monitors/browser/project.go b/x-pack/heartbeat/monitors/browser/sourcejob.go similarity index 52% rename from x-pack/heartbeat/monitors/browser/project.go rename to x-pack/heartbeat/monitors/browser/sourcejob.go index 853b0754bc0..588e1335d1f 100644 --- a/x-pack/heartbeat/monitors/browser/project.go +++ b/x-pack/heartbeat/monitors/browser/sourcejob.go @@ -18,30 +18,27 @@ import ( "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser/synthexec" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" ) -type JourneyLister func(ctx context.Context, projectPath string, params mapstr.M) (journeyNames []string, err error) - -type Project struct { +type SourceJob struct { rawCfg *config.C - projectCfg *Config + browserCfg *Config ctx context.Context cancel context.CancelFunc } -func NewProject(rawCfg *config.C) (*Project, error) { - // Global project context to cancel all jobs +func NewSourceJob(rawCfg *config.C) (*SourceJob, error) { + // Global browser context to cancel all jobs // on close ctx, cancel := context.WithCancel(context.Background()) - s := &Project{ + s := &SourceJob{ rawCfg: rawCfg, - projectCfg: DefaultConfig(), + browserCfg: DefaultConfig(), ctx: ctx, cancel: cancel, } - err := rawCfg.Unpack(s.projectCfg) + err := rawCfg.Unpack(s.browserCfg) if err != nil { return nil, ErrBadConfig(err) } @@ -50,31 +47,31 @@ func NewProject(rawCfg *config.C) (*Project, error) { } func ErrBadConfig(err error) error { - return fmt.Errorf("could not parse project config: %w", err) + return fmt.Errorf("could not parse browser config: %w", err) } -func (p *Project) String() string { +func (sj *SourceJob) String() string { panic("implement me") } -func (p *Project) Fetch() error { - return p.projectCfg.Source.Active().Fetch() +func (sj *SourceJob) Fetch() error { + return sj.browserCfg.Source.Active().Fetch() } -func (p *Project) Workdir() string { - return p.projectCfg.Source.Active().Workdir() +func (sj *SourceJob) Workdir() string { + return sj.browserCfg.Source.Active().Workdir() } -func (p *Project) Params() map[string]interface{} { - return p.projectCfg.Params +func (sj *SourceJob) Params() map[string]interface{} { + return sj.browserCfg.Params } -func (p *Project) FilterJourneys() synthexec.FilterJourneyConfig { - return p.projectCfg.FilterJourneys +func (sj *SourceJob) FilterJourneys() synthexec.FilterJourneyConfig { + return sj.browserCfg.FilterJourneys } -func (p *Project) StdFields() stdfields.StdMonitorFields { - sFields, err := stdfields.ConfigToStdMonitorFields(p.rawCfg) +func (sj *SourceJob) StdFields() stdfields.StdMonitorFields { + sFields, err := stdfields.ConfigToStdMonitorFields(sj.rawCfg) // Should be impossible since outer monitor.go should run this same code elsewhere // TODO: Just pass stdfields in to remove second deserialize if err != nil { @@ -83,45 +80,45 @@ func (p *Project) StdFields() stdfields.StdMonitorFields { return sFields } -func (p *Project) Close() error { - if p.projectCfg.Source.ActiveMemo != nil { - p.projectCfg.Source.ActiveMemo.Close() +func (sj *SourceJob) Close() error { + if sj.browserCfg.Source.ActiveMemo != nil { + sj.browserCfg.Source.ActiveMemo.Close() } // Cancel running jobs ctxs - p.cancel() + sj.cancel() return nil } -func (p *Project) extraArgs() []string { - extraArgs := p.projectCfg.SyntheticsArgs - if len(p.projectCfg.PlaywrightOpts) > 0 { - s, err := json.Marshal(p.projectCfg.PlaywrightOpts) +func (sj *SourceJob) extraArgs() []string { + extraArgs := sj.browserCfg.SyntheticsArgs + if len(sj.browserCfg.PlaywrightOpts) > 0 { + s, err := json.Marshal(sj.browserCfg.PlaywrightOpts) if err != nil { // This should never happen, if it was parsed as a config it should be serializable - logp.L().Warn("could not serialize playwright options '%v': %w", p.projectCfg.PlaywrightOpts, err) + logp.L().Warn("could not serialize playwright options '%v': %w", sj.browserCfg.PlaywrightOpts, err) } else { extraArgs = append(extraArgs, "--playwright-options", string(s)) } } - if p.projectCfg.IgnoreHTTPSErrors { + if sj.browserCfg.IgnoreHTTPSErrors { extraArgs = append(extraArgs, "--ignore-https-errors") } - if p.projectCfg.Sandbox { + if sj.browserCfg.Sandbox { extraArgs = append(extraArgs, "--sandbox") } - if p.projectCfg.Screenshots != "" { - extraArgs = append(extraArgs, "--screenshots", p.projectCfg.Screenshots) + if sj.browserCfg.Screenshots != "" { + extraArgs = append(extraArgs, "--screenshots", sj.browserCfg.Screenshots) } - if p.projectCfg.Throttling != nil { - switch t := p.projectCfg.Throttling.(type) { + if sj.browserCfg.Throttling != nil { + switch t := sj.browserCfg.Throttling.(type) { case bool: if !t { extraArgs = append(extraArgs, "--no-throttling") } case string: - extraArgs = append(extraArgs, "--throttling", fmt.Sprintf("%v", p.projectCfg.Throttling)) + extraArgs = append(extraArgs, "--throttling", fmt.Sprintf("%v", sj.browserCfg.Throttling)) case map[string]interface{}: j, err := json.Marshal(t) if err != nil { @@ -135,22 +132,22 @@ func (p *Project) extraArgs() []string { return extraArgs } -func (p *Project) jobs() []jobs.Job { +func (sj *SourceJob) jobs() []jobs.Job { var j jobs.Job - isScript := p.projectCfg.Source.Inline != nil - ctx := context.WithValue(p.ctx, synthexec.SynthexecTimeout, p.projectCfg.Timeout+30*time.Second) + isScript := sj.browserCfg.Source.Inline != nil + ctx := context.WithValue(sj.ctx, synthexec.SynthexecTimeout, sj.browserCfg.Timeout+30*time.Second) if isScript { - src := p.projectCfg.Source.Inline.Script - j = synthexec.InlineJourneyJob(ctx, src, p.Params(), p.StdFields(), p.extraArgs()...) + src := sj.browserCfg.Source.Inline.Script + j = synthexec.InlineJourneyJob(ctx, src, sj.Params(), sj.StdFields(), sj.extraArgs()...) } else { j = func(event *beat.Event) ([]jobs.Job, error) { - err := p.Fetch() + err := sj.Fetch() if err != nil { - return nil, fmt.Errorf("could not fetch for project job: %w", err) + return nil, fmt.Errorf("could not fetch for browser source job: %w", err) } - sj, err := synthexec.ProjectJob(ctx, p.Workdir(), p.Params(), p.FilterJourneys(), p.StdFields(), p.extraArgs()...) + sj, err := synthexec.ProjectJob(ctx, sj.Workdir(), sj.Params(), sj.FilterJourneys(), sj.StdFields(), sj.extraArgs()...) if err != nil { return nil, err } @@ -160,10 +157,10 @@ func (p *Project) jobs() []jobs.Job { return []jobs.Job{j} } -func (p *Project) plugin() plugin.Plugin { +func (sj *SourceJob) plugin() plugin.Plugin { return plugin.Plugin{ - Jobs: p.jobs(), - DoClose: p.Close, + Jobs: sj.jobs(), + DoClose: sj.Close, Endpoints: 1, } } diff --git a/x-pack/heartbeat/monitors/browser/project_test.go b/x-pack/heartbeat/monitors/browser/sourcejob_test.go similarity index 93% rename from x-pack/heartbeat/monitors/browser/project_test.go rename to x-pack/heartbeat/monitors/browser/sourcejob_test.go index 7c9ecec2e46..0fe6115bf1b 100644 --- a/x-pack/heartbeat/monitors/browser/project_test.go +++ b/x-pack/heartbeat/monitors/browser/sourcejob_test.go @@ -44,7 +44,7 @@ func TestValidLocal(t *testing.T) { }, "timeout": timeout, }) - _, e := NewProject(cfg) + _, e := NewSourceJob(cfg) require.Error(t, e) } @@ -66,10 +66,10 @@ func TestValidInline(t *testing.T) { }, "timeout": timeout, }) - s, e := NewProject(cfg) + s, e := NewSourceJob(cfg) require.NoError(t, e) require.NotNil(t, s) - require.Equal(t, script, s.projectCfg.Source.Inline.Script) + require.Equal(t, script, s.browserCfg.Source.Inline.Script) require.Equal(t, "", s.Workdir()) require.Equal(t, testParams, s.Params()) @@ -86,7 +86,7 @@ func TestNameRequired(t *testing.T) { }, }, }) - _, e := NewProject(cfg) + _, e := NewSourceJob(cfg) require.Regexp(t, ErrNameRequired, e) } @@ -99,7 +99,7 @@ func TestIDRequired(t *testing.T) { }, }, }) - _, e := NewProject(cfg) + _, e := NewSourceJob(cfg) require.Regexp(t, ErrIdRequired, e) } @@ -107,7 +107,7 @@ func TestEmptySource(t *testing.T) { cfg := conf.MustNewConfigFrom(mapstr.M{ "source": mapstr.M{}, }) - s, e := NewProject(cfg) + s, e := NewSourceJob(cfg) require.Regexp(t, ErrBadConfig(source.ErrInvalidSource), e) require.Nil(t, s) @@ -196,8 +196,8 @@ func TestExtraArgs(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := &Project{ - projectCfg: tt.cfg, + s := &SourceJob{ + browserCfg: tt.cfg, } if got := s.extraArgs(); !reflect.DeepEqual(got, tt.want) { t.Errorf("Project.extraArgs() = %v, want %v", got, tt.want) @@ -217,9 +217,9 @@ func TestEmptyTimeout(t *testing.T) { }, }, }) - s, e := NewProject(cfg) + s, e := NewSourceJob(cfg) require.NoError(t, e) require.NotNil(t, s) - require.Equal(t, s.projectCfg.Timeout, defaults.Timeout) + require.Equal(t, s.browserCfg.Timeout, defaults.Timeout) }