Skip to content

Commit

Permalink
Merge pull request #56 from rudderlabs/fix.issues_with_bigquery
Browse files Browse the repository at this point in the history
fix: issues while running project in bigquery
  • Loading branch information
tipped73 authored Aug 6, 2024
2 parents da82005 + 0182e39 commit 186129c
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 47 deletions.
4 changes: 2 additions & 2 deletions models/inputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ inputs:
- select: "anonymous_id"
type: anonymous_id
entity: user
- select: "concat(coalesce(anonymous_id, 'null'), coalesce(to_char(context_session_id), 'null'))"
- select: "concat(coalesce(anonymous_id, 'null'), coalesce(CAST(context_session_id as {{warehouse.DataType('text')}}), 'null'))"
type: session_id
entity: user
- select: "concat(coalesce(anonymous_id, 'null'), coalesce(to_char(context_session_id), 'null'))"
- select: "concat(coalesce(anonymous_id, 'null'), coalesce(CAST(context_session_id as {{warehouse.DataType('text')}}), 'null'))"
type: session_id
entity: session
- name: rsOrderCreated
Expand Down
2 changes: 1 addition & 1 deletion models/localMacros.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ macros:
- number_of_days
value: "
{% if warehouse.DatabaseType() == \"bigquery\" %}
{% if !(end_time|isnil) %} date_diff(date('{{end_time.Format(\"2006-01-02 15:04:05\")}}'), date({{column}}), day) <={{number_of_days}}
{% if !(end_time|isnil) %} date_diff(date('{{end_time.Format(\"2006-01-02 15:04:05\")}}'), date({{column}}), day)
{% else %}
date_diff(CURRENT_DATE(), date({{column}}), day){% endif %} <= {{number_of_days}}
{% else %}
Expand Down
53 changes: 50 additions & 3 deletions models/macros.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,62 @@ macros:
- name: array_agg
inputs:
- column_name
value: "array_agg( distinct {{column_name}})"
value: "
{% if warehouse.DatabaseType() == \"redshift\" %}
listagg( distinct {{column_name}})
{% elif warehouse.DatabaseType() == \"bigquery\" %}
array_agg( distinct {{column_name}} IGNORE NULLS)
{% else %}
array_agg( distinct {{column_name}})
{% endif %}"
- name: array_size
inputs:
- column_name
value: "
{% if warehouse.DatabaseType() == \"bigquery\" %}
array_length( {{column_name}} )
array_length( split({{column_name}}) )
{% elif warehouse.DatabaseType() == \"redshift\" %}
get_array_length( json_parse({{column_name}}) )
{% elif warehouse.DatabaseType() == \"databricks\" %}
size(from_json(get_json_object({{column_name}}, '$'), 'array<string>'))
{% else %}
array_size( parse_json({{column_name}}) )
{% endif %}"
- name: frame_clause
value: "frame_condition = 'rows between unbounded preceding and unbounded following'"
value: "rows between unbounded preceding and unbounded following"
- name: get_array_from_json
inputs:
- column_name
value: "
{% if warehouse.DatabaseType() == \"redshift\" %}
array_flatten(json_parse(nullif({{column_name}}, '')))
{% elif warehouse.DatabaseType() == \"databricks\" %}
flatten(from_json(nullif({{column_name}}, '')))
{% elif warehouse.DatabaseType() == \"bigquery\" %}
parse_json(nullif({{column_name}}, ''))
{% else %}
flatten(parse_json({{column_name}}))
{% endif %}"
- name: get_value_from_array
inputs:
- array
- key
value: "
{% if warehouse.DatabaseType() == \"redshift\" %}
{{array}}.{{key}}
{% elif warehouse.DatabaseType() == \"bigquery\" %}
json_value({{array}}, \"$.{{key}}\")
{% elif warehouse.DatabaseType() == \"databricks\" %}
get_json_object({{array}}, '$[0].{{key}}')
{% else %}
{{array}}.value['{{key}}']
{% endif %}"
- name: median_val
inputs:
- column_name
value: "
{% if warehouse.DatabaseType() == \"bigquery\" %}
APPROX_QUANTILES(CAST({{column_name}} AS {{warehouse.DataType(\"float\")}}), 2)[OFFSET(1)]
{% else %}
median(CAST({{column_name}} AS {{warehouse.DataType(\"float\")}}))
{% endif %}"
49 changes: 32 additions & 17 deletions models/profiles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ var_groups:
description: It shows the average units purchased in each transaction. (Total units in each transaction/Total transactions). Includes only those transactions where the total price (from column current_total_price) is greater than zero. So, the feature exclude transactions with 100% off, replacement products etc that may result in the total_price being equal to zero.
- entity_var:
name: avg_transaction_value
select: avg( total_price_usd )
select: avg( CAST(total_price_usd AS {{warehouse.DataType("float")}}) )
from: inputs/rsOrderCreated
description: Total price in each transaction/Total number of transactions.
- entity_var:
Expand All @@ -164,7 +164,7 @@ var_groups:
description: Of all the transactions done by the user, this features contains the highest transaction value.
- entity_var:
name: median_transaction_value
select: median( total_price_usd )
select: "{{median_val( 'total_price_usd' )}}"
from: inputs/rsOrderCreated
description: Median value of total price of all the transactions
- entity_var:
Expand All @@ -180,10 +180,11 @@ var_groups:
order_by:
- case when TOTAL_PRICE_USD is not null then 2 else 1 end desc
- timestamp desc
frame_clause: "{{frame_clause}}"
description: The total value of products that are part of the last transaction.
- entity_var:
name: total_refund
select: sum(total_price_usd )
select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}}) )
from: inputs/rsOrderCancelled
where: financial_status in ('paid','refunded','partially_refunded')
description: Total refund for a particular user to date.
Expand All @@ -195,13 +196,13 @@ var_groups:
description: The total number of times an order has been cancelled by a user and has been refunded
- entity_var:
name: total_refund_in_past_1_days
select: sum(total_price_usd)
select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}}))
from: inputs/rsOrderCancelled
where: "{{macro_datediff_n('timestamp','1')}}"
description: Total refund for a particular user in last 1 day
- entity_var:
name: total_refund_in_past_7_days
select: sum(total_price_usd)
select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}}))
from: inputs/rsOrderCancelled
where: "{{macro_datediff_n('timestamp','7')}}"
description: Total refund for a particular user in last 1 day
Expand All @@ -216,6 +217,7 @@ var_groups:
window:
order_by:
- timestamp desc
frame_clause: "{{frame_clause}}"
where: state is not null and state!=''
- entity_var:
name: country
Expand All @@ -224,6 +226,7 @@ var_groups:
window:
order_by:
- timestamp desc
frame_clause: "{{frame_clause}}"
where: address_country is not null and address_country!=''
- entity_var:
name: first_name
Expand All @@ -232,6 +235,7 @@ var_groups:
window:
order_by:
- timestamp desc
frame_clause: "{{frame_clause}}"
where: first_name is not null and first_name!=''
- entity_var:
name: last_name
Expand All @@ -240,6 +244,7 @@ var_groups:
window:
order_by:
- timestamp desc
frame_clause: "{{frame_clause}}"
where: last_name is not null and last_name!=''
- entity_var:
name: user_email
Expand All @@ -248,6 +253,7 @@ var_groups:
window:
order_by:
- timestamp desc
frame_clause: "{{frame_clause}}"
where: email is not null and email!=''
- entity_var:
name: currency
Expand All @@ -256,6 +262,7 @@ var_groups:
window:
order_by:
- timestamp desc
frame_clause: "{{frame_clause}}"
where: currency is not null and currency!=''
- entity_var:
name: active_days_in_past_7_days
Expand Down Expand Up @@ -344,24 +351,24 @@ var_groups:
description: Total carts created by the user till date.
- entity_var:
name: total_products_added
select: array_agg(distinct product_id)
select: "{{array_agg('product_id')}}"
from: models/rsCartLineItems
description: Total products added to cart till date. (array with list of all product ids). It includes all purchased products plus current active cart.
- entity_var:
name: products_added_in_past_1_days
select: array_agg(distinct product_id)
select: "{{array_agg('product_id')}}"
from: models/rsCartLineItems
where: "{{macro_datediff_n('timestamp','1')}}"
description: List of products added to cart by the user in last 1 days. (array with list of all product ids). It includes all purchased products plus current active cart.
- entity_var:
name: products_added_in_past_7_days
select: array_agg(distinct product_id)
select: "{{array_agg('product_id')}}"
from: models/rsCartLineItems
where: "{{macro_datediff_n('timestamp','7')}}"
description: List of products added to cart by the user in last 7 days. (array with list of all product ids). It includes all purchased products plus current active cart.
- entity_var:
name: products_added_in_past_365_days
select: array_agg(distinct product_id)
select: "{{array_agg('product_id')}}"
from: models/rsCartLineItems
where: "{{macro_datediff_n('timestamp',365)}}"
description: List of products added to cart by the user in last 365 days. (array with list of all product ids). It includes all purchased products plus current active cart.
Expand All @@ -372,10 +379,11 @@ var_groups:
window:
order_by:
- timestamp desc
frame_clause: "{{frame_clause}}"
is_feature: false
- entity_var:
name: last_cart_value_in_dollars
select: sum(line_price)
select: sum(CAST(line_price AS {{warehouse.DataType("float")}}))
from: models/rsCartLineItems
where: token={{user.Var("latest_cart_id")}}
description: The value of products added in the latest cart.
Expand Down Expand Up @@ -404,35 +412,35 @@ var_groups:
description: Number of transactions in last 365 day
- entity_var:
name: net_amt_spent_in_past_90_days
select: sum(total_price_usd) - coalesce(sum(total_price_usd_order_cancelled), 0)
select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) - coalesce(sum(CAST(total_price_usd_order_cancelled AS {{warehouse.DataType("float")}})), 0)
from: models/rsOrderCreatedOrderCancelled
where: "{{macro_datediff_n('timestamp','90')}}"
description: Net amount i.e. sales-refund spent by the user in last 90 days.
- entity_var:
name: net_amt_spent_in_past_365_days
select: sum(total_price_usd) - coalesce(sum(total_price_usd_order_cancelled), 0)
select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) - coalesce(sum(CAST(total_price_usd_order_cancelled AS {{warehouse.DataType("float")}})), 0)
from: models/rsOrderCreatedOrderCancelled
where: "{{macro_datediff_n('timestamp','365')}}"
description: Net amount i.e. sales-refund spent by the user in last 365 days.
- entity_var:
name: net_amt_spent_in_past_1_days
select: sum(total_price_usd) - coalesce(sum(total_price_usd_order_cancelled), 0)
select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) - coalesce(sum(CAST(total_price_usd_order_cancelled AS {{warehouse.DataType("float")}})), 0)
from: models/rsOrderCreatedOrderCancelled
where: "{{macro_datediff_n('timestamp','1')}}"
description: Net amount i.e. sales-refund spent by the user in last 1 day.
- entity_var:
name: net_amt_spent_in_past
select: sum(total_price_usd) - coalesce(sum(total_price_usd_order_cancelled), 0)
select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) - coalesce(sum(CAST(total_price_usd_order_cancelled AS {{warehouse.DataType("float")}})), 0)
from: models/rsOrderCreatedOrderCancelled
description: Net amount i.e. sales-refund spent by the user to date.
- entity_var:
name: gross_amt_spent_in_past
select: sum(total_price_usd)
select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}}))
from: models/rsOrderCreatedOrderCancelled
description: Total value of products purchased till date.
- entity_var:
name: items_purchased_ever
select: array_agg(sku)
select: "{{array_agg('CAST(sku AS {{warehouse.DataType(\"text\")}})')}}"
from: models/rsItemsPurchasedEver
description: The list of unique products bought by the user.
- name: session_vars
Expand All @@ -452,7 +460,12 @@ var_groups:
enable_status: must_have
- entity_var:
name: session_length
select: TIMESTAMPDIFF(SECOND, {{session.Var("session_start_time")}}, {{session.Var("session_end_time")}})
select: |
{% if warehouse.DatabaseType() == 'bigquery' %}
TIMESTAMP_DIFF(CAST({{session.Var("session_end_time")}} AS TIMESTAMP), CAST({{session.Var("session_start_time")}} AS TIMESTAMP), SECOND)
{% else %}
DATEDIFF(SECOND, CAST({{session.Var("session_start_time")}} AS TIMESTAMP), CAST({{session.Var("session_end_time")}} AS TIMESTAMP))
{% endif %}
materialization:
enable_status: must_have
- entity_var:
Expand All @@ -462,6 +475,7 @@ var_groups:
window:
order_by:
- timestamp
frame_clause: "{{frame_clause}}"
from: inputs/rsPages
- entity_var:
name: anonymous_id
Expand All @@ -470,4 +484,5 @@ var_groups:
window:
order_by:
- timestamp
frame_clause: "{{frame_clause}}"
from: inputs/rsPages
57 changes: 33 additions & 24 deletions models/sql_models.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@ models:
run_type: discrete
single_sql: |
{% with OrderCreated = this.DeRef("inputs/rsOrderCreated") %}
select t.value['sku'] as SKU,products,token,ANONYMOUS_ID,USER_ID,TIMESTAMP,order_number,cart_token
from (select * from {{OrderCreated}} ), table(flatten(parse_json(products))) t where products is not null
select {{get_value_from_array('product_info', 'sku')}} as SKU,products,token,ANONYMOUS_ID,USER_ID,order_number,cart_token,TIMESTAMP
{% if warehouse.DatabaseType() == "redshift" || warehouse.DatabaseType() == "bigquery" %}
from (select *, {{get_array_from_json('products')}} AS product_info from {{OrderCreated}} ) where products is not null
{% elif warehouse.DatabaseType() == "databricks" %}
from (select *, products AS product_info from {{OrderCreated}} ) where products is not null
{% else %}
from (select * from {{OrderCreated}} ), table({{get_array_from_json('products')}}) product_info where products is not null
{% endif %}
{% endwith %}
contract:
is_optional: false
Expand Down Expand Up @@ -149,34 +155,37 @@ models:
run_type: discrete
single_sql: |
{% with CartUpdate = this.DeRef("inputs/rsCartUpdate") %}
SELECT to_char(t.value['brand']) AS brand,
t.value['discounted_price'] AS discounted_price,
to_char(t.value['gift_card']) AS gift_card,
t.value['grams'] AS grams,
to_char(t.value['id']) AS id,
to_char(t.value['key']) AS KEY,
t.value['line_price'] AS line_price,
t.value['original_line_price'] AS original_line_price,
t.value['original_price'] AS original_price,
t.value['price'] AS price,
to_char(t.value['product_id']) AS product_id,
to_char(t.value['properties']) AS properties,
t.value['quantity'] AS quantity,
to_char(t.value['sku']) AS sku,
to_char(t.value['taxable']) AS taxable,
to_char(t.value['title']) AS title,
t.value['total_discount'] AS total_discount,
to_char(t.value['variant']) AS _VARIANT_,
products,
anonymous_id,timestamp, token
SELECT CAST({{get_value_from_array('product_info', 'brand')}} as {{warehouse.DataType("text")}}) AS brand,
{{get_value_from_array('product_info', 'discounted_price')}} AS discounted_price,
CAST({{get_value_from_array('product_info', 'gift_card')}} as {{warehouse.DataType("text")}}) AS gift_card,
{{get_value_from_array('product_info', 'grams')}} AS grams,
CAST({{get_value_from_array('product_info', 'id')}} as {{warehouse.DataType("text")}}) AS id,
CAST({{get_value_from_array('product_info', 'key')}} as {{warehouse.DataType("text")}}) AS KEY,
{{get_value_from_array('product_info', 'line_price')}} AS line_price,
{{get_value_from_array('product_info', 'original_line_price')}} AS original_line_price,
{{get_value_from_array('product_info', 'original_price')}} AS original_price,
{{get_value_from_array('product_info', 'price')}} AS price,
CAST({{get_value_from_array('product_info', 'product_id')}} as {{warehouse.DataType("text")}}) AS product_id,
CAST({{get_value_from_array('product_info', 'properties')}} as {{warehouse.DataType("text")}}) AS properties,
{{get_value_from_array('product_info', 'quantity')}} AS quantity,
CAST({{get_value_from_array('product_info', 'sku')}} as {{warehouse.DataType("text")}}) AS sku,
CAST({{get_value_from_array('product_info', 'taxable')}} as {{warehouse.DataType("text")}}) AS taxable,
CAST({{get_value_from_array('product_info', 'title')}} as {{warehouse.DataType("text")}}) AS title,
{{get_value_from_array('product_info', 'total_discount')}} AS total_discount,
CAST({{get_value_from_array('product_info', 'variant')}} as {{warehouse.DataType("text")}}) AS _VARIANT_, products, anonymous_id, token, timestamp
FROM
(SELECT *
FROM
(SELECT *,
(SELECT *, {% if warehouse.DatabaseType() == "databricks" %} products as product_info, {% endif %}
row_number() over(PARTITION BY anonymous_id, token
ORDER BY timestamp DESC) AS rn
FROM {{CartUpdate}} where products is not null)
WHERE rn = 1), table(flatten(parse_json(products))) t
WHERE rn = 1)
{% if warehouse.DatabaseType() == "redshift" || warehouse.DatabaseType() == "bigquery" %}
,(select {{get_array_from_json('products')}} AS product_info FROM {{CartUpdate}})
{% elif warehouse.DatabaseType() == "snowflake" %}
,table({{get_array_from_json('products')}}) product_info
{% endif %}
{% endwith %}
ids:
- select: "anonymous_id"
Expand Down

0 comments on commit 186129c

Please sign in to comment.