merge usage tracking #115

Merged · 2 commits · Oct 30, 2023
234 changes: 234 additions & 0 deletions apps/usage_tracking/README.md
@@ -0,0 +1,234 @@
## Usage Tracking

This folder contains a set of cloud functions that generate an event click stream in BigQuery based on Data Catalog user activity:

1) `entry_clicks`: which users have viewed which entries in the catalog over time
2) `tag_creates`: which users have created a tag on which entries in the catalog over time
3) `tag_updates`: which users have updated a tag on which entries in the catalog over time
4) `tag_deletes`: which users have deleted a tag on which entries in the catalog over time

The functions analyze the Data Catalog audit log entries that are synced to BigQuery. They map each entry ID to its underlying resource name and write a summary row for each event that is easier to consume. Each cloud function is wrapped by a BigQuery remote function and runs on a daily schedule.
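
For a sense of what the output looks like: each reporting row records when a user acted on which Data Catalog entry and which BigQuery resource it maps to. As a minimal sketch (assuming the reporting dataset is named `reporting`, as in the examples below, and using the columns written by the `entry_clicks` function), the click stream can be consumed like this:

```sql
-- Which BigQuery resources are viewed most often, and by whom
SELECT
  bq_resource,
  user_email,
  COUNT(*) AS views
FROM `reporting.entry_clicks`
GROUP BY bq_resource, user_email
ORDER BY views DESC;
```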


### How to Deploy

#### Step 1: Enable audit logging

From the GCP console, go to the IAM section and open the Audit Logs page. Search on `Service:Data Catalog` and enable Data Read and Data Write audit logs for Data Catalog.

#### Step 2: Create a log sync to BigQuery

```
gcloud logging sinks create data-catalog-audit-sink bigquery.googleapis.com/projects/$BIGQUERY_PROJECT/datasets/$BIGQUERY_DATASET \
--log-filter='protoPayload.serviceName="datacatalog.googleapis.com"'
```
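
To confirm that Data Catalog audit entries are arriving in the sink dataset, you can run a quick check once some activity has occurred. This is a sketch: the exported table name (`cloudaudit_googleapis_com_data_access`) and column layout follow Cloud Logging's default BigQuery export naming, and the table may be date-sharded depending on the sink's partitioning settings.

```sql
-- Count Data Catalog data-access audit entries in the log sink
SELECT COUNT(*) AS entries
FROM `$BIGQUERY_PROJECT.$BIGQUERY_DATASET.cloudaudit_googleapis_com_data_access`
WHERE protopayload_auditlog.serviceName = 'datacatalog.googleapis.com';
```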

#### Step 3: Create a Cloud Functions service account

Designate a service account for running the cloud functions and authorize it to use the BigQuery connection, create the reporting tables, and run queries in BigQuery.

```
export SA="data-catalog-auditor@PROJECT.iam.gserviceaccount.com"

gcloud projects add-iam-policy-binding $BIGQUERY_PROJECT \
--member=serviceAccount:$SA \
--role=roles/bigquery.connectionUser

gcloud projects add-iam-policy-binding $BIGQUERY_PROJECT \
--member=serviceAccount:$SA \
--role=roles/bigquery.dataEditor

gcloud projects add-iam-policy-binding $BIGQUERY_PROJECT \
--member=serviceAccount:$SA \
--role=roles/bigquery.jobUser

gcloud projects add-iam-policy-binding $BIGQUERY_PROJECT \
--member=serviceAccount:$SA \
--role=roles/bigquery.resourceViewer

```

#### Step 4: Create a BigQuery cloud resource connection

```
bq mk --connection --display_name='cloud function connection' --connection_type=CLOUD_RESOURCE \
--project_id=$BIGQUERY_PROJECT --location=$BIGQUERY_REGION remote-connection

bq show --location=$BIGQUERY_REGION --connection remote-connection
```

The expected output from the `bq show` command contains a "serviceAccountId" property for the connection resource that starts with `bqcx-` and ends with `@gcp-sa-bigquery-condel.iam.gserviceaccount.com`. We'll refer to this service account in the following steps as `CONNECTION_SA`.

Once the cloud functions are created in Step 5, we'll assign the Cloud Functions Invoker role (`roles/cloudfunctions.invoker`) to the `CONNECTION_SA` in Step 6.

For more details on creating cloud resource connections, refer to the [product documentation](https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#sample_code).


#### Step 5: Create the cloud functions

Each remote function in BigQuery calls a cloud function of the same name.

The source for the cloud functions is split into subfolders under `datacatalog-tag-engine/apps/usage_tracking`.

Create the cloud functions by running these commands:

```
cd datacatalog-tag-engine/apps/usage_tracking

gcloud functions deploy entry_clicks \
--region=$BIGQUERY_REGION \
--source=entry_clicks \
--entry-point=event_handler \
--runtime=python311 \
--trigger-http \
--service-account=$SA \
--timeout=540s \
--no-allow-unauthenticated

gcloud functions deploy tag_creates \
--region=$BIGQUERY_REGION \
--source=tag_creates \
--entry-point=event_handler \
--runtime=python311 \
--trigger-http \
--service-account=$SA \
--timeout=540s \
--no-allow-unauthenticated

gcloud functions deploy tag_deletes \
--region=$BIGQUERY_REGION \
--source=tag_deletes \
--entry-point=event_handler \
--runtime=python311 \
--trigger-http \
--service-account=$SA \
--timeout=540s \
--no-allow-unauthenticated

gcloud functions deploy tag_updates \
--region=$BIGQUERY_REGION \
--source=tag_updates \
--entry-point=event_handler \
--runtime=python311 \
--trigger-http \
--service-account=$SA \
--timeout=540s \
--no-allow-unauthenticated
```

#### Step 6: Assign `$CONNECTION_SA` permissions to run the functions

`$CONNECTION_SA` is the service account behind the BigQuery cloud resource connection created in Step 4. Grant it permission to invoke each of the cloud functions:

```
gcloud functions add-iam-policy-binding entry_clicks \
--member=serviceAccount:$CONNECTION_SA \
--role="roles/cloudfunctions.invoker" \
--project=$BIGQUERY_PROJECT

gcloud functions add-iam-policy-binding tag_creates \
--member=serviceAccount:$CONNECTION_SA \
--role="roles/cloudfunctions.invoker" \
--project=$BIGQUERY_PROJECT

gcloud functions add-iam-policy-binding tag_deletes \
--member=serviceAccount:$CONNECTION_SA \
--role="roles/cloudfunctions.invoker" \
--project=$BIGQUERY_PROJECT

gcloud functions add-iam-policy-binding tag_updates \
--member=serviceAccount:$CONNECTION_SA \
--role="roles/cloudfunctions.invoker" \
--project=$BIGQUERY_PROJECT
```


#### Step 7: Create the remote functions in BigQuery

Create a BigQuery dataset for storing the remote functions:

```
CREATE SCHEMA `$BIGQUERY_PROJECT`.usage_tracking
OPTIONS (location='$BIGQUERY_REGION');
```

Create each remote function:

```
CREATE OR REPLACE FUNCTION `$BIGQUERY_PROJECT`.usage_tracking.entry_clicks(
log_sync_project STRING,
log_sync_dataset STRING,
reporting_project STRING,
reporting_dataset STRING,
start_date DATE) RETURNS STRING
REMOTE WITH CONNECTION `$BIGQUERY_PROJECT.$BIGQUERY_REGION.remote-connection`
OPTIONS (endpoint = 'https://$BIGQUERY_REGION-$BIGQUERY_PROJECT.cloudfunctions.net/entry_clicks');


CREATE OR REPLACE FUNCTION `$BIGQUERY_PROJECT`.usage_tracking.tag_creates(
log_sync_project STRING,
log_sync_dataset STRING,
reporting_project STRING,
reporting_dataset STRING,
start_date DATE) RETURNS STRING
REMOTE WITH CONNECTION `$BIGQUERY_PROJECT.$BIGQUERY_REGION.remote-connection`
OPTIONS (endpoint = 'https://$BIGQUERY_REGION-$BIGQUERY_PROJECT.cloudfunctions.net/tag_creates');


CREATE OR REPLACE FUNCTION `$BIGQUERY_PROJECT`.usage_tracking.tag_deletes(
log_sync_project STRING,
log_sync_dataset STRING,
reporting_project STRING,
reporting_dataset STRING,
start_date DATE) RETURNS STRING
REMOTE WITH CONNECTION `$BIGQUERY_PROJECT.$BIGQUERY_REGION.remote-connection`
OPTIONS (endpoint = 'https://$BIGQUERY_REGION-$BIGQUERY_PROJECT.cloudfunctions.net/tag_deletes');


CREATE OR REPLACE FUNCTION `$BIGQUERY_PROJECT`.usage_tracking.tag_updates(
log_sync_project STRING,
log_sync_dataset STRING,
reporting_project STRING,
reporting_dataset STRING,
start_date DATE) RETURNS STRING
REMOTE WITH CONNECTION `$BIGQUERY_PROJECT.$BIGQUERY_REGION.remote-connection`
OPTIONS (endpoint = 'https://$BIGQUERY_REGION-$BIGQUERY_PROJECT.cloudfunctions.net/tag_updates');
```

The parameter `start_date` indicates the oldest date from which to process the audit log entries.
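
For example, to backfill only from a specific date (a sketch, assuming the log sink dataset is named `audit_logs` and the reporting dataset `reporting`, as in the examples below):

```sql
-- Process audit log entries from 2023-10-01 onward
SELECT `$BIGQUERY_PROJECT`.usage_tracking.entry_clicks(
  '$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', DATE '2023-10-01');
```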


#### Step 8: Run a backfill to process all the log entries

```
SELECT `$BIGQUERY_PROJECT`.usage_tracking.entry_clicks('$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', NULL);
SELECT `$BIGQUERY_PROJECT`.usage_tracking.tag_creates('$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', NULL);
SELECT `$BIGQUERY_PROJECT`.usage_tracking.tag_deletes('$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', NULL);
SELECT `$BIGQUERY_PROJECT`.usage_tracking.tag_updates('$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', NULL);
```

Note: The last parameter is the `start_date`. When set to NULL, the function processes the audit log entries for all the dates which are present in the log sync table in BigQuery.
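
After the backfill completes, a quick row count per reporting table verifies that each function produced output. This sketch assumes each function writes to a reporting table of the same name in the `reporting` dataset, as `entry_clicks` does:

```sql
-- Spot-check the backfill
SELECT 'entry_clicks' AS table_name, COUNT(*) AS row_count FROM `$BIGQUERY_PROJECT.reporting.entry_clicks`
UNION ALL SELECT 'tag_creates', COUNT(*) FROM `$BIGQUERY_PROJECT.reporting.tag_creates`
UNION ALL SELECT 'tag_deletes', COUNT(*) FROM `$BIGQUERY_PROJECT.reporting.tag_deletes`
UNION ALL SELECT 'tag_updates', COUNT(*) FROM `$BIGQUERY_PROJECT.reporting.tag_updates`;
```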


#### Step 9: Schedule the query

These queries process yesterday's audit log entries:

```
SELECT `$BIGQUERY_PROJECT`.usage_tracking.entry_clicks('$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', current_date()-1);
SELECT `$BIGQUERY_PROJECT`.usage_tracking.tag_creates('$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', current_date()-1);
SELECT `$BIGQUERY_PROJECT`.usage_tracking.tag_deletes('$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', current_date()-1);
SELECT `$BIGQUERY_PROJECT`.usage_tracking.tag_updates('$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', current_date()-1);
```

You can create a scheduled query in BigQuery that runs the `entry_clicks` query every day (repeat for the other three functions):

```
bq query \
--use_legacy_sql=false \
--destination_table=reporting.scheduled_query_logs \
--display_name='Daily Entry Clicks' \
--schedule='every 24 hours' \
--replace=true \
"SELECT \`$BIGQUERY_PROJECT\`.usage_tracking.entry_clicks('$BIGQUERY_PROJECT', 'audit_logs', '$BIGQUERY_PROJECT', 'reporting', current_date()-1)"
```
84 changes: 50 additions & 34 deletions apps/usage_tracking/entry_clicks/main.py
@@ -13,14 +13,14 @@
 # limitations under the License.

 import json

 from google.cloud.exceptions import NotFound
 from google.cloud import bigquery
 from google.cloud import datacatalog

 bq_client = bigquery.Client()
 dc_client = datacatalog.DataCatalogClient()

-reporting_table = 'entry_clicks' # name of the output table
+reporting_table = 'entry_clicks' # designated name for the output table

 def event_handler(request):
     request_json = request.get_json()
@@ -29,11 +29,25 @@ def event_handler(request):
     log_sync_dataset = request_json['calls'][0][1].strip()
     reporting_project = request_json['calls'][0][2].strip()
     reporting_dataset = request_json['calls'][0][3].strip()
-    start_date = request_json['calls'][0][4].strip()

+    if request_json['calls'][0][4] != None:
+        start_date = request_json['calls'][0][4].strip()
+    else:
+        start_date = None
+
+    print('log_sync_project:', log_sync_project)
+    print('log_sync_dataset:', log_sync_dataset)
+    print('reporting_project:', reporting_project)
+    print('reporting_dataset:', reporting_dataset)
+    print('start_date:', start_date)
+
     try:
-        report_table_create(reporting_project, reporting_dataset, reporting_table)
+        created = report_table_create(reporting_project, reporting_dataset, reporting_table)
+        print('report table created:', created)
+
         status = main(log_sync_project, log_sync_dataset, reporting_project, reporting_dataset, start_date)
+        print('status from main:', status)
+
         return json.dumps({"replies": [status]})

     except Exception as e:
@@ -43,7 +57,9 @@ def event_handler(request):

 def main(log_sync_project, log_sync_dataset, reporting_project, reporting_dataset, start_date:None):

-    success = True
+    print('inside main')
+
+    status = 'SUCCESS'
     rows_to_insert = []

     sql = "select distinct timestamp_trunc(timestamp, SECOND) as event_time, "
@@ -58,38 +74,43 @@ def main(log_sync_project, log_sync_dataset, reporting_project, reporting_dataset, start_date:None):

     print(sql)

-    query_job = bq_client.query(sql)
-    results = query_job.result()
+    try:
+        query_job = bq_client.query(sql)
+        results = query_job.result()

-    for result in results:
-        event_time = result.event_time
-        project = result.project
-        user_email = result.user_email
-        dc_entry = result.dc_entry
-        print('event_time:', event_time)
+        for result in results:
+            event_time = result.event_time
+            project = result.project
+            user_email = result.user_email
+            dc_entry = result.dc_entry
+            #print('event_time:', event_time)

-        # lookup BQ resource
-        entry_request = datacatalog.GetEntryRequest(name=dc_entry)
-        entry_response = dc_client.get_entry(request=entry_request)
-        bq_resource = entry_response.linked_resource.replace('//bigquery.googleapis.com/', '')
-        print('bq_resource:', bq_resource)
+            # lookup BQ resource
+            entry_request = datacatalog.GetEntryRequest(name=dc_entry)
+            entry_response = dc_client.get_entry(request=entry_request)
+            bq_resource = entry_response.linked_resource.replace('//bigquery.googleapis.com/', '')
+            print('bq_resource:', bq_resource)

-        event_time = event_time.strftime("%Y-%m-%d %H:%M:%S") + " UTC"
-        print('event_time_formatted:', event_time)
+            event_time = event_time.strftime("%Y-%m-%d %H:%M:%S") + " UTC"
+            #print('event_time_formatted:', event_time)

-        rows_to_insert.append({"event_time": event_time, "project": project, "user_email": user_email, "dc_entry": dc_entry, "bq_resource": bq_resource})
+            rows_to_insert.append({"event_time": event_time, "project": project, "user_email": user_email, "dc_entry": dc_entry, "bq_resource": bq_resource})

-    if len(rows_to_insert) > 0:
-        success = insert_records(reporting_project, reporting_dataset, reporting_table, rows_to_insert)
+        if len(rows_to_insert) > 0:
+            status = insert_records(reporting_project, reporting_dataset, reporting_table, rows_to_insert)

-    return success
+    except Exception as e:
+        print('Error in main: ', e)
+        status = 'ERROR'
+
+    return status


 def insert_records(reporting_project, reporting_dataset, reporting_table, rows_to_insert):

-    success = True
+    print('insert_records')
+
+    status = 'SUCCESS'

     table_id = reporting_project + '.' + reporting_dataset + '.' + reporting_table
     job_config = bigquery.LoadJobConfig(schema=report_table_schema(), source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON)
@@ -111,13 +132,15 @@ def insert_records(reporting_project, reporting_dataset, reporting_table, rows_to_insert):
         errors = bq_client.insert_rows_json(table_id, rows_to_insert)
     except Exception as e:
         print("Error occurred during insert_records: {}".format(e))
-        success = False
+        status = 'ERROR'

-    return success
+    return status


 def report_table_create(project, dataset, table):

+    print('inside report_table_create')
+
     created = True

     table_id = project + '.' + dataset + '.' + table
@@ -126,7 +149,6 @@ def report_table_create(project, dataset, table):
     try:
         table = bq_client.get_table(table_ref)
         created = False
-        return created

     except NotFound:

@@ -149,9 +171,3 @@ def report_table_schema():
     ]

     return schema
-
-
-if __name__ == "__main__":
-    # log_sync_project, log_sync_dataset, reporting_project, reporting_dataset, start_date
-    success = main('tag-engine-develop', 'audit_logs', 'tag-engine-develop', 'reporting', '2023-10-01')
-    print('success:', success)