From 2095808ebcdbcfe0b50e48e69f071e9945639c0b Mon Sep 17 00:00:00 2001
From: Nick Muerdter <12112+GUI@users.noreply.github.com>
Date: Sun, 11 Feb 2024 20:39:27 -0700
Subject: [PATCH] More fluent bit migration work.

- Move the OpenSearch index and translog template settings into the
  config schema so the test environment can override them in test.yml.
- Split analytics into "allowed", "errored", and "denied" data streams
  via fluent-bit rewrite_tag filters, and select search indices to
  match.
- Run the OpenSearch template setup as a one-shot startup timer rather
  than a recurring job.
- Switch searches to the "@timestamp" field and a real "request_id"
  field, and remove remaining rsyslog build/config/test pieces.
---
 build/package_dependencies.sh                 |   1 -
 config/opensearch_templates_v3.json.etlua     |   9 +-
 config/schema.cue                             |  11 ++
 config/test.yml                               |   7 ++
 docker/dev/docker-start                       |   6 +-
 src/api-umbrella/cli/write_config_files.lua   |   2 +-
 .../proxy/hooks/init_preload_modules.lua      |   2 +-
 src/api-umbrella/proxy/hooks/init_worker.lua  |   2 +-
 .../proxy/opensearch_templates_data.lua       |  20 +---
 .../{jobs => startup}/opensearch_setup.lua    |  32 ++++--
 .../web-app/actions/admin/stats.lua           |   7 +-
 .../models/analytics_search_opensearch.lua    | 104 +++++++-----------
 tasks/deps/rsyslog                            |  30 -----
 tasks/outdated.thor                           |   2 +-
 .../etc/fluent-bit/fluent-bit.yaml.etlua      |  45 +++++---
 templates/etc/perp/fluent-bit/rc.env.etlua    |   2 -
 templates/etc/perp/fluent-bit/rc.main.etlua   |   4 +-
 .../etc/trafficserver/records.config.etlua    |   2 +-
 .../{test_rsyslog.rb => test_fluent_bit.rb}   | 101 ++++++++++-------
 test/processes/test_rpaths.rb                 |   2 +-
 test/proxy/logging/test_basics.rb             |   4 +-
 .../api_umbrella_test_helpers/process.rb      |  24 ----
 .../api_umbrella_test_helpers/setup.rb        |   9 +-
 test/support/models/log_item.rb               |  45 +++-----
 24 files changed, 208 insertions(+), 265 deletions(-)
 rename src/api-umbrella/proxy/{jobs => startup}/opensearch_setup.lua (70%)
 delete mode 100755 tasks/deps/rsyslog
 rename test/processes/{test_rsyslog.rb => test_fluent_bit.rb} (54%)

diff --git a/build/package_dependencies.sh b/build/package_dependencies.sh
index ce4c8bb6f..3d37b6e58 100644
--- a/build/package_dependencies.sh
+++ b/build/package_dependencies.sh
@@ -142,7 +142,6 @@ if [[ "$ID_NORMALIZED" == "rhel" ]]; then
     )
   fi
 elif [[ "$ID_NORMALIZED" == "debian" ]]; then
-
   libcurl_version=4
   libffi_version=8
   libldap_version="2.5-0"
diff --git a/config/opensearch_templates_v3.json.etlua b/config/opensearch_templates_v3.json.etlua
index 29be2bff2..c33768788 100644
--- a/config/opensearch_templates_v3.json.etlua
+++ b/config/opensearch_templates_v3.json.etlua
@@ -6,9 +6,8 @@
     "data_stream": {},
     "template": {
       "settings": {
-        "index": {
-          "refresh_interval": "10s"
-        },
+        "index": <%- json_encode(config["opensearch"]["template"]["index"]) %>,
+        "translog": <%- json_encode(config["opensearch"]["template"]["translog"]) %>,
         "analysis": {
           "normalizer": {
             "lowercase_normalizer": {
@@ -20,10 +19,6 @@
             "filter": ["uppercase"]
           }
         }
-      },
-      "translog": {
-        "durability": "async",
-        "sync_interval": "10s"
       }
     },
     "mappings": {
diff --git a/config/schema.cue b/config/schema.cue
index 0a12a3775..c324a7c98 100644
--- a/config/schema.cue
+++ b/config/schema.cue
@@ -466,6 +466,17 @@ import "path"
 		]
 		index_name_prefix: string | *"api-umbrella"
 		template_version: uint | *3
+		template: {
+			index: {
+				refresh_interval: string | *"10s"
+				number_of_shards: uint | *3
+				number_of_replicas: uint | *2
+			}
+			translog: {
+				durability: string | *"async"
+				sync_interval: string | *"10s"
+			}
+		}
 	}
 
 	#analytics_output_name: "opensearch"
diff --git a/config/test.yml b/config/test.yml
index 944749683..0d493b90b 100644
--- a/config/test.yml
+++ b/config/test.yml
@@ -104,6 +104,13 @@ postgresql:
     password: dev_password
 opensearch:
   index_name_prefix: "api-umbrella-test"
+  template:
+    index:
+      # In the test environment, disable replicas, reduce shards, and
+      # increase the refresh interval to speed things up.
+      refresh_interval: 50ms
+      number_of_shards: 1
+      number_of_replicas: 0
 unbound:
   port: 13100
   control_port: 13101
diff --git a/docker/dev/docker-start b/docker/dev/docker-start
index 03c3faf47..25ba09f15 100755
--- a/docker/dev/docker-start
+++ b/docker/dev/docker-start
@@ -3,11 +3,11 @@
 set -e -u -x
 
 # Clean files that may be left over if container doesn't shut down cleanly.
-rm -f /opt/api-umbrella/var/run/rsyslogd.pid /tmp/.s.PGSQL.*
+rm -f /tmp/.s.PGSQL.*
 
-# Clean files that are tailed by rsyslog when using console output to prevent
+# Clean files that are tailed by fluent-bit when using console output to prevent
 # lots of output from previous logs on startup.
-rm -f /var/log/api-umbrella/trafficserver/access.log /var/log/api-umbrella/trafficserver/diags.log /var/log/api-umbrella/trafficserver/error.log /var/log/api-umbrella/trafficserver/manager.log /var/log/api-umbrella/trafficserver/traffic.out
+rm -f /var/log/api-umbrella/trafficserver/error.log
 
 make api-umbrella run
diff --git a/src/api-umbrella/cli/write_config_files.lua b/src/api-umbrella/cli/write_config_files.lua
index d1be26369..c9d85d3c3 100644
--- a/src/api-umbrella/cli/write_config_files.lua
+++ b/src/api-umbrella/cli/write_config_files.lua
@@ -126,7 +126,7 @@ local function write_templates()
 
     if template_ext == "etlua" then
       local render_ok, render_err
-      render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config, json_encode = json_encode })
+      render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config, json_encode = json_encode, path_join = path_join })
      if not render_ok or render_err then
        print("template compile error in " .. template_path ..": " .. (render_err or content))
        os.exit(1)
diff --git a/src/api-umbrella/proxy/hooks/init_preload_modules.lua b/src/api-umbrella/proxy/hooks/init_preload_modules.lua
index ed0d2a45f..1c8cbbcc4 100644
--- a/src/api-umbrella/proxy/hooks/init_preload_modules.lua
+++ b/src/api-umbrella/proxy/hooks/init_preload_modules.lua
@@ -7,7 +7,6 @@ require "api-umbrella.proxy.jobs.api_users_store_refresh_local_cache"
 require "api-umbrella.proxy.jobs.db_expirations"
 require "api-umbrella.proxy.jobs.distributed_rate_limit_puller"
 require "api-umbrella.proxy.jobs.distributed_rate_limit_pusher"
-require "api-umbrella.proxy.jobs.opensearch_setup"
 require "api-umbrella.proxy.log_utils"
 require "api-umbrella.proxy.middleware.api_key_validator"
 require "api-umbrella.proxy.middleware.api_matcher"
@@ -24,6 +23,7 @@ require "api-umbrella.proxy.middleware.role_validator"
 require "api-umbrella.proxy.middleware.user_settings"
 require "api-umbrella.proxy.middleware.website_matcher"
 require "api-umbrella.proxy.opensearch_templates_data"
+require "api-umbrella.proxy.startup.opensearch_setup"
 require "api-umbrella.proxy.startup.seed_database"
 require "api-umbrella.proxy.stores.active_config_store"
 require "api-umbrella.proxy.stores.api_users_store"
diff --git a/src/api-umbrella/proxy/hooks/init_worker.lua b/src/api-umbrella/proxy/hooks/init_worker.lua
index d6ba26e55..2ab92f748 100644
--- a/src/api-umbrella/proxy/hooks/init_worker.lua
+++ b/src/api-umbrella/proxy/hooks/init_worker.lua
@@ -5,7 +5,7 @@ local api_users_store_refresh_local_cache = require "api-umbrella.proxy.jobs.api_users_store_refresh_local_cache"
 local db_expirations = require "api-umbrella.proxy.jobs.db_expirations"
 local distributed_rate_limit_puller = require "api-umbrella.proxy.jobs.distributed_rate_limit_puller"
 local distributed_rate_limit_pusher = require "api-umbrella.proxy.jobs.distributed_rate_limit_pusher"
"api-umbrella.proxy.jobs.distributed_rate_limit_pusher" -local opensearch_setup = require "api-umbrella.proxy.jobs.opensearch_setup" +local opensearch_setup = require "api-umbrella.proxy.startup.opensearch_setup" local random_seed = require "api-umbrella.utils.random_seed" local seed_database = require "api-umbrella.proxy.startup.seed_database" diff --git a/src/api-umbrella/proxy/opensearch_templates_data.lua b/src/api-umbrella/proxy/opensearch_templates_data.lua index 374858565..8ca3bcc74 100644 --- a/src/api-umbrella/proxy/opensearch_templates_data.lua +++ b/src/api-umbrella/proxy/opensearch_templates_data.lua @@ -1,6 +1,7 @@ local config = require("api-umbrella.utils.load_config")() local etlua_render = require("etlua").render local json_decode = require("cjson").decode +local json_encode = require "api-umbrella.utils.json_encode" local xpcall_error_handler = require "api-umbrella.utils.xpcall_error_handler" local opensearch_templates @@ -13,7 +14,7 @@ else local content = f:read("*all") if content then local render_ok, render_err - render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config }) + render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config, json_encode = json_encode }) if not render_ok or render_err then ngx.log(ngx.ERR, "template compile error in " .. path ..": " .. (render_err or content)) end @@ -21,23 +22,6 @@ else local ok, data = xpcall(json_decode, xpcall_error_handler, content) if ok then opensearch_templates = data - - -- In the test environment, disable replicas, reduce shards, and - -- increasing refresh interval to speed things up. - if config["app_env"] == "test" then - for _, template in pairs(opensearch_templates) do - if not template["template"]["settings"] then - template["template"]["settings"] = {} - end - if not template["template"]["settings"]["index"] then - template["template"]["settings"]["index"] = {} - end - - template["template"]["settings"]["index"]["refresh_interval"] = "50ms" - template["template"]["settings"]["index"]["number_of_shards"] = 1 - template["template"]["settings"]["index"]["number_of_replicas"] = 0 - end - end else ngx.log(ngx.ERR, "failed to parse json for " .. (path or "") .. ": " .. 
(data or "")) end diff --git a/src/api-umbrella/proxy/jobs/opensearch_setup.lua b/src/api-umbrella/proxy/startup/opensearch_setup.lua similarity index 70% rename from src/api-umbrella/proxy/jobs/opensearch_setup.lua rename to src/api-umbrella/proxy/startup/opensearch_setup.lua index 9c5acd6cf..28c5af2e5 100644 --- a/src/api-umbrella/proxy/jobs/opensearch_setup.lua +++ b/src/api-umbrella/proxy/startup/opensearch_setup.lua @@ -1,12 +1,12 @@ local interval_lock = require "api-umbrella.utils.interval_lock" local opensearch = require "api-umbrella.utils.opensearch" local opensearch_templates = require "api-umbrella.proxy.opensearch_templates_data" +local shared_dict_retry_set = require("api-umbrella.utils.shared_dict_retry").set -local opensearch_query = opensearch.query local jobs_dict = ngx.shared.jobs +local opensearch_query = opensearch.query local sleep = ngx.sleep - -local delay = 3600 -- in seconds +local timer_at = ngx.timer.at local _M = {} @@ -53,25 +53,35 @@ function _M.create_templates() end end - local set_ok, set_err = jobs_dict:safe_set("opensearch_templates_created", true) + local set_ok, set_err, set_forcible = shared_dict_retry_set(jobs_dict, "opensearch_templates_created", true) if not set_ok then - ngx.log(ngx.ERR, "failed to set 'opensearch_templates_created' in 'active_config' shared dict: ", set_err) + ngx.log(ngx.ERR, "failed to set 'opensearch_templates_created' in 'jobs' shared dict: ", set_err) + elseif set_forcible then + ngx.log(ngx.WARN, "forcibly set 'opensearch_templates_created' in 'jobs' shared dict (shared dict may be too small)") end end local function setup() local _, err = _M.wait_for_opensearch() - if not err then - _M.create_templates() - else - ngx.log(ngx.ERR, "timed out waiting for eleasticsearch before setup, rerunning...") + if err then + ngx.log(ngx.ERR, "timed out waiting for opensearch before setup, rerunning...") sleep(5) - setup() + return setup() end + + _M.create_templates() +end + +function _M.setup_once() + interval_lock.mutex_exec("opensearch_index_setup", setup) end function _M.spawn() - interval_lock.repeat_with_mutex('opensearch_index_setup', delay, setup) + local ok, err = timer_at(0, _M.setup_once) + if not ok then + ngx.log(ngx.ERR, "failed to create timer: ", err) + return + end end return _M diff --git a/src/api-umbrella/web-app/actions/admin/stats.lua b/src/api-umbrella/web-app/actions/admin/stats.lua index dc6698e04..fcebc823d 100644 --- a/src/api-umbrella/web-app/actions/admin/stats.lua +++ b/src/api-umbrella/web-app/actions/admin/stats.lua @@ -370,7 +370,7 @@ function _M.logs(self) for _, hit in ipairs(hits) do local row = hit["_source"] ngx.say(csv.row_to_csv({ - time.opensearch_to_csv(row["request_at"]) or null, + time.opensearch_to_csv(row["@timestamp"]) or null, row["request_method"] or null, row["request_host"] or null, sanitized_full_url(row) or null, @@ -409,7 +409,7 @@ function _M.logs(self) row["api_backend_resolved_host"] or null, row["api_backend_response_code_details"] or null, row["api_backend_response_flags"] or null, - hit["_id"] or null, + hit["request_id"] or null, })) end ngx.flush(true) @@ -431,7 +431,8 @@ function _M.logs(self) row["_type"] = nil row["_score"] = nil row["_index"] = nil - row["request_id"] = hit["_id"] + row["request_at"] = hit["@timestamp"] + row["@timestamp"] = nil row["request_url"] = sanitized_url_path_and_query(row) row["request_url_query"] = strip_api_key_from_query(row["request_url_query"]) if row["request_query"] then diff --git 
index c467361ee..705d28a90 100644
--- a/src/api-umbrella/web-app/models/analytics_search_opensearch.lua
+++ b/src/api-umbrella/web-app/models/analytics_search_opensearch.lua
@@ -39,45 +39,32 @@ local UPPERCASE_FIELDS = {
 local _M = {}
 _M.__index = _M
 
-local function index_names(start_time, end_time)
-  assert(start_time)
-  assert(end_time)
-
-  date_utc:parse(format_iso8601, end_time)
-  -- TODO: For some reason, set_time_zone_id doesn't work properly if format()
-  -- isn't called first, when changing between time zones. Need to debug why
-  -- this isn't working as expected with icu-date, but in the meantime, this
-  -- workaround seems to make set_time_zone_id work as expected.
-  --
-  -- The following test can reproduce this problem (it will break without this
-  -- format() call):
-  -- env OPENSEARCH_TEST_API_VERSION=5 OPENSEARCH_TEST_TEMPLATE_VERSION=2 OPENSEARCH_TEST_INDEX_PARTITION=daily bundle exec minitest test/apis/admin/stats/test_search.rb -n test_bins_results_by_day_with_time_zone_support
-  date_utc:format(format_iso8601)
-  date_utc:set_time_zone_id("UTC")
-  local end_time_millis = date_utc:get_millis()
-
-  date_utc:parse(format_iso8601, start_time)
-  -- TODO: See above about why this format() call is here, but shouldn't be
-  -- necessary.
-  date_utc:format(format_iso8601)
-  date_utc:set_time_zone_id("UTC")
-  if config["opensearch"]["index_partition"] == "monthly" then
-    date_utc:set(icu_date.fields.DAY_OF_MONTH, 1)
-  end
-  date_utc:set(icu_date.fields.HOUR_OF_DAY, 0)
-  date_utc:set(icu_date.fields.MINUTE, 0)
-  date_utc:set(icu_date.fields.SECOND, 0)
-  date_utc:set(icu_date.fields.MILLISECOND, 0)
-
-  local names = {}
-  while date_utc:get_millis() <= end_time_millis do
-    table.insert(names, config["opensearch"]["index_name_prefix"] .. "-logs-" .. date_utc:format(opensearch.partition_date_format))
-    if config["opensearch"]["index_partition"] == "monthly" then
-      date_utc:add(icu_date.fields.MONTH, 1)
-    elseif config["opensearch"]["index_partition"] == "daily" then
-      date_utc:add(icu_date.fields.DATE, 1)
+local function index_names(start_time, end_time, body)
+  local names = {
+    config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .."-allowed",
+    config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .."-errored",
+  }
+
+  local exclude_denied = false
+  local filters = body["query"]["bool"]["filter"]["bool"]["must"]
+  for _, filter in ipairs(filters) do
+    if filter["bool"] and filter["bool"]["must"] then
+      local sub_filters = filter["bool"]["must"]
+      for _, sub_filter in ipairs(sub_filters) do
+        if sub_filter["bool"] and sub_filter["bool"]["must_not"] and sub_filter["bool"]["must_not"]["exists"] and sub_filter["bool"]["must_not"]["exists"]["field"] == "gatekeeper_denied_code" then
+          exclude_denied = true
+          break
+        end
+      end
+
+      if exclude_denied then
+        break
+      end
     end
   end
+  if not exclude_denied then
+    table.insert(names, config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .."-denied")
config["opensearch"]["template_version"] .."-denied") + end return names end @@ -92,11 +79,6 @@ local function parse_query_builder(query) local field = rule["field"] local value = rule["value"] - local es_field = field - if field == "request_id" then - es_field = "_id" - end - if not CASE_SENSITIVE_FIELDS[field] and type(value) == "string" then if UPPERCASE_FIELDS[field] then value = string.upper(value) @@ -108,31 +90,31 @@ local function parse_query_builder(query) if operator == "equal" or operator == "not_equal" then filter = { term = { - [es_field] = value, + [field] = value, }, } elseif operator == "begins_with" or operator == "not_begins_with" then filter = { prefix = { - [es_field] = value, + [field] = value, }, } elseif operator == "contains" or operator == "not_contains" then filter = { regexp = { - [es_field] = ".*" .. escape_regex(value) .. ".*", + [field] = ".*" .. escape_regex(value) .. ".*", }, } elseif operator == "is_null" or operator == "is_not_null" then filter = { exists = { - field = es_field, + field = field, }, } elseif operator == "less" then filter = { range = { - [es_field] = { + [field] = { lt = tonumber(value), }, }, @@ -140,7 +122,7 @@ local function parse_query_builder(query) elseif operator == "less_or_equal" then filter = { range = { - [es_field] = { + [field] = { lte = tonumber(value), }, }, @@ -148,7 +130,7 @@ local function parse_query_builder(query) elseif operator == "greater" then filter = { range = { - [es_field] = { + [field] = { gt = tonumber(value), }, }, @@ -156,7 +138,7 @@ local function parse_query_builder(query) elseif operator == "greater_or_equal" then filter = { range = { - [es_field] = { + [field] = { gte = tonumber(value), }, }, @@ -164,7 +146,7 @@ local function parse_query_builder(query) elseif operator == "between" then filter = { range = { - [es_field] = { + [field] = { gte = tonumber(value[1]), lte = tonumber(value[2]), }, @@ -226,18 +208,15 @@ function _M.new() }, }, sort = { - { request_at = "desc" }, + { ["@timestamp"] = "desc" }, }, aggregations = {}, size = 0, + track_total_hits = true, timeout = "90s", }, } - if config["opensearch"]["api_version"] >= 7 then - self.body["track_total_hits"] = true - end - return setmetatable(self, _M) end @@ -379,7 +358,7 @@ end function _M:aggregate_by_interval() self.body["aggregations"]["hits_over_time"] = { date_histogram = { - field = "request_at", + field = "@timestamp", interval = self.interval, time_zone = config["analytics"]["timezone"], min_doc_count = 0, @@ -550,7 +529,7 @@ function _M:aggregate_by_drilldown_over_time() aggregations = { drilldown_over_time = { date_histogram = { - field = "request_at", + field = "@timestamp", interval = self.interval, time_zone = config["analytics"]["timezone"], min_doc_count = 0, @@ -572,7 +551,7 @@ function _M:aggregate_by_drilldown_over_time() self.body["aggregations"]["hits_over_time"] = { date_histogram = { - field = "request_at", + field = "@timestamp", interval = self.interval, time_zone = config["analytics"]["timezone"], min_doc_count = 0, @@ -593,7 +572,7 @@ function _M:aggregate_by_user_stats(order) aggregations = { last_request_at = { max = { - field = "request_at", + field = "@timestamp", }, }, }, @@ -637,7 +616,7 @@ end function _M:query_header() local header = deepcopy(self.query) - header["index"] = table.concat(index_names(self.start_time, self.end_time), ",") + header["index"] = table.concat(index_names(self.start_time, self.end_time, self.body), ",") return header end @@ -647,7 +626,7 @@ function _M:query_body() 
   table.insert(body["query"]["bool"]["filter"]["bool"]["must"], {
     range = {
-      request_at = {
+      ["@timestamp"] = {
         from = assert(self.start_time),
         to = assert(self.end_time),
       },
   })
@@ -921,7 +900,6 @@ function _M:cache_interval_results(expires_at)
       -- Include the version information in the cache key, so that if the
       -- underlying OpenSearch database is upgraded, new data will be
       -- fetched.
-      api_version = config["opensearch"]["api_version"],
       template_version = config["opensearch"]["template_version"],
     }
     table.insert(batch, {
diff --git a/tasks/deps/rsyslog b/tasks/deps/rsyslog
deleted file mode 100755
index 17acd6d0d..000000000
--- a/tasks/deps/rsyslog
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/usr/bin/env bash
-
-rsyslog_version="8.2312.0"
-rsyslog_hash="774032006128a896437f5913e132aa27dbfb937cd8847e449522d5a12d63d03e"
-
-set -e -u -x
-source ./tasks/helpers.sh
-
-task_working_dir
-download "https://www.rsyslog.com/files/download/rsyslog/rsyslog-$rsyslog_version.tar.gz" "sha256" "$rsyslog_hash"
-extract_download "rsyslog-$rsyslog_version.tar.gz"
-
-cd "rsyslog-$rsyslog_version"
-./configure \
-  --prefix="$INSTALL_PREFIX_EMBEDDED" \
-  --disable-liblogging-stdlog \
-  --disable-libgcrypt \
-  --enable-imptcp \
-  --enable-impstats \
-  --enable-mmjsonparse \
-  --enable-mmutf8fix \
-  --enable-elasticsearch \
-  --enable-imfile \
-  --enable-omstdout
-make -j"$NPROC"
-make install DESTDIR="$STAGE_DIR"
-chrpath -d "$STAGE_EMBEDDED_DIR/sbin/rsyslogd"
-find "$STAGE_EMBEDDED_DIR/lib/rsyslog/" -name "*.so" -exec chrpath -d {} \;
-
-stamp
diff --git a/tasks/outdated.thor b/tasks/outdated.thor
index 26f511d68..0d70547b1 100644
--- a/tasks/outdated.thor
+++ b/tasks/outdated.thor
@@ -20,7 +20,7 @@ class Outdated < Thor
     "envoy_control_plane" => {
       :git => "https://github.com/GUI/envoy-control-plane.git",
     },
-    "fluent-bit" => {
+    "fluent_bit" => {
       :git => "https://github.com/fluent/fluent-bit.git",
     },
     "glauth" => {
diff --git a/templates/etc/fluent-bit/fluent-bit.yaml.etlua b/templates/etc/fluent-bit/fluent-bit.yaml.etlua
index 6a19f653d..5bbbfa00d 100644
--- a/templates/etc/fluent-bit/fluent-bit.yaml.etlua
+++ b/templates/etc/fluent-bit/fluent-bit.yaml.etlua
@@ -2,7 +2,7 @@ service:
   log_level: info
   # How often to flush inputs to the outputs.
   flush: <%- json_encode(config["fluent_bit"]["service"]["flush"]) %>
-  storage.path: /tmp/fluentbit
+  storage.path: <%- json_encode(path_join(config["db_dir"], "fluent-bit")) %>
   storage.sync: normal
   storage.checksum: off
   # max_chunks_up * 2MB corresponds to roughly the maximum memory use before
@@ -14,33 +14,50 @@ service:
 
 pipeline:
   inputs:
+    - name: fluentbit_metrics
+      tag: internal_metrics
+      scrape_interval: 60
+      scrape_on_start: true
+
     - name: tcp
-      tag: all
+      tag: analytics.allowed
       listen: <%- json_encode(config["fluent_bit"]["host"]) %>
       port: <%- json_encode(config["fluent_bit"]["port"]) %>
       format: json
       storage.type: filesystem
 
-    - name: fluentbit_metrics
-      tag: internal_metrics
-      scrape_interval: 60
-      scrape_on_start: true
+    <% if config["log"]["destination"] == "console" then %>
+    # Workaround for error.log currently being hard-coded to output to a file
+    # that won't work if symlinked to /dev/stdout (it insists on rolling the file
+    # since it doesn't think it can write to it).
+    #
+    # This workaround should no longer be needed once Trafficserver 10 is
+    # released and things can be configured to output to stdout/stderr directly:
+    # https://github.com/apache/trafficserver/pull/7937
+    - name: tail
+      tag: trafficserver
+      path: <%- json_encode(path_join(config["log_dir"], "trafficserver/error.log")) %>
+      refresh_interval: 5
+    <% end %>
 
   filters:
     - name: rewrite_tag
-      match: all
-      rule: "$gatekeeper_denied_code ^. denied false"
+      match: analytics.allowed
+      rule: "$gatekeeper_denied_code ^. analytics.denied false"
+
+    - name: rewrite_tag
+      match: analytics.allowed
+      rule: "$response_status ^[4-9] analytics.errored false"
 
   outputs:
     # Print what we would log to stdout for extra redundancy.
     - name: stdout
       match: "*"
-      json_date_key: false
-      format: json_lines
+      format: msgpack
 
-    # Send API logs to OpenSearch analytics DB.
+    # Send API analytics to OpenSearch analytics DB.
     - name: opensearch
-      match_regex: "^(all|denied)$"
+      match: "analytics.*"
       host: <%- json_encode(config["opensearch"]["_first_server"]["host"]) %>
       port: <%- json_encode(config["opensearch"]["_first_server"]["port"]) %>
       tls: <%- config["opensearch"]["_first_server"]["_https?"] and "on" or "off" %>
@@ -52,7 +69,7 @@ pipeline:
       <% end %>
       # aws_auth: on
       # aws_region: us-west-2
-      index: <%- json_encode(config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .. "-$TAG") %>
+      index: <%- json_encode(config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .. "-$TAG[1]") %>
       # Data streams require "create" operations.
       write_operation: create
       # _type field is no longer accepted for OpenSearch.
@@ -79,7 +96,7 @@ pipeline:
       # upload_chunk_size: 30M
       # total_file_size: 250M
       # upload_timeout: 60m
-      # s3_key_format: "/$TAG/%Y/%m/%d/%Y-%m-%dT%H:%M:%SZ-$UUID.gz
+      # s3_key_format: "/$TAG[1]/%Y/%m/%d/%Y-%m-%dT%H:%M:%SZ-$UUID.gz
       # send_content_md5: true
       # auto_retry_requests: true
       # preserve_data_ordering: true
diff --git a/templates/etc/perp/fluent-bit/rc.env.etlua b/templates/etc/perp/fluent-bit/rc.env.etlua
index 771cb11cd..04878a04b 100644
--- a/templates/etc/perp/fluent-bit/rc.env.etlua
+++ b/templates/etc/perp/fluent-bit/rc.env.etlua
@@ -1,5 +1,3 @@
-LD_LIBRARY_PATH=<%- config["_embedded_root_dir"] %>/openresty/luajit/lib:<%- config["_embedded_root_dir"] %>/lib
-RSYSLOG_MODDIR=<%- config["_embedded_root_dir"] %>/lib/rsyslog
 <% if config["http_proxy"] then %>
 http_proxy=<%- config["http_proxy"] %>
 <% end %>
diff --git a/templates/etc/perp/fluent-bit/rc.main.etlua b/templates/etc/perp/fluent-bit/rc.main.etlua
index f7c56ccba..8cad671a2 100755
--- a/templates/etc/perp/fluent-bit/rc.main.etlua
+++ b/templates/etc/perp/fluent-bit/rc.main.etlua
@@ -16,7 +16,7 @@ if [ "${1}" = "start" ]; then
     run_args+=("-u" "$api_umbrella_user")
   fi
 
-  dirs=("<%- config['db_dir'] %>/rsyslog")
+  dirs=("<%- path_join(config['db_dir'], 'fluent-bit') %>")
   mkdir -p "${dirs[@]}"
   chmod 750 "${dirs[@]}"
   if [ -n "$api_umbrella_user" ]; then
@@ -29,8 +29,6 @@ if [ "${1}" = "start" ]; then
       chown "$api_umbrella_user":"$api_umbrella_group" /dev/stdout
       chown "$api_umbrella_user":"$api_umbrella_group" /dev/stderr
     fi
-
-    ln -sf /dev/stderr "<%- config['log_dir'] %>/rsyslog/opensearch_error.log"
   fi
 
 exec \
diff --git a/templates/etc/trafficserver/records.config.etlua b/templates/etc/trafficserver/records.config.etlua
index 74527cd59..b9d325c19 100644
--- a/templates/etc/trafficserver/records.config.etlua
+++ b/templates/etc/trafficserver/records.config.etlua
@@ -64,7 +64,7 @@ CONFIG proxy.config.output.logfile.rolling_enabled INT 0
 # released and things can be configured to output to stdout/stderr directly:
 # https://github.com/apache/trafficserver/pull/7937
 CONFIG proxy.config.log.auto_delete_rolled_files INT 1
-CONFIG proxy.config.log.max_space_mb_for_logs INT 200
+CONFIG proxy.config.log.max_space_mb_for_logs INT 30
 CONFIG proxy.config.log.max_space_mb_headroom INT 20
 CONFIG proxy.config.log.rolling_enabled INT 2
diff --git a/test/processes/test_rsyslog.rb b/test/processes/test_fluent_bit.rb
similarity index 54%
rename from test/processes/test_rsyslog.rb
rename to test/processes/test_fluent_bit.rb
index 556ee0386..7cb848165 100644
--- a/test/processes/test_rsyslog.rb
+++ b/test/processes/test_fluent_bit.rb
@@ -1,40 +1,38 @@
 require_relative "../test_helper"
 
-class Test::Processes::TestRsyslog < Minitest::Test
+class Test::Processes::TestFluentBit < Minitest::Test
   include ApiUmbrellaTestHelpers::Setup
   include ApiUmbrellaTestHelpers::Logging
 
   def setup
     super
     setup_server
-    @opensearch_error_log_path = File.join($config["log_dir"], "rsyslog/opensearch_error.log")
   end
 
-  # To make sure rsyslog doesn't leak memory while logging requests:
-  # https://github.com/rsyslog/rsyslog/issues/1668
+  # To make sure fluent-bit doesn't leak memory while logging requests
   def test_memory_leak
-    # Make some initial requests, to ensure rsyslog is warmed up, which should
+    log_tail = LogTail.new("fluent-bit/current")
+
+    # Make some initial requests, to ensure fluent-bit is warmed up, which should
     # stabilize its memory use.
     make_requests(2_000)
     warmed_memory_use = memory_use
-    warmed_error_log_size = opensearch_error_log_size
 
-    # Validate that the rsyslog process isn't using more than 100MB initially.
+    # Validate that the fluent-bit process isn't using more than 60MB initially.
     # If it is, then this is probably the best indicator that there's a memory
     # leak and previous tests in the test suite have already leaked a lot of
-    # memory. Typically rsyslog is more in the 20MB-50MB range.
-    assert_operator(warmed_memory_use[:rss], :<, 100_000)
+    # memory. Typically fluent-bit is more in the 10MB-30MB range.
+    assert_operator(warmed_memory_use[:rss], :<, 60_000)
 
     # Make more requests.
     make_requests(10_000)
     final_memory_use = memory_use
-    final_error_log_size = opensearch_error_log_size
 
     # Compare the memory use before and after making requests. Note that this
     # is a very loose test (allowing 15MB of growth), since there can be a
     # decent amount of fluctuation in normal use. It's hard to determine
     # whether there's really a leak in such a short test, so the better test is
-    # actually the initial check to ensure the process is less than 100MB after
+    # actually the initial check to ensure the process is less than 60MB after
     # warmup. Assuming this test gets called as part of running the full test
     # suite, and it's not one of the initial tests (which is randomized, but
     # won't be at least enough times to catch this at some point), then that's
 
     # Ensure nothing was generated in the opensearch error log file, since
     # the specific leak in v8.28.0 was triggered by generated error data.
-    assert_equal(warmed_error_log_size, final_error_log_size)
+    log = log_tail.read
+    refute_match(/\[\s*error\s*\]/, log)
   end
 
-  def test_opensearch_error_log
-    FileUtils.rm_f(@opensearch_error_log_path)
+  def test_opensearch_stdout_logs
+    log_tail = LogTail.new("fluent-bit/current")
+
+    response = Typhoeus.get("http://127.0.0.1:9080/api/hello/#{unique_test_id}", http_options)
+    assert_response_code(200, response)
 
+    log = log_tail.read_until(unique_test_id, timeout: 30)
+    assert_match(%r{analytics\.allowed.*"request_path"=>"/api/hello/#{unique_test_id}"}, log)
+    assert_match(/analytics\.allowed.*"request_id"=>"#{response.headers["X-Api-Umbrella-Request-ID"]}"/, log)
+  end
+
+  def test_opensearch_error_log
     make_opensearch_config_invalid do
+      log_tail = LogTail.new("fluent-bit/current")
+
       response = Typhoeus.get("http://127.0.0.1:9080/api/hello", http_options)
       assert_response_code(200, response)
 
-      Timeout.timeout(30) do
-        loop do
-          break if opensearch_error_log_size > 0
-        end
-      end
+      log = log_tail.read_until(/strict_dynamic_mapping_exception/, timeout: 30)
 
       # Expect the failed message in the log file.
-      log = File.read(@opensearch_error_log_path)
-      assert_match("strict_dynamic_mapping_exception", log)
+      assert_match("[error] [output:opensearch:opensearch", log)
+      assert_match(/strict_dynamic_mapping_exception.*dynamic introduction of \[flb-key\] within/, log)
     end
   end
@@ -75,7 +81,7 @@ def test_opensearch_error_log_to_console
       },
     }) do
       make_opensearch_config_invalid do
-        log_tail = LogTail.new("rsyslog/current")
+        log_tail = LogTail.new("fluent-bit/current")
 
         response = Typhoeus.get("http://127.0.0.1:9080/api/hello", http_options)
         assert_response_code(200, response)
@@ -84,11 +90,32 @@ def test_opensearch_error_log_to_console
         # actually what would be outputting to the real console if things were
         # started in this console mode.
         log = log_tail.read_until(/strict_dynamic_mapping_exception/, timeout: 30)
-        assert_match("strict_dynamic_mapping_exception", log)
+        assert_match("[error] [output:opensearch:opensearch", log)
+        assert_match(/strict_dynamic_mapping_exception.*dynamic introduction of \[flb-key\] within/, log)
       end
     end
   end
 
+  # Test the temporary workaround for trafficserver's error.log not being able
+  # to direct to stderr. Can probably remove once we're on Trafficserver 10+.
+  def test_trafficserver_error_log_to_console
+    override_config({
+      "log" => {
+        "destination" => "console",
+      },
+    }) do
+      log_tail = LogTail.new("fluent-bit/current")
+
+      # Not obvious how to actually trigger content to the error.log, so
+      # manually append some content to simulate this.
+      error_content = SecureRandom.uuid
+      File.write(File.join($config["log_dir"], "trafficserver/error.log"), "#{error_content}\n", mode: "a+")
+
+      log = log_tail.read_until(/#{error_content}/, timeout: 30)
+      assert_match(/trafficserver:.*{"log"=>"#{error_content}"}/, log)
+    end
+  end
+
   private
 
   def make_requests(count)
@@ -106,7 +133,7 @@ def make_requests(count)
   end
 
   def memory_use
-    pid = File.read(File.join($config["run_dir"], "rsyslogd.pid"))
+    pid = api_umbrella_process.perp_pid("fluent-bit")
 
     output, status = run_shell("ps", "-o", "vsz=,rss=", pid)
     assert_equal(0, status, output)
@@ -124,33 +151,25 @@ def memory_use
     memory
   end
 
-  def opensearch_error_log_size
-    size = 0
-    if(File.exist?(@opensearch_error_log_path))
-      size = File.size(@opensearch_error_log_path)
-    end
-
-    size
-  end
-
   def make_opensearch_config_invalid
-    # Make a temporary change to the rsyslog configuration to make the output
+    # Make a temporary change to the fluent-bit configuration to make the output
     # invalid.
-    conf_path = File.join($config["etc_dir"], "rsyslog.conf")
-    content = File.read(conf_path)
+    conf_path = File.join($config["etc_dir"], "fluent-bit/fluent-bit.yaml")
+    content = YAML.safe_load_file(conf_path)
     FileUtils.mv(conf_path, "#{conf_path}.bak")
 
     # Trigger opensearch logging errors by using an invalid template.
-    content.gsub!(/^template\(name="opensearch-json-record".*$/, 'template(name="opensearch-json-record" type="string" string="{\"msgnum\":\"x%msg:F,58:2%\"}")')
+    opensearch = content.fetch("pipeline").fetch("outputs").detect { |output| output.fetch("name") == "opensearch" }
+    opensearch["include_tag_key"] = true
 
-    # Write the new temp config file and restart rsyslog.
-    File.write(conf_path, content)
-    api_umbrella_process.perp_restart("rsyslog")
+    # Write the new temp config file and restart fluent-bit.
+    File.write(conf_path, YAML.safe_dump(content))
+    api_umbrella_process.perp_restart("fluent-bit")
 
     yield
   ensure
     # Restore the original config file and restart.
     FileUtils.mv("#{conf_path}.bak", conf_path)
-    api_umbrella_process.perp_restart("rsyslog")
+    api_umbrella_process.perp_restart("fluent-bit")
   end
 end
diff --git a/test/processes/test_rpaths.rb b/test/processes/test_rpaths.rb
index 59d31eba0..0c5392f9d 100644
--- a/test/processes/test_rpaths.rb
+++ b/test/processes/test_rpaths.rb
@@ -36,7 +36,7 @@ def test_binary_rpaths
     # expect.
     assert_operator(bins.length, :>, 0)
     [
-      "/embedded/sbin/rsyslogd",
+      "/embedded/bin/fluent-bit",
       "/embedded/openresty/nginx/sbin/nginx",
       "/embedded/libexec/trafficserver/header_rewrite.so",
       # LuaRock
diff --git a/test/proxy/logging/test_basics.rb b/test/proxy/logging/test_basics.rb
index 6ec5fd844..395e92234 100644
--- a/test/proxy/logging/test_basics.rb
+++ b/test/proxy/logging/test_basics.rb
@@ -452,7 +452,7 @@ def test_truncates_url_query_length_in_logs
   end
 
   # Try to log a long version of all inputs to ensure the overall log message
-  # doesn't exceed rsyslog's buffer size.
+  # doesn't exceed fluent-bit's buffer size.
   def test_long_url_and_request_headers_and_response_headers
     # Setup a backend to accept wildcard hosts so we can test a long hostname.
     prepend_api_backends([
@@ -492,7 +492,7 @@ def test_long_url_and_request_headers_and_response_headers
     assert_equal("long=#{long_value}"[0, 4000], record["request_url_query"])
 
     # Ensure the long header values got truncated so we're not susceptible to
-    # exceeding rsyslog's message buffers and we're also not storing an
+    # exceeding fluent-bit's message buffers and we're also not storing an
     # unexpected amount of data for values users can pass in.
     assert_equal(200, record["request_accept"].length, record["request_accept"])
     assert_equal(200, record["request_accept_encoding"].length, record["request_accept_encoding"])
diff --git a/test/support/api_umbrella_test_helpers/process.rb b/test/support/api_umbrella_test_helpers/process.rb
index 83fe20233..ec0b7d63c 100644
--- a/test/support/api_umbrella_test_helpers/process.rb
+++ b/test/support/api_umbrella_test_helpers/process.rb
@@ -426,21 +426,11 @@ def nginx_wait_for_new_child_pids(parent_pid, expected_num_workers, original_child_pids)
 
     def static_test_config
       unless @static_test_config
-        opensearch_test_api_version = nil
-        if ENV["OPENSEARCH_TEST_API_VERSION"]
-          opensearch_test_api_version = ENV.fetch("OPENSEARCH_TEST_API_VERSION").to_i
-        end
-
         opensearch_test_template_version = nil
         if ENV["OPENSEARCH_TEST_TEMPLATE_VERSION"]
           opensearch_test_template_version = ENV.fetch("OPENSEARCH_TEST_TEMPLATE_VERSION").to_i
         end
 
-        opensearch_test_index_partition = nil
-        if ENV["OPENSEARCH_TEST_INDEX_PARTITION"]
-          opensearch_test_index_partition = ENV.fetch("OPENSEARCH_TEST_INDEX_PARTITION")
-        end
-
         static = YAML.safe_load_file(TEST_CONFIG_PATH)
 
         # Create an config file for computed overrides.
@@ -456,13 +446,6 @@ def static_test_config
           static["user"] = "api-umbrella"
           static["group"] = "api-umbrella"
         end
-        if opensearch_test_api_version
-          static.deep_merge!({
-            "opensearch" => {
-              "api_version" => opensearch_test_api_version,
-            },
-          })
-        end
         if opensearch_test_template_version
           static.deep_merge!({
             "opensearch" => {
@@ -470,13 +453,6 @@ def static_test_config
             },
           })
         end
-        if opensearch_test_index_partition
-          static.deep_merge!({
-            "opensearch" => {
-              "index_partition" => opensearch_test_index_partition,
-            },
-          })
-        end
 
         @static_test_config = IceNine.deep_freeze(static)
       end
diff --git a/test/support/api_umbrella_test_helpers/setup.rb b/test/support/api_umbrella_test_helpers/setup.rb
index 163280b43..8f90defc3 100644
--- a/test/support/api_umbrella_test_helpers/setup.rb
+++ b/test/support/api_umbrella_test_helpers/setup.rb
@@ -358,7 +358,7 @@ def override_config_set(config, options = {})
         # reasons and cleanup and restart extra things, since this is not a
         # change we would normally expect to happen without a full restart.
         if previous_override_config.dig("log", "destination") ||
-            @@current_override_config.dig("log", "destination") ||
+            @@current_override_config.dig("log", "destination")
 
           # These log files are symlinked to stdout or stderr by the perp init
           # scripts, so when switching between these log destinations, we need
@@ -368,7 +368,6 @@ def override_config_set(config, options = {})
           # to unexpected places.
           FileUtils.rm_f(File.join($config["log_dir"], "nginx-web-app/access.log"))
           FileUtils.rm_f(File.join($config["log_dir"], "nginx/access.log"))
-          FileUtils.rm_f(File.join($config["log_dir"], "rsyslog/opensearch_error.log"))
           FileUtils.rm_f(File.join($config["log_dir"], "trafficserver/access.log"))
           FileUtils.rm_f(File.join($config["log_dir"], "trafficserver/diags.log"))
           FileUtils.rm_f(File.join($config["log_dir"], "trafficserver/manager.log"))
@@ -382,10 +381,10 @@ def override_config_set(config, options = {})
           # think it suffices for testing the differences in the test
           # environment for now.
           self.api_umbrella_process.restart_services([
-            "trafficserver",
-            "rsyslog",
+            "fluent-bit",
             "nginx",
             "nginx-web-app",
+            "trafficserver",
           ], options)
         end
 
@@ -415,7 +414,7 @@ def override_config_set(config, options = {})
             previous_override_config["https_proxy"] ||
             @@current_override_config["https_proxy"]
 
-          self.api_umbrella_process.restart_services(["rsyslog"], options)
+          self.api_umbrella_process.restart_services(["fluent-bit"], options)
         end
       end
     end
diff --git a/test/support/models/log_item.rb b/test/support/models/log_item.rb
index 4fd3d0b36..c985099ec 100644
--- a/test/support/models/log_item.rb
+++ b/test/support/models/log_item.rb
@@ -66,38 +66,15 @@ def self.clean_indices!
     # While not the most efficient way to bulk delete things, we don't want to
     # completely drop the index, since that might remove mappings that only get
     # created on API Umbrella startup.
-    bulk_request = []
-    opts = {
+    self.client.delete_by_query({
       :index => "_all",
-      :sort => "_doc",
-      :scroll => "2m",
-      :size => 1000,
+      :refresh => true,
       :body => {
         :query => {
           :match_all => {},
         },
       },
-    }
-    result = self.client.search(opts)
-    loop do
-      hits = result["hits"]["hits"]
-      break if hits.empty?
-
-      hits.each do |hit|
-        bulk_request << { :delete => { :_index => hit["_index"], :_type => hit["_type"], :_id => hit["_id"] } }
-      end
-
-      result = self.client.scroll(:scroll_id => result["_scroll_id"], :scroll => "2m")
-    end
-
-    self.client.clear_scroll(:scroll_id => result["_scroll_id"])
-
-    # Perform the bulk delete of all records in this index.
-    unless bulk_request.empty?
-      self.client.bulk :body => bulk_request
-    end
-
-    self.refresh_indices!
+    })
   end
 
   def serializable_hash
@@ -143,18 +120,22 @@ def serializable_hash
       hash.delete("request_query")
     end
 
+    hash["request_id"] = self._id
+    hash["@timestamp"] = hash.delete("request_at")
+
     hash
   end
 
   def save
-    index_time = self.request_at
-    if(index_time.kind_of?(Integer))
-      index_time = Time.at(index_time / 1000.0).utc
+    prefix = "#{$config.fetch("opensearch").fetch("index_name_prefix")}-logs-v#{$config["opensearch"]["template_version"]}"
+    if !self.gatekeeper_denied_code.nil?
+      index_name = "#{prefix}-denied"
+    elsif self.response_status.to_s =~ /^[4-9]/
+      index_name = "#{prefix}-errored"
+    else
+      index_name = "#{prefix}-allowed"
     end
 
-    prefix = "#{$config.fetch("opensearch").fetch("index_name_prefix")}-logs"
-    index_name = "#{prefix}-v#{$config["opensearch"]["template_version"]}-all"
-
     self.client.index({
       :index => index_name,
      :id => self._id,