Skip to content

Commit

Permalink
Refactoring and small code improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed May 23, 2024
1 parent d5e4f5d commit 5f1e184
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 23 deletions.
9 changes: 8 additions & 1 deletion filebeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,14 @@ func Filebeat(inputs beater.PluginFactory, settings instance.Settings) *cmd.Beat
command.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M"))
command.TestCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
command.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
command.Flags().AddGoFlag(flag.CommandLine.Lookup("ignore-journald-version"))

// main_test.go calls this function before the Journald input is initialised
// to avoid panics, we check whether the flag is defined before calling
// AddGoFlag
if ignoreSystemdFlag := flag.CommandLine.Lookup("ignore-journald-version"); ignoreSystemdFlag != nil {
command.Flags().AddGoFlag(ignoreSystemdFlag)
}

command.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager))
command.AddCommand(genGenerateCmd())
return command
Expand Down
1 change: 1 addition & 0 deletions filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 26 additions & 17 deletions filebeat/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,17 @@ type checkpoint struct {
MonotonicTimestamp uint64
}

// errCannotConnectToDBus is returned when the connection to D-Bus
// cannot be established.
var errCannotConnectToDBus = errors.New("cannot connect to D-Bus")

// LocalSystemJournalID is the ID of the local system journal.
const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL"

const pluginName = "journald"

// ErrSystemdVersionNotSupported is returned by the plugin manager when the
// Systemd version is not supported.
var ErrSystemdVersionNotSupported = errors.New("systemd version must be >= 255")

// ErrCannotGetSystemdVersion is returned by the plugin manager when it is
// not possible to get the Systemd version via D-Bus.
var ErrCannotGetSystemdVersion = errors.New("cannot get systemd version")

// Plugin creates a new journald input plugin for creating a stateful input.
Expand Down Expand Up @@ -359,26 +360,34 @@ func (r *readerAdapter) Next() (reader.Message, error) {
// - 252.16-1.amzn2023.0.2
//
// The function will parse and return the integer before the full stop.
func parseSystemdVersion(output string) (int, error) {
parts := strings.Split(output, ".")
if len(parts) < 2 {
return 0, errors.New("unexpected format for version.")
}

version, err := strconv.Atoi(parts[0])
if err != nil {
return 0, fmt.Errorf("cannot parse Systemd version: %s", err)
func parseSystemdVersion(ver string) (int, error) {
// First try, it's just the version number
version, err := strconv.Atoi(ver)
if err == nil {
return version, nil
}

separators := []string{" ", "."}
// Second try, it's separated by '.' like: 255.6-1-arch
for _, sep := range separators {
parts := strings.Split(ver, sep)
if len(parts) >= 2 {
version, err := strconv.Atoi(parts[0])
if err == nil {
return version, nil
}
}
}

return version, err
return 0, fmt.Errorf("unknown format for Systemd version: '%s'", ver)
}

// getSystemdVersionViaDBus gets the Systemd version from D-Bus
//
// We get the version by reading the property
// `org.freedesktop.systemd1.Manager.Version`. Even though this property is
// is documented as not being part of the official API and having an unstable
// scheme, on our tests it proved to be stable enough.
// documented as not being part of the official API and having an unstable
// scheme, on our tests it proved to be stable enough for this use.
//
// The Systemd D-Bus documentation states:
//
Expand All @@ -391,7 +400,7 @@ func parseSystemdVersion(output string) (int, error) {
func getSystemdVersionViaDBus() (string, error) {
conn, err := dbus.ConnectSessionBus()
if err != nil {
return "", fmt.Errorf("%w: %w", errCannotConnectToDBus, err)
return "", fmt.Errorf("cannot connect to D-Bus: %w", err)
}
defer conn.Close()

Expand Down
18 changes: 15 additions & 3 deletions filebeat/input/journald/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@ import (
"context"
"flag"
"fmt"
"os"
"path"
"testing"

"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestMain(m *testing.M) {
// We use TestMain because all of our current (May 2024) supported Linux
// distributions ship with a version of Systemd that will cause Filebeat
// input to crash when the journal reached the maximum number of files and
// a new rotation happens. To allow the Journald input to be instantiated
// we need to set a CLI flag and that needs to be done before all tests run.
flag.Parse()
noVersionCheck = true
os.Exit(m.Run())
m.Run()
}

func TestInputFieldsTranslation(t *testing.T) {
Expand Down Expand Up @@ -93,7 +97,7 @@ func TestInputFieldsTranslation(t *testing.T) {
}
}

func TestParseJournaldVersion(t *testing.T) {
func TestParseSystemdVersion(t *testing.T) {
foo := map[string]struct {
data string
expected int
Expand All @@ -110,6 +114,14 @@ func TestParseJournaldVersion(t *testing.T) {
expected: 249,
data: `249.11-0ubuntu3.12`,
},
"Debain 10": {
expected: 241,
data: "241",
},
"Red Hat Enterprise Linux 8": {
expected: 239,
data: "239 (239-78.el8)",
},
}

for name, tc := range foo {
Expand Down
6 changes: 4 additions & 2 deletions x-pack/filebeat/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

devtools "github.com/elastic/beats/v7/dev-tools/mage"
"github.com/elastic/beats/v7/dev-tools/mage/target/build"
"github.com/elastic/beats/v7/dev-tools/mage/target/unittest"
filebeat "github.com/elastic/beats/v7/filebeat/scripts/mage"

//mage:import
Expand All @@ -33,6 +34,7 @@ import (
func init() {
common.RegisterCheckDeps(Update)
test.RegisterDeps(IntegTest)
unittest.RegisterGoTestDeps(TestJournaldInput)

devtools.BeatDescription = "Filebeat sends log files to Logstash or directly to Elasticsearch."
devtools.BeatLicense = "Elastic License"
Expand Down Expand Up @@ -191,9 +193,9 @@ func PythonIntegTest(ctx context.Context) error {
// TestJournald executes the Journald input tests
// Use TEST_COVERAGE=true to enable code coverage profiling.
// Use RACE_DETECTOR=true to enable the race detector.
func TestJournald(ctx context.Context) error {
func TestJournaldInput(ctx context.Context) error {
utArgs := devtools.DefaultGoTestUnitArgs()
utArgs.Packages = []string{"./input/journald"}
utArgs.Packages = []string{"../../filebeat/input/journald"}
if devtools.Platform.GOOS == "linux" {
utArgs.ExtraFlags = append(utArgs.ExtraFlags, "-tags=withjournald")
}
Expand Down

0 comments on commit 5f1e184

Please sign in to comment.