change schema and query created_at/job_assigned_at to string type #6550

Draft
wants to merge 4 commits into
base: main

Conversation

@Marlene-M-Hirose (Contributor) commented Nov 21, 2024

Description

Related Tickets & Documents

  • DENG-XXXX
  • DSRE-XXXX

Reviewer, please follow this checklist

┆Issue is synchronized with this Jira Task

… of nulls or not existing, update schema with all the new fields
@dataops-ci-bot

Integration report for "expand entity/attributes field, add in conditional statements in case of nulls or not existing, update schema with all the new fields"
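The null handling named in this report title repeats the same check for every field. A minimal sketch of how it could be factored into one helper, assuming the Cinder export keeps the `entity` / `attributes` nesting that the diff reads from. The names `value_or_placeholder` and `flatten_entity` are hypothetical and do not exist in this PR, and the sketch uses a single placeholder per field rather than the separate `no_entity_recorded` value the diff emits when the whole entity is missing. `average_daily_users` is left out because it is loaded as INT64 and needs numeric handling.

```python
from typing import Any, Mapping, Optional


def value_or_placeholder(
    source: Optional[Mapping[str, Any]], key: str, placeholder: str
) -> Any:
    """Return source[key], or placeholder when the container or value is missing, None, or ""."""
    # Hypothetical helper: not part of the PR under review.
    if not isinstance(source, Mapping):
        return placeholder
    value = source.get(key)
    return placeholder if value in (None, "") else value


def flatten_entity(item: Mapping[str, Any]) -> dict:
    """Flatten item["entity"] into the entity_type / attributes_* columns."""
    entity = item.get("entity")
    attributes = entity.get("attributes") if isinstance(entity, Mapping) else None
    row = {
        "entity_type": value_or_placeholder(
            entity, "entity_type", "no_entity_type_recorded"
        )
    }
    # Each attribute is checked against its own key, which avoids copy-paste
    # slips where one field's check reads a different field.
    for key in ("id", "slug", "guid", "name", "summary", "created", "promoted"):
        row[f"attributes_{key}"] = value_or_placeholder(
            attributes, key, f"no_attributes_{key}_recorded"
        )
    return row
```

Because the helper uses `.get()` throughout, a record with no `entity`, no `attributes`, or a missing key falls back to the placeholder instead of raising `KeyError`.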

sql.diff

diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py	2024-11-23 06:55:08.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py	2024-11-23 06:55:02.000000000 +0000
@@ -16,15 +16,26 @@
     "job_id",
     "uuid",
     "applied_policies",
-    "entity",
+    "entity_type",
+    "attributes_id",
+    "attributes_slug",
+    "attributes_guid",
+    "attributes_name",
+    "attributes_summary",
+    "attributes_average_daily_users",
+    "attributes_created",
+    "attributes_promoted",
     "entity_slug",
     "entity_id",
     "created_at",
     "decision_type",
     "job_assigned_at",
-    "typed_metadata",
+    "legacy_decision_labels",
+    "policy_map",
+    "escalation_details",
 ]
 
+"""Get the bearer token for Cinder from the environment"""
 cinder_bearer_token = os.environ.get("CINDER_TOKEN")
 
 
@@ -59,6 +70,11 @@
         dict_writer = csv.DictWriter(out_file, CSV_FIELDS)
         dict_writer.writeheader()
         dict_writer.writerows(json_data)
+    # with open(filename) as fp:
+    #     reader = csv.reader(fp, delimiter=",", quotechar='"')
+    # # next(reader, None)  # skip the headers
+    #     data_read = [row for row in reader]
+    # print(data_read)
 
 
 def cinder_addon_decisions_download(date, bearer_token):
@@ -85,23 +101,152 @@
     return query_export
 
 
-def clean_json(query_export, date):
+def clean_json(query_export):
     """Turn the json file into a list to be input into a CSV for bq upload."""
     fields_list = []
     for item in query_export["items"]:
+        if item.get("user") in (None, ""):
+            r_user = "no_user_recorded"
+        else:
+            r_user = item.get("user", "no_user_recorded")
+        if item.get("queue_slug") in (None, ""):
+            r_queue_slug = "no_queue_slug_recorded"
+        else:
+            r_queue_slug = item.get("queue_slug", "no_queue_slug_recorded")
+        if item.get("job_id") in (None, ""):
+            r_job_id = "no_job_id_recorded"
+        else:
+            r_job_id = item.get("job_id", "no_job_id_recorded")
+        if item.get("uuid") in (None, ""):
+            r_uuid = "no_uuid_recorded"
+        else:
+            r_uuid = item.get("uuid", "no_uuid_recorded")
+        r_applied_policies = item.get("applied_policies")
+        if item.get("entity") in (None, ""):
+            print("no entity")
+            r_entity_type = "no_entity_recorded"
+            r_attributes_id = "no_attributes_id_recorded"
+            r_attributes_slug = "no_attributes_slug_recorded"
+            r_attributes_guid = "no_attributes_guid_recorded"
+            r_attributes_name = "no_attributes_name_recorded"
+            r_attributes_summary = "no_attributes_summary_recorded"
+            r_attributes_average_daily_users = (
+                "no_attributes_average_daily_users_recorded"
+            )
+            r_attributes_created = "no_attributes_created_recorded"
+            r_attributes_promoted = "no_attributes_promoted_recorded"
+        else:
+            if item["entity"]["entity_type"] in (None, ""):
+                r_entity_type = "no_entity_type_recorded"
+            else:
+                r_entity_type = item["entity"].get("entity_type")
+            if item["entity"]["attributes"]["id"] in (None, ""):
+                r_attributes_id = "no_attributes_id_recorded"
+            else:
+                r_attributes_id = item["entity"]["attributes"].get("id")
+            if "slug" in item["entity"]["attributes"]:
+                if item["entity"]["attributes"]["slug"] in (None, ""):
+                    r_attributes_slug = "no_attributes_slug_recorded"
+                else:
+                    r_attributes_slug = item["entity"]["attributes"].get("slug")
+            else:
+                r_attributes_slug = "no_attributes_slug_recorded"
+            if item["entity"]["attributes"].get("guid") in (None, ""):
+                r_attributes_guid = "no_attributes_guid_recorded"
+            else:
+                r_attributes_guid = item["entity"]["attributes"].get("guid")
+            if "name" in item["entity"]["attributes"]:
+                if item["entity"]["attributes"]["name"] in (None, ""):
+                    r_attributes_name = "no_attributes_name_recorded"
+                else:
+                    r_attributes_name = item["entity"]["attributes"].get("name")
+            else:
+                r_attributes_name = "no_attributes_name_recorded"
+            if "summary" in item["entity"]["attributes"]:
+                if item["entity"]["attributes"]["summary"] in (None, ""):
+                    r_attributes_summary = "no_attributes_summary_recorded"
+                else:
+                    r_attributes_summary = item["entity"]["attributes"].get("summary")
+            else:
+                r_attributes_summary = "no_attributes_summary_recorded"
+            if item["entity"]["attributes"].get("average_daily_users") in (None, ""):
+                r_attributes_average_daily_users = 0
+            else:
+                r_attributes_average_daily_users = item["entity"]["attributes"].get(
+                    "average_daily_users"
+                )
+            if item["entity"]["attributes"]["created"] in (None, ""):
+                r_attributes_created = "no_attributes_created_recorded"
+            else:
+                r_attributes_created = item["entity"]["attributes"].get("created")
+            if item["entity"]["attributes"].get("promoted") in (None, ""):
+                r_attributes_promoted = "no_attributes_promoted_recorded"
+            else:
+                r_attributes_promoted = item["entity"]["attributes"].get("promoted")
+        if item.get("entity_slug") in (None, ""):
+            r_entity_slug = "no_entity_slug_recorded"
+        else:
+            r_entity_slug = item.get("entity_slug", "no_entity_slug_recorded")
+        if item.get("entity_id") in (None, ""):
+            r_entity_id = "no_entity_id_recorded"
+        else:
+            r_entity_id = item.get("entity_id", "no_entity_id_recorded")
+        if item.get("created_at") in (None, ""):
+            r_created_at = "no_created_at_recorded"
+        else:
+            r_created_at = item.get("created_at", "no_created_at_recorded")
+        if item.get("decision_type") in (None, ""):
+            r_decision_type = "no_decision_type_recorded"
+        else:
+            r_decision_type = item.get("decision_type", "no_decision_type_recorded")
+        if item.get("job_assigned_at") in (None, ""):
+            r_job_assigned_at = "no_job_assigned_at_recorded"
+        else:
+            r_job_assigned_at = item.get(
+                "job_assigned_at", "no_job_assigned_at_recorded"
+            )
+        if item.get("typed_metadata") in (None, ""):
+            r_legacy_decision_labels = "no_legacy_decision_labels_recorded"
+            r_policy_map = "no_policy_map_recorded"
+            r_escalation_details = "no_escalation_details_recorded"
+        else:
+            if item["typed_metadata"]["legacy_decision_labels"] in (None, ""):
+                r_legacy_decision_labels = "no_legacy_decision_labels_recorded"
+            else:
+                r_legacy_decision_labels = item["typed_metadata"].get(
+                    "legacy_decision_labels"
+                )
+            if item["typed_metadata"]["policy_map"] in (None, ""):
+                r_policy_map = "no_policy_map_recorded"
+            else:
+                r_policy_map = item["typed_metadata"].get("policy_map")
+            if item["typed_metadata"]["escalation_details"] in (None, ""):
+                r_escalation_details = "no_escalation_details_recorded"
+            else:
+                r_escalation_details = item["typed_metadata"].get("escalation_details")
         field_dict = {
-            "user": item["user"],
-            "queue_slug": item["queue_slug"],
-            "job_id": item["job_id"],
-            "uuid": item["uuid"],
-            "applied_policies": item["applied_policies"],
-            "entity": item["entity"],
-            "entity_slug": item["entity_slug"],
-            "entity_id": item["entity_id"],
-            "created_at": item["created_at"],
-            "decision_type": item["decision_type"],
-            "job_assigned_at": item["job_assigned_at"],
-            "typed_metadata": item["typed_metadata"],
+            "user": r_user,
+            "queue_slug": r_queue_slug,
+            "job_id": r_job_id,
+            "uuid": r_uuid,
+            "applied_policies": r_applied_policies,
+            "entity_type": r_entity_type,
+            "attributes_id": r_attributes_id,
+            "attributes_slug": r_attributes_slug,
+            "attributes_guid": r_attributes_guid,
+            "attributes_name": r_attributes_name,
+            "attributes_summary": r_attributes_summary,
+            "attributes_average_daily_users": r_attributes_average_daily_users,
+            "attributes_created": r_attributes_created,
+            "attributes_promoted": r_attributes_promoted,
+            "entity_slug": r_entity_slug,
+            "entity_id": r_entity_id,
+            "created_at": r_created_at,
+            "decision_type": r_decision_type,
+            "job_assigned_at": r_job_assigned_at,
+            "legacy_decision_labels": r_legacy_decision_labels,
+            "policy_map": r_policy_map,
+            "escalation_details": r_escalation_details,
         }
         fields_list.append(field_dict)
     return fields_list
@@ -124,6 +269,8 @@
                     type_=bigquery.TimePartitioningType.DAY,
                     field="date",
                 ),
+                source_format=bigquery.SourceFormat.CSV,
+                field_delimiter=";",
                 skip_leading_rows=1,
                 schema=[
                     bigquery.SchemaField("date", "DATE"),
@@ -132,13 +279,22 @@
                     bigquery.SchemaField("job_id", "STRING"),
                     bigquery.SchemaField("uuid", "STRING"),
                     bigquery.SchemaField("applied_policies", "STRING"),
-                    bigquery.SchemaField("entity", "STRING"),
+                    bigquery.SchemaField("attributes_id", "STRING"),
+                    bigquery.SchemaField("attributes_slug", "STRING"),
+                    bigquery.SchemaField("attributes_guid", "STRING"),
+                    bigquery.SchemaField("attributes_name", "STRING"),
+                    bigquery.SchemaField("attributes_summary", "STRING"),
+                    bigquery.SchemaField("attributes_average_daily_users", "INT64"),
+                    bigquery.SchemaField("attributes_created", "STRING"),
+                    bigquery.SchemaField("attributes_promoted", "STRING"),
                     bigquery.SchemaField("entity_slug", "STRING"),
                     bigquery.SchemaField("entity_id", "STRING"),
-                    bigquery.SchemaField("created_at", "DATE"),
+                    bigquery.SchemaField("created_at", "STRING"),
                     bigquery.SchemaField("decision_type", "STRING"),
                     bigquery.SchemaField("job_assigned_at", "STRING"),
-                    bigquery.SchemaField("typed_metadata", "STRING"),
+                    bigquery.SchemaField("legacy_decision_labels", "STRING"),
+                    bigquery.SchemaField("policy_map", "STRING"),
+                    bigquery.SchemaField("escalation_details", "STRING"),
                 ],
             )
             destination = f"{project}.{dataset}.{table_name}${partition}"
@@ -172,7 +328,7 @@
 
     if query_export is not None:
         # This section writes the tmp json data into a temp CSV file which will then be put into a BigQuery table
-        cinder_addon_decisions_data = clean_json(query_export, date)
+        cinder_addon_decisions_data = clean_json(query_export)
         data.extend(cinder_addon_decisions_data)
     else:
         print("no data for today")
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/schema.yaml /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/schema.yaml
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/schema.yaml	2024-11-23 06:55:08.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/schema.yaml	2024-11-23 06:55:02.000000000 +0000
@@ -27,13 +27,48 @@
 
 - mode: NULLABLE
   name: applied_policies
+  type: ARRAY
+  description: List of policies applied to moderate addon
+
+- mode: NULLABLE
+  name: attributes_id
+  type: STRING
+  description: Attributes ID
+
+- mode: NULLABLE
+  name: attributes_slug
+  type: STRING
+  description: Attributes Slug
+
+- mode: NULLABLE
+  name: attributes_guid
+  type: STRING
+  description: Attributes GUID
+
+- mode: NULLABLE
+  name: attributes_name
   type: STRING
-  description: Policies applied to moderate addon
+  description: Attributes Name
 
 - mode: NULLABLE
-  name: entity
+  name: attributes_summary
   type: STRING
-  description: Information about the entity
+  description: Attributes Summary
+
+- mode: NULLABLE
+  name: attributes_average_daily_users
+  type: INT64
+  description: Attributes Average Daily Users
+
+- mode: NULLABLE
+  name: attributes_created
+  type: STRING
+  description: Date Attributes Created
+
+- mode: NULLABLE
+  name: attributes_promoted
+  type: STRING
+  description: Attributes Promoted
 
 - mode: NULLABLE
   name: entity_slug
@@ -47,7 +82,7 @@
 
 - mode: NULLABLE
   name: created_at
-  type: DATE
+  type: STRING
   description: Date decision made
 
 - mode: NULLABLE
@@ -57,10 +92,20 @@
 
 - mode: NULLABLE
   name: job_assigned_at
-  type: DATE
+  type: STRING
   description: Date addon report was assigned to a moderator
 
 - mode: NULLABLE
-  name: typed_metadata
+  name: legacy_decision_labels
+  type: STRING
+  description: Legacy decision
+
+- mode: NULLABLE
+  name: policy_map
+  type: STRING
+  description: Policy map
+
+- mode: NULLABLE
+  name: escalation_details
   type: STRING
-  description: Contains more data
+  description: Escalation details
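
Two details in the diff above interact at load time: the load job is configured with `field_delimiter=";"` while the visible `csv.DictWriter(out_file, CSV_FIELDS)` call uses the default comma, and `attributes_average_daily_users` is declared INT64 while the missing-entity branch assigns it a placeholder string. A minimal sketch of keeping both sides consistent, assuming the CSV is written with the standard `csv` module as in the diff. `CSV_DELIMITER`, `to_int_or_none`, and `rows_to_csv` are hypothetical names introduced for illustration only.

```python
import csv
import io
from typing import Any, Iterable, Mapping, Optional, Sequence

# Assumption: this same constant would also be passed as field_delimiter
# to the BigQuery load job, so writer and loader cannot drift apart.
CSV_DELIMITER = ";"


def to_int_or_none(value: Any) -> Optional[int]:
    """Coerce a value for an INT64 column; return None when it is not a usable integer."""
    if isinstance(value, bool):
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def rows_to_csv(rows: Iterable[Mapping[str, Any]], fieldnames: Sequence[str]) -> str:
    """Serialize rows with the shared delimiter; None values become empty cells."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames, delimiter=CSV_DELIMITER)
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()
```

With this shape, a placeholder such as `"no_attributes_average_daily_users_recorded"` never reaches the INT64 column; it is written as an empty cell instead.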
