Skip to content

Commit

Permalink
Shift envoy configuration to use XDS API instead of local files.
Browse files Browse the repository at this point in the history
This is part of work to better allow for the final envoy layer to exist
in a different environment than the rest of the application.
  • Loading branch information
GUI committed Jul 21, 2023
1 parent d5646f5 commit 197e1de
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 157 deletions.
12 changes: 12 additions & 0 deletions config/schema.cue
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,18 @@ import "path"
"P-384",
"P-521",
]
global_downstream_max_connections: uint16 | *8192

xds_rest_server: {
host: string | *"127.0.0.1"
port: uint16 | *14002
listen: {
host: string | *"127.0.0.1"
port: uint16 | *14002
}
max_body_size: string | *"5m"
refresh_delay: string | *"1s"
}
}

api_server: {
Expand Down
5 changes: 5 additions & 0 deletions config/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ envoy:
port: 13000
admin:
port: 13001
xds_rest_server:
port: 13002
listen:
port: 13002
refresh_delay: 0.01s
api_server:
port: 13010
web:
Expand Down
3 changes: 2 additions & 1 deletion src/api-umbrella/cli/setup.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local etlua_render = require("etlua").render
local find_cmd = require "api-umbrella.utils.find_cmd"
local geoip_download_if_missing_or_old = require("api-umbrella.utils.geoip").download_if_missing_or_old
local invert_table = require "api-umbrella.utils.invert_table"
local json_encode = require "api-umbrella.utils.json_encode"
local mkdir_p = require "api-umbrella.utils.mkdir_p"
local path_exists = require "api-umbrella.utils.path_exists"
local path_join = require "api-umbrella.utils.path_join"
Expand Down Expand Up @@ -196,7 +197,7 @@ local function write_templates()

if template_ext == "etlua" then
local render_ok, render_err
render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config })
render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config, json_encode = json_encode })
if not render_ok or render_err then
print("template compile error in " .. template_path ..": " .. (render_err or content))
os.exit(1)
Expand Down
175 changes: 161 additions & 14 deletions src/api-umbrella/http-api/state.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,188 @@ local config = require("api-umbrella.utils.load_config")()
local http = require "resty.http"
local json_decode = require("cjson").decode
local json_encode = require "api-umbrella.utils.json_encode"
local compress_json_encode = require("api-umbrella.utils.compressed_json").compress_json_encode
local json_null_default = require "api-umbrella.web-app.utils.json_null_default"
local xpcall_error_handler = require "api-umbrella.utils.xpcall_error_handler"

local cache = active_config_store.cache
local get_active_config = active_config_store.get
local jobs_dict = ngx.shared.jobs
local refresh_local_active_config_cache = active_config_store.refresh_local_cache

-- Refresh cache per request if background polling is disabled.
local state_ttl = 1
if config["router"]["active_config"]["refresh_local_cache_interval"] == 0 then
refresh_local_active_config_cache()

-- If background polling is disabled, then this probably indicates the test
-- environment, so also also don't cache the state for very long.
state_ttl = 0.001
end

local active_config = get_active_config()

local function fetch_web_app_state()
local httpc = http.new()
httpc:set_timeout(2000)

local connect_ok, connect_err = httpc:connect({
scheme = "http",
host = config["web"]["host"],
port = config["web"]["port"],
})

if not connect_ok then
httpc:close()
return nil, "failed to connect: ".. (connect_err or "")
end

local res, err = httpc:request({
method = "GET",
path = "/_web-app-state",
})
if err then
httpc:close()
return nil, "request error: " .. (err or "")
elseif res.status ~= 200 then
httpc:close()
return nil, "unsuccessful response: " .. (res.status or "")
end

local body, body_err = res:read_body()
if body_err then
httpc:close()
return nil, "read body error: " .. (body_err or "")
end

local ok, data = xpcall(json_decode, xpcall_error_handler, body)
if not ok then
return nil, "failed to parse json: " .. (data or "")
end

local keepalive_ok, keepalive_err = httpc:set_keepalive()
if not keepalive_ok then
httpc:close()
return nil, "keepalive error: " .. (keepalive_err or "")
end

return compress_json_encode(data)
end

local function fetch_envoy_state()
local data = {}
local config_version = active_config["version"]

local httpc = http.new()
httpc:set_timeout(2000)

local connect_ok, connect_err = httpc:connect({
scheme = "http",
host = config["envoy"]["admin"]["host"],
port = config["envoy"]["admin"]["port"],
})

if not connect_ok then
httpc:close()
return nil, "failed to connect: ".. (connect_err or "")
end

local stats_res, stats_err = httpc:request({
method = "GET",
path = "/stats?format=json&filter=\\.version_text$",
})
if stats_err then
httpc:close()
return nil, "envoy admin request error: " .. (stats_err or "")
end

local stats_body, stats_body_err = stats_res:read_body()
if stats_body_err then
httpc:close()
return nil, "envoy admin read body error: " .. (stats_body_err or "")
end

local stats_json_ok, stats = xpcall(json_decode, xpcall_error_handler, stats_body)
if not stats_json_ok then
return nil, "failed to parse json: " .. (stats or "")
end

data["versions_ready"] = false
for _, stat in ipairs(stats["stats"]) do
data[stat["name"]] = stat["value"]
if stat["value"] == config_version then
data["versions_ready"] = true
else
data["versions_ready"] = false
break
end
end

local clusters_res, clusters_err = httpc:request({
method = "GET",
path = "/clusters?format=json",
})
if clusters_err then
httpc:close()
return nil, "request error: " .. (clusters_err or "")
elseif clusters_res.status ~= 200 then
httpc:close()
return nil, "unsuccessful response: " .. (clusters_res.status or "")
end

local clusters_body, clusters_body_err = clusters_res:read_body()
if clusters_body_err then
httpc:close()
return nil, "envoy admin read body error: " .. (clusters_body_err or "")
end

local clusters_json_ok, clusters = xpcall(json_decode, xpcall_error_handler, clusters_body)
if not clusters_json_ok then
return nil, "failed to parse json: " .. (clusters or "")
end

data["clusters_ready"] = false
local initialized_cluster_names = {}
for _, cluster in ipairs(clusters["cluster_statuses"]) do
initialized_cluster_names[cluster["name"]] = true
end
for _, cluster in ipairs(active_config["envoy_xds"]["clusters"]["resources"]) do
if not initialized_cluster_names[cluster["name"]] then
data["clusters_ready"] = false
break
else
data["clusters_ready"] = true
end
end

local keepalive_ok, keepalive_err = httpc:set_keepalive()
if not keepalive_ok then
httpc:close()
return nil, "keepalive error: " .. (keepalive_err or "")
end

return compress_json_encode(data)
end

local web_app, web_app_err = cache:get("state:web_app", { ttl = state_ttl }, fetch_web_app_state)
if web_app_err then
ngx.log(ngx.ERR, "error fetching web app state: ", web_app_err)
end

local envoy, envoy_err = cache:get("state:envoy", { ttl = state_ttl }, fetch_envoy_state)
if envoy_err then
ngx.log(ngx.ERR, "error fetching envoy state: ", envoy_err)
end

local response = {
file_config_version = json_null_default(active_config["file_version"]),
db_config_version = json_null_default(active_config["db_version"]),
api_users_last_fetched_version = json_null_default(jobs_dict:get("api_users_store_last_fetched_version")),
distributed_rate_limits_last_pulled_at = json_null_default(jobs_dict:get("rate_limit_counters_store_distributed_last_pulled_at")),
distributed_rate_limits_last_pushed_at = json_null_default(jobs_dict:get("rate_limit_counters_store_distributed_last_pushed_at")),
web_app = web_app,
envoy = envoy,
}

local httpc = http.new()
httpc:set_timeout(1000)
local res, err = httpc:request_uri("http://127.0.0.1:" .. config["web"]["port"] .. "/_web-app-state")
if err then
ngx.log(ngx.ERR, "failed to fetch web app state: ", err)
elseif res.status == 200 then
local ok, data = xpcall(json_decode, xpcall_error_handler, res.body)
if not ok then
ngx.log(ngx.ERR, "failed to parse web-app-state json: " .. (data or ""))
else
response["web_app"] = data
end
end

ngx.header["Content-Type"] = "application/json"
ngx.say(json_encode(response))
62 changes: 62 additions & 0 deletions src/api-umbrella/http-api/xds.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
local get_active_config = require("api-umbrella.proxy.stores.active_config_store").get
local json_decode = require("cjson").decode
local json_encode = require "api-umbrella.utils.json_encode"

local ngx_exit = ngx.exit
local ngx_header = ngx.header
local ngx_say = ngx.say
local req_get_body_data = ngx.req.get_body_data
local req_read_body = ngx.req.read_body

local function handle_not_modified(config)
req_read_body()
local body = req_get_body_data()
local data = json_decode(body)

if data["version_info"] == config["version_info"] then
return ngx_exit(ngx.HTTP_NOT_MODIFIED)
end
end

local function handle_config(config_name)
local active_config = get_active_config()
local config = active_config["envoy_xds"][config_name]
handle_not_modified(config)

ngx_header["Content-Type"] = "application/json"
ngx_say(json_encode(config))
return ngx_exit(ngx.HTTP_OK)
end

local function clusters()
return handle_config("clusters")
end

local function listeners()
return handle_config("listeners")
end

local function routes()
return handle_config("routes")
end

local function require_method(required_method)
local method = ngx.req.get_method()
if method ~= required_method then
return ngx_exit(ngx.HTTP_NOT_ALLOWED)
end
end

local request_uri = ngx.var.request_uri
if request_uri == "/v3/discovery:clusters" then
require_method("POST")
clusters()
elseif request_uri == "/v3/discovery:listeners" then
require_method("POST")
listeners()
elseif request_uri == "/v3/discovery:routes" then
require_method("POST")
routes()
else
ngx_exit(ngx.HTTP_NOT_FOUND)
end
1 change: 0 additions & 1 deletion src/api-umbrella/proxy/hooks/init_preload_modules.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ require "api-umbrella.utils.active_config_store.build_active_config"
require "api-umbrella.utils.active_config_store.cache_computed_api_backend_settings"
require "api-umbrella.utils.active_config_store.fetch_published_config_for_setting_active_config"
require "api-umbrella.utils.active_config_store.polling_set_active_config"
require "api-umbrella.utils.active_config_store.set_envoy_config"
require "api-umbrella.utils.api_key_prefixer"
require "api-umbrella.utils.append_array"
require "api-umbrella.utils.compressed_json"
Expand Down
8 changes: 2 additions & 6 deletions src/api-umbrella/proxy/stores/active_config_store.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ local compressed_json = require "api-umbrella.utils.compressed_json"
local fetch_published_config_for_setting_active_config = require "api-umbrella.utils.active_config_store.fetch_published_config_for_setting_active_config"
local mlcache = require "resty.mlcache"
local polling_set_active_config = require "api-umbrella.utils.active_config_store.polling_set_active_config"
local set_envoy_config = require "api-umbrella.utils.active_config_store.set_envoy_config"

local compress_json_encode = compressed_json.compress_json_encode
local decompress_json_decode = compressed_json.decompress_json_decode
Expand All @@ -24,11 +23,6 @@ local function fetch_compressed_active_config(last_fetched_version)
-- identical strings.
local compressed_active_config = compress_json_encode(active_config)

local _, envoy_err = set_envoy_config(active_config)
if envoy_err then
ngx.log(ngx.ERR, "set envoy error: ", envoy_err)
end

return compressed_active_config, active_config["db_version"]
end)
end
Expand All @@ -51,6 +45,8 @@ if not cache then
return nil
end

_M.cache = cache

function _M.get()
local active_config, err = cache:get("active_config", nil, fetch_compressed_active_config)
if err then
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local build_combined_config = require("api-umbrella.utils.active_config_store.build_combined_config")
local build_envoy_xds_config = require("api-umbrella.utils.active_config_store.build_envoy_xds_config")
local parse_api_backends = require("api-umbrella.utils.active_config_store.parse_api_backends")
local parse_website_backends = require("api-umbrella.utils.active_config_store.parse_website_backends")

Expand All @@ -16,5 +17,7 @@ return function(published_config)
parse_api_backends(active_config["api_backends"])
parse_website_backends(active_config["website_backends"])

active_config["envoy_xds"] = build_envoy_xds_config(active_config)

return active_config
end
Loading

0 comments on commit 197e1de

Please sign in to comment.