From 6827dbb25f15cd33cbc1a3695a4c600a12307ff0 Mon Sep 17 00:00:00 2001 From: Yuvraj Angi Date: Tue, 23 Jul 2024 15:45:41 +0530 Subject: [PATCH 1/2] fixed issues faced while running in bigquery --- models/inputs.yaml | 4 ++-- models/localMacros.yaml | 2 +- models/profiles.yaml | 7 ++++++- models/sql_models.yaml | 20 ++++++++++---------- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/models/inputs.yaml b/models/inputs.yaml index 30a771d..d719a46 100644 --- a/models/inputs.yaml +++ b/models/inputs.yaml @@ -74,10 +74,10 @@ inputs: - select: "anonymous_id" type: anonymous_id entity: user - - select: "concat(coalesce(anonymous_id, 'null'), coalesce(to_char(context_session_id), 'null'))" + - select: "concat(coalesce(anonymous_id, 'null'), coalesce(CAST(context_session_id AS STRING), 'null'))" type: session_id entity: user - - select: "concat(coalesce(anonymous_id, 'null'), coalesce(to_char(context_session_id), 'null'))" + - select: "concat(coalesce(anonymous_id, 'null'), coalesce(CAST(context_session_id AS STRING), 'null'))" type: session_id entity: session - name: rsOrderCreated diff --git a/models/localMacros.yaml b/models/localMacros.yaml index 1d53c0f..0f93154 100644 --- a/models/localMacros.yaml +++ b/models/localMacros.yaml @@ -18,7 +18,7 @@ macros: - number_of_days value: " {% if warehouse.DatabaseType() == \"bigquery\" %} - {% if !(end_time|isnil) %} date_diff(date('{{end_time.Format(\"2006-01-02 15:04:05\")}}'), date({{column}}), day) <={{number_of_days}} + {% if !(end_time|isnil) %} date_diff(date('{{end_time.Format(\"2006-01-02 15:04:05\")}}'), date({{column}}), day) {% else %} date_diff(CURRENT_DATE(), date({{column}}), day){% endif %} <= {{number_of_days}} {% else %} diff --git a/models/profiles.yaml b/models/profiles.yaml index 0c37aad..13c8929 100644 --- a/models/profiles.yaml +++ b/models/profiles.yaml @@ -452,7 +452,12 @@ var_groups: enable_status: must_have - entity_var: name: session_length - select: TIMESTAMPDIFF(SECOND, {{session.Var("session_start_time")}}, {{session.Var("session_end_time")}}) + select: | + {% if warehouse.DatabaseType() == 'bigquery' %} + TIMESTAMP_DIFF({{session.Var("session_end_time")}}, {{session.Var("session_start_time")}}, SECOND) + {% else %} + DATEDIFF(SECOND, {{session.Var("session_start_time")}}, {{session.Var("session_end_time")}}) + {% endif %} materialization: enable_status: must_have - entity_var: diff --git a/models/sql_models.yaml b/models/sql_models.yaml index 0fb7c91..6c3d3d7 100644 --- a/models/sql_models.yaml +++ b/models/sql_models.yaml @@ -149,24 +149,24 @@ models: run_type: discrete single_sql: | {% with CartUpdate = this.DeRef("inputs/rsCartUpdate") %} - SELECT to_char(t.value['brand']) AS brand, + SELECT CAST(t.value['brand'] AS STRING) AS brand, t.value['discounted_price'] AS discounted_price, - to_char(t.value['gift_card']) AS gift_card, + CAST(t.value['gift_card'] AS STRING) AS gift_card, t.value['grams'] AS grams, - to_char(t.value['id']) AS id, - to_char(t.value['key']) AS KEY, + CAST(t.value['id'] AS STRING) AS id, + CAST(t.value['key'] AS STRING) AS KEY, t.value['line_price'] AS line_price, t.value['original_line_price'] AS original_line_price, t.value['original_price'] AS original_price, t.value['price'] AS price, - to_char(t.value['product_id']) AS product_id, - to_char(t.value['properties']) AS properties, + CAST(t.value['product_id'] AS STRING) AS product_id, + CAST(t.value['properties'] AS STRING) AS properties, t.value['quantity'] AS quantity, - to_char(t.value['sku']) AS sku, - to_char(t.value['taxable']) AS taxable, - to_char(t.value['title']) AS title, + CAST(t.value['sku'] AS STRING) AS sku, + CAST(t.value['taxable'] AS STRING) AS taxable, + CAST(t.value['title'] AS STRING) AS title, t.value['total_discount'] AS total_discount, - to_char(t.value['variant']) AS _VARIANT_, + CAST(t.value['variant'] AS STRING) AS _VARIANT_, products, anonymous_id,timestamp, token FROM From 0182e3920c2b5d4349ebcc6a001a03f6692f6b6a Mon Sep 17 00:00:00 2001 From: Yuvraj Angi Date: Mon, 5 Aug 2024 15:36:16 +0530 Subject: [PATCH 2/2] project now working on all warehouses --- models/inputs.yaml | 4 +-- models/macros.yaml | 53 ++++++++++++++++++++++++++++++++++++--- models/profiles.yaml | 46 +++++++++++++++++++++------------- models/sql_models.yaml | 57 ++++++++++++++++++++++++------------------ 4 files changed, 113 insertions(+), 47 deletions(-) diff --git a/models/inputs.yaml b/models/inputs.yaml index d719a46..8f42837 100644 --- a/models/inputs.yaml +++ b/models/inputs.yaml @@ -74,10 +74,10 @@ inputs: - select: "anonymous_id" type: anonymous_id entity: user - - select: "concat(coalesce(anonymous_id, 'null'), coalesce(CAST(context_session_id AS STRING), 'null'))" + - select: "concat(coalesce(anonymous_id, 'null'), coalesce(CAST(context_session_id as {{warehouse.DataType('text')}}), 'null'))" type: session_id entity: user - - select: "concat(coalesce(anonymous_id, 'null'), coalesce(CAST(context_session_id AS STRING), 'null'))" + - select: "concat(coalesce(anonymous_id, 'null'), coalesce(CAST(context_session_id as {{warehouse.DataType('text')}}), 'null'))" type: session_id entity: session - name: rsOrderCreated diff --git a/models/macros.yaml b/models/macros.yaml index 3c6086a..6502297 100644 --- a/models/macros.yaml +++ b/models/macros.yaml @@ -2,15 +2,62 @@ macros: - name: array_agg inputs: - column_name - value: "array_agg( distinct {{column_name}})" + value: " + {% if warehouse.DatabaseType() == \"redshift\" %} + listagg( distinct {{column_name}}) + {% elif warehouse.DatabaseType() == \"bigquery\" %} + array_agg( distinct {{column_name}} IGNORE NULLS) + {% else %} + array_agg( distinct {{column_name}}) + {% endif %}" - name: array_size inputs: - column_name value: " {% if warehouse.DatabaseType() == \"bigquery\" %} - array_length( {{column_name}} ) + array_length( split({{column_name}}) ) + {% elif warehouse.DatabaseType() == \"redshift\" %} + get_array_length( json_parse({{column_name}}) ) + {% elif warehouse.DatabaseType() == \"databricks\" %} + size(from_json(get_json_object({{column_name}}, '$'), 'array')) {% else %} array_size( parse_json({{column_name}}) ) {% endif %}" - name: frame_clause - value: "frame_condition = 'rows between unbounded preceding and unbounded following'" + value: "rows between unbounded preceding and unbounded following" + - name: get_array_from_json + inputs: + - column_name + value: " + {% if warehouse.DatabaseType() == \"redshift\" %} + array_flatten(json_parse(nullif({{column_name}}, ''))) + {% elif warehouse.DatabaseType() == \"databricks\" %} + flatten(from_json(nullif({{column_name}}, ''))) + {% elif warehouse.DatabaseType() == \"bigquery\" %} + parse_json(nullif({{column_name}}, '')) + {% else %} + flatten(parse_json({{column_name}})) + {% endif %}" + - name: get_value_from_array + inputs: + - array + - key + value: " + {% if warehouse.DatabaseType() == \"redshift\" %} + {{array}}.{{key}} + {% elif warehouse.DatabaseType() == \"bigquery\" %} + json_value({{array}}, \"$.{{key}}\") + {% elif warehouse.DatabaseType() == \"databricks\" %} + get_json_object({{array}}, '$[0].{{key}}') + {% else %} + {{array}}.value['{{key}}'] + {% endif %}" + - name: median_val + inputs: + - column_name + value: " + {% if warehouse.DatabaseType() == \"bigquery\" %} + APPROX_QUANTILES(CAST({{column_name}} AS {{warehouse.DataType(\"float\")}}), 2)[OFFSET(1)] + {% else %} + median(CAST({{column_name}} AS {{warehouse.DataType(\"float\")}})) + {% endif %}" diff --git a/models/profiles.yaml b/models/profiles.yaml index 13c8929..4b596aa 100644 --- a/models/profiles.yaml +++ b/models/profiles.yaml @@ -154,7 +154,7 @@ var_groups: description: It shows the average units purchased in each transaction. (Total units in each transaction/Total transactions). Includes only those transactions where the total price (from column current_total_price) is greater than zero. So, the feature exclude transactions with 100% off, replacement products etc that may result in the total_price being equal to zero. - entity_var: name: avg_transaction_value - select: avg( total_price_usd ) + select: avg( CAST(total_price_usd AS {{warehouse.DataType("float")}}) ) from: inputs/rsOrderCreated description: Total price in each transaction/Total number of transactions. - entity_var: @@ -164,7 +164,7 @@ var_groups: description: Of all the transactions done by the user, this features contains the highest transaction value. - entity_var: name: median_transaction_value - select: median( total_price_usd ) + select: "{{median_val( 'total_price_usd' )}}" from: inputs/rsOrderCreated description: Median value of total price of all the transactions - entity_var: @@ -180,10 +180,11 @@ var_groups: order_by: - case when TOTAL_PRICE_USD is not null then 2 else 1 end desc - timestamp desc + frame_clause: "{{frame_clause}}" description: The total value of products that are part of the last transaction. - entity_var: name: total_refund - select: sum(total_price_usd ) + select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}}) ) from: inputs/rsOrderCancelled where: financial_status in ('paid','refunded','partially_refunded') description: Total refund for a particular user to date. @@ -195,13 +196,13 @@ var_groups: description: The total number of times an order has been cancelled by a user and has been refunded - entity_var: name: total_refund_in_past_1_days - select: sum(total_price_usd) + select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) from: inputs/rsOrderCancelled where: "{{macro_datediff_n('timestamp','1')}}" description: Total refund for a particular user in last 1 day - entity_var: name: total_refund_in_past_7_days - select: sum(total_price_usd) + select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) from: inputs/rsOrderCancelled where: "{{macro_datediff_n('timestamp','7')}}" description: Total refund for a particular user in last 1 day @@ -216,6 +217,7 @@ var_groups: window: order_by: - timestamp desc + frame_clause: "{{frame_clause}}" where: state is not null and state!='' - entity_var: name: country @@ -224,6 +226,7 @@ var_groups: window: order_by: - timestamp desc + frame_clause: "{{frame_clause}}" where: address_country is not null and address_country!='' - entity_var: name: first_name @@ -232,6 +235,7 @@ var_groups: window: order_by: - timestamp desc + frame_clause: "{{frame_clause}}" where: first_name is not null and first_name!='' - entity_var: name: last_name @@ -240,6 +244,7 @@ var_groups: window: order_by: - timestamp desc + frame_clause: "{{frame_clause}}" where: last_name is not null and last_name!='' - entity_var: name: user_email @@ -248,6 +253,7 @@ var_groups: window: order_by: - timestamp desc + frame_clause: "{{frame_clause}}" where: email is not null and email!='' - entity_var: name: currency @@ -256,6 +262,7 @@ var_groups: window: order_by: - timestamp desc + frame_clause: "{{frame_clause}}" where: currency is not null and currency!='' - entity_var: name: active_days_in_past_7_days @@ -344,24 +351,24 @@ var_groups: description: Total carts created by the user till date. - entity_var: name: total_products_added - select: array_agg(distinct product_id) + select: "{{array_agg('product_id')}}" from: models/rsCartLineItems description: Total products added to cart till date. (array with list of all product ids). It includes all purchased products plus current active cart. - entity_var: name: products_added_in_past_1_days - select: array_agg(distinct product_id) + select: "{{array_agg('product_id')}}" from: models/rsCartLineItems where: "{{macro_datediff_n('timestamp','1')}}" description: List of products added to cart by the user in last 1 days. (array with list of all product ids). It includes all purchased products plus current active cart. - entity_var: name: products_added_in_past_7_days - select: array_agg(distinct product_id) + select: "{{array_agg('product_id')}}" from: models/rsCartLineItems where: "{{macro_datediff_n('timestamp','7')}}" description: List of products added to cart by the user in last 7 days. (array with list of all product ids). It includes all purchased products plus current active cart. - entity_var: name: products_added_in_past_365_days - select: array_agg(distinct product_id) + select: "{{array_agg('product_id')}}" from: models/rsCartLineItems where: "{{macro_datediff_n('timestamp',365)}}" description: List of products added to cart by the user in last 365 days. (array with list of all product ids). It includes all purchased products plus current active cart. @@ -372,10 +379,11 @@ var_groups: window: order_by: - timestamp desc + frame_clause: "{{frame_clause}}" is_feature: false - entity_var: name: last_cart_value_in_dollars - select: sum(line_price) + select: sum(CAST(line_price AS {{warehouse.DataType("float")}})) from: models/rsCartLineItems where: token={{user.Var("latest_cart_id")}} description: The value of products added in the latest cart. @@ -404,35 +412,35 @@ var_groups: description: Number of transactions in last 365 day - entity_var: name: net_amt_spent_in_past_90_days - select: sum(total_price_usd) - coalesce(sum(total_price_usd_order_cancelled), 0) + select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) - coalesce(sum(CAST(total_price_usd_order_cancelled AS {{warehouse.DataType("float")}})), 0) from: models/rsOrderCreatedOrderCancelled where: "{{macro_datediff_n('timestamp','90')}}" description: Net amount i.e. sales-refund spent by the user in last 90 days. - entity_var: name: net_amt_spent_in_past_365_days - select: sum(total_price_usd) - coalesce(sum(total_price_usd_order_cancelled), 0) + select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) - coalesce(sum(CAST(total_price_usd_order_cancelled AS {{warehouse.DataType("float")}})), 0) from: models/rsOrderCreatedOrderCancelled where: "{{macro_datediff_n('timestamp','365')}}" description: Net amount i.e. sales-refund spent by the user in last 365 days. - entity_var: name: net_amt_spent_in_past_1_days - select: sum(total_price_usd) - coalesce(sum(total_price_usd_order_cancelled), 0) + select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) - coalesce(sum(CAST(total_price_usd_order_cancelled AS {{warehouse.DataType("float")}})), 0) from: models/rsOrderCreatedOrderCancelled where: "{{macro_datediff_n('timestamp','1')}}" description: Net amount i.e. sales-refund spent by the user in last 1 day. - entity_var: name: net_amt_spent_in_past - select: sum(total_price_usd) - coalesce(sum(total_price_usd_order_cancelled), 0) + select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) - coalesce(sum(CAST(total_price_usd_order_cancelled AS {{warehouse.DataType("float")}})), 0) from: models/rsOrderCreatedOrderCancelled description: Net amount i.e. sales-refund spent by the user to date. - entity_var: name: gross_amt_spent_in_past - select: sum(total_price_usd) + select: sum(CAST(total_price_usd AS {{warehouse.DataType("float")}})) from: models/rsOrderCreatedOrderCancelled description: Total value of products purchased till date. - entity_var: name: items_purchased_ever - select: array_agg(sku) + select: "{{array_agg('CAST(sku AS {{warehouse.DataType(\"text\")}})')}}" from: models/rsItemsPurchasedEver description: The list of unique products bought by the user. - name: session_vars @@ -454,9 +462,9 @@ var_groups: name: session_length select: | {% if warehouse.DatabaseType() == 'bigquery' %} - TIMESTAMP_DIFF({{session.Var("session_end_time")}}, {{session.Var("session_start_time")}}, SECOND) + TIMESTAMP_DIFF(CAST({{session.Var("session_end_time")}} AS TIMESTAMP), CAST({{session.Var("session_start_time")}} AS TIMESTAMP), SECOND) {% else %} - DATEDIFF(SECOND, {{session.Var("session_start_time")}}, {{session.Var("session_end_time")}}) + DATEDIFF(SECOND, CAST({{session.Var("session_start_time")}} AS TIMESTAMP), CAST({{session.Var("session_end_time")}} AS TIMESTAMP)) {% endif %} materialization: enable_status: must_have @@ -467,6 +475,7 @@ var_groups: window: order_by: - timestamp + frame_clause: "{{frame_clause}}" from: inputs/rsPages - entity_var: name: anonymous_id @@ -475,4 +484,5 @@ var_groups: window: order_by: - timestamp + frame_clause: "{{frame_clause}}" from: inputs/rsPages diff --git a/models/sql_models.yaml b/models/sql_models.yaml index 6c3d3d7..6ef8dca 100644 --- a/models/sql_models.yaml +++ b/models/sql_models.yaml @@ -35,8 +35,14 @@ models: run_type: discrete single_sql: | {% with OrderCreated = this.DeRef("inputs/rsOrderCreated") %} - select t.value['sku'] as SKU,products,token,ANONYMOUS_ID,USER_ID,TIMESTAMP,order_number,cart_token - from (select * from {{OrderCreated}} ), table(flatten(parse_json(products))) t where products is not null + select {{get_value_from_array('product_info', 'sku')}} as SKU,products,token,ANONYMOUS_ID,USER_ID,order_number,cart_token,TIMESTAMP + {% if warehouse.DatabaseType() == "redshift" || warehouse.DatabaseType() == "bigquery" %} + from (select *, {{get_array_from_json('products')}} AS product_info from {{OrderCreated}} ) where products is not null + {% elif warehouse.DatabaseType() == "databricks" %} + from (select *, products AS product_info from {{OrderCreated}} ) where products is not null + {% else %} + from (select * from {{OrderCreated}} ), table({{get_array_from_json('products')}}) product_info where products is not null + {% endif %} {% endwith %} contract: is_optional: false @@ -149,34 +155,37 @@ models: run_type: discrete single_sql: | {% with CartUpdate = this.DeRef("inputs/rsCartUpdate") %} - SELECT CAST(t.value['brand'] AS STRING) AS brand, - t.value['discounted_price'] AS discounted_price, - CAST(t.value['gift_card'] AS STRING) AS gift_card, - t.value['grams'] AS grams, - CAST(t.value['id'] AS STRING) AS id, - CAST(t.value['key'] AS STRING) AS KEY, - t.value['line_price'] AS line_price, - t.value['original_line_price'] AS original_line_price, - t.value['original_price'] AS original_price, - t.value['price'] AS price, - CAST(t.value['product_id'] AS STRING) AS product_id, - CAST(t.value['properties'] AS STRING) AS properties, - t.value['quantity'] AS quantity, - CAST(t.value['sku'] AS STRING) AS sku, - CAST(t.value['taxable'] AS STRING) AS taxable, - CAST(t.value['title'] AS STRING) AS title, - t.value['total_discount'] AS total_discount, - CAST(t.value['variant'] AS STRING) AS _VARIANT_, - products, - anonymous_id,timestamp, token + SELECT CAST({{get_value_from_array('product_info', 'brand')}} as {{warehouse.DataType("text")}}) AS brand, + {{get_value_from_array('product_info', 'discounted_price')}} AS discounted_price, + CAST({{get_value_from_array('product_info', 'gift_card')}} as {{warehouse.DataType("text")}}) AS gift_card, + {{get_value_from_array('product_info', 'grams')}} AS grams, + CAST({{get_value_from_array('product_info', 'id')}} as {{warehouse.DataType("text")}}) AS id, + CAST({{get_value_from_array('product_info', 'key')}} as {{warehouse.DataType("text")}}) AS KEY, + {{get_value_from_array('product_info', 'line_price')}} AS line_price, + {{get_value_from_array('product_info', 'original_line_price')}} AS original_line_price, + {{get_value_from_array('product_info', 'original_price')}} AS original_price, + {{get_value_from_array('product_info', 'price')}} AS price, + CAST({{get_value_from_array('product_info', 'product_id')}} as {{warehouse.DataType("text")}}) AS product_id, + CAST({{get_value_from_array('product_info', 'properties')}} as {{warehouse.DataType("text")}}) AS properties, + {{get_value_from_array('product_info', 'quantity')}} AS quantity, + CAST({{get_value_from_array('product_info', 'sku')}} as {{warehouse.DataType("text")}}) AS sku, + CAST({{get_value_from_array('product_info', 'taxable')}} as {{warehouse.DataType("text")}}) AS taxable, + CAST({{get_value_from_array('product_info', 'title')}} as {{warehouse.DataType("text")}}) AS title, + {{get_value_from_array('product_info', 'total_discount')}} AS total_discount, + CAST({{get_value_from_array('product_info', 'variant')}} as {{warehouse.DataType("text")}}) AS _VARIANT_, products, anonymous_id, token, timestamp FROM (SELECT * FROM - (SELECT *, + (SELECT *, {% if warehouse.DatabaseType() == "databricks" %} products as product_info, {% endif %} row_number() over(PARTITION BY anonymous_id, token ORDER BY timestamp DESC) AS rn FROM {{CartUpdate}} where products is not null) - WHERE rn = 1), table(flatten(parse_json(products))) t + WHERE rn = 1) + {% if warehouse.DatabaseType() == "redshift" || warehouse.DatabaseType() == "bigquery" %} + ,(select {{get_array_from_json('products')}} AS product_info FROM {{CartUpdate}}) + {% elif warehouse.DatabaseType() == "snowflake" %} + ,table({{get_array_from_json('products')}}) product_info + {% endif %} {% endwith %} ids: - select: "anonymous_id"