Skip to content

Commit

Permalink
Merge pull request #1669 from Logflare/fix/cross-join-unnest-in-sandb…
Browse files Browse the repository at this point in the history
…ox-translation

feat: add cross-join unnest handling in cte query translation
  • Loading branch information
Ziinc authored Aug 5, 2023
2 parents da46c5c + 896a65f commit ca6007b
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 12 deletions.
87 changes: 75 additions & 12 deletions lib/logflare/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -933,10 +933,15 @@ defmodule Logflare.Sql do
defp convert_keys_to_json_query(identifiers, data, base \\ "body")

# convert body.timestamp from unix microsecond to postgres timestamp
defp convert_keys_to_json_query(%{"CompoundIdentifier" => [%{"value" => "timestamp"}]}, _data, [
table,
"body"
]) do
defp convert_keys_to_json_query(
%{"CompoundIdentifier" => [%{"value" => "timestamp"}]},
%{in_cte_tables_tree: in_cte_tables_tree, cte_aliases: cte_aliases} = _data,
[
table,
"body"
]
)
when cte_aliases == %{} or in_cte_tables_tree == true do
%{
"Nested" => %{
"AtTimeZone" => %{
Expand Down Expand Up @@ -1068,12 +1073,33 @@ defmodule Logflare.Sql do
}
end

# Builds a nested JSON path-access AST node for a field referenced through a
# join alias, for callers that pass `base` as a `{base, arr_path}` tuple.
#
# * `base`     - identifier placed on the left-hand side of the JSON access.
# * `arr_path` - list of path segments that precede `key` in the path literal.
# * `key`      - second part of the compound identifier; the alias itself
#                (first part) is not used by this clause.
#
# The resulting path literal has the form `{seg1,seg2,...,key}`.
defp convert_keys_to_json_query(
       %{"CompoundIdentifier" => [%{"value" => _join_alias}, %{"value" => key} | _]},
       data,
       {base, arr_path}
     ) do
  json_path = "{#{Enum.join(arr_path, ",")},#{key}}"

  # Inside a cast or a binary operation the "long arrow" variant is emitted
  # (presumably the text-returning `#>>`; confirm against the SQL parser),
  # otherwise the plain hash-arrow operator is used.
  operator = if data.in_cast or data.in_binaryop, do: "HashLongArrow", else: "HashArrow"

  json_access = %{
    "left" => %{"Identifier" => %{"quote_style" => nil, "value" => base}},
    "operator" => operator,
    "right" => %{"Value" => %{"SingleQuotedString" => json_path}}
  }

  %{"Nested" => %{"JsonAccess" => json_access}}
end

defp convert_keys_to_json_query(
%{"CompoundIdentifier" => [%{"value" => join_alias}, %{"value" => key} | _]},
data,
base
) do
path = "{#{data.alias_path_mappings[join_alias]},#{key}}"
str_path = Enum.join(data.alias_path_mappings[join_alias], ",")
path = "{#{str_path},#{key}}"

%{
"Nested" => %{
Expand Down Expand Up @@ -1129,21 +1155,38 @@ defmodule Logflare.Sql do
get_in(from, ["relation", "Table", "alias", "name", "value"])
end)

for from <- from_list,
for from <- from_list do
Enum.reduce(from["joins"] || [], %{}, fn
%{
"relation" => %{
"UNNEST" => %{
"array_expr" => %{"Identifier" => %{"value" => identifier_val}},
"alias" => %{"name" => %{"value" => alias_name}}
}
}
},
acc ->
Map.put(acc, alias_name, [identifier_val])

%{
"relation" => %{
"UNNEST" => %{
"array_expr" => %{"CompoundIdentifier" => identifiers},
"alias" => %{"name" => %{"value" => alias_name}}
}
}
} <- from["joins"] || [],
into: %{} do
arr_path = for i <- identifiers, value = i["value"], value not in table_aliases, do: value
},
acc ->
arr_path =
for i <- identifiers, value = i["value"], value not in table_aliases do
if is_map_key(acc, value), do: acc[value], else: [value]
end
|> List.flatten()

str_path = Enum.join(arr_path, ",")
{alias_name, str_path}
Map.put(acc, alias_name, arr_path)
end)
end
|> Enum.reduce(%{}, fn mappings, acc -> Map.merge(acc, mappings) end)
end

defp traverse_convert_identifiers({"BinaryOp" = k, v}, data) do
Expand Down Expand Up @@ -1175,10 +1218,13 @@ defmodule Logflare.Sql do
value_map["value"]
end

alias_path_mappings = get_bq_alias_path_mappings(%{"Query" => v})

data =
Map.merge(data, %{
from_table_aliases: aliases,
from_table_values: values
from_table_values: values,
alias_path_mappings: alias_path_mappings
})

{k, traverse_convert_identifiers(v, data)}
Expand Down Expand Up @@ -1246,6 +1292,23 @@ defmodule Logflare.Sql do
data
) do
cond do
is_map_key(data.alias_path_mappings, head_val) and
length(data.alias_path_mappings[head_val || []]) > 1 ->
# referencing a cross join unnest
# pop first path part and use it as the base
# with a cross join unnest(metadata) as m
# with a cross join unnest(m.request) as request
# reference of request.status_code gets converted to:
# metadata #> '{request,status_code}'
# base is set to the first item of the path (full json path is metadata.request.status_code)

# pop the first
[base | arr_path] = data.alias_path_mappings[head_val]

convert_keys_to_json_query(%{k => v}, data, {base, arr_path})
|> Map.to_list()
|> List.first()

# first OR condition: outside of cte and non-cte
# second OR condition: inside a cte
head_val in data.from_table_aliases or
Expand Down
37 changes: 37 additions & 0 deletions test/logflare/sql_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,43 @@ defmodule Logflare.SqlTest do
assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query)
end

test "CTE translation with cross join" do
  # BigQuery input: both the CTE and the outer query use `cross join unnest(...)`,
  # and the outer select reads fields through the unnest aliases
  # (`request.method`, `response.status_code`, ...).
  bigquery_sql = ~s"""
  with edge_logs as (
  select t.timestamp, t.id, t.event_message, t.metadata
  from `cloudflare.logs.prod` t
  cross join unnest(metadata) as m
  )
  select id, timestamp, event_message, request.method, request.path, response.status_code
  from edge_logs
  cross join unnest(metadata) as m
  cross join unnest(m.request) as request
  cross join unnest(m.response) as response
  """

  # Expected Postgres output: no unnest joins remain, and each alias reference
  # becomes a `#>` JSON path lookup on the `metadata` column.
  expected_pg_sql = ~s"""
  with edge_logs as (
  select
  (to_timestamp( (t.body ->> 'timestamp')::bigint / 1000000.0) AT TIME ZONE 'UTC') as timestamp,
  (t.body -> 'id') as id,
  (t.body -> 'event_message') AS event_message,
  (t.body -> 'metadata') as metadata
  from "cloudflare.logs.prod" t
  )
  SELECT
  id AS id,
  timestamp AS timestamp,
  event_message AS event_message,
  ( metadata #> '{request,method}') AS method,
  ( metadata #> '{request,path}') AS path,
  ( metadata #> '{response,status_code}') AS status_code
  FROM edge_logs
  """

  # Compare parsed ASTs rather than raw strings so that whitespace and
  # keyword casing differences do not matter.
  expected_ast = Sql.Parser.parse("postgres", expected_pg_sql)
  {:ok, translated_sql} = Sql.translate(:bq_sql, :pg_sql, bigquery_sql)

  assert Sql.Parser.parse("postgres", translated_sql) == expected_ast
end

# functions metrics
# test "APPROX_QUANTILES is translated"
# test "offset() and indexing is translated"
Expand Down
36 changes: 36 additions & 0 deletions test/logflare_web/controllers/endpoints_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,42 @@ defmodule LogflareWeb.EndpointsControllerTest do

assert [%{"event_message" => "some message"}] = json_response(conn, 200)["result"]
assert conn.halted == false

# test a logs ui query
params = %{
iso_timestamp_start:
DateTime.utc_now() |> DateTime.add(-3, :day) |> DateTime.to_iso8601(),
project: "default",
project_tier: "ENTERPRISE",
sql:
"select id, timestamp, event_message, request.method, request.path, response.status_code from edge_logs cross join unnest(metadata) as m cross join unnest(m.request) as request cross join unnest(m.response) as response limit 100 "
}

conn =
initial_conn
|> put_req_header("x-api-key", user.api_key)
|> get(~p"/endpoints/query/logs.all?#{params}")

assert [%{"event_message" => "some message"}] = json_response(conn, 200)["result"]
assert conn.halted == false

# different project filter
params = %{
iso_timestamp_start:
DateTime.utc_now() |> DateTime.add(-3, :day) |> DateTime.to_iso8601(),
project: "other",
project_tier: "ENTERPRISE",
sql:
"select id, timestamp, event_message, request.method, request.path, response.status_code from edge_logs cross join unnest(metadata) as m cross join unnest(m.request) as request cross join unnest(m.response) as response limit 100 "
}

conn =
initial_conn
|> put_req_header("x-api-key", user.api_key)
|> get(~p"/endpoints/query/logs.all?#{params}")

assert [] = json_response(conn, 200)["result"]
assert conn.halted == false
end
end
end

0 comments on commit ca6007b

Please sign in to comment.