Skip to content

Commit

Permalink
ci: gen2 pubsub trigger with multiple schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobdadams committed Aug 9, 2024
1 parent 5e26f9f commit bed3ea7
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 67 deletions.
86 changes: 38 additions & 48 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ jobs:
with:
name: wmrc-skid
runtime: python311
entry_point: main
entry_point: subscribe
source_dir: src/wmrc
service_account: cloud-function-sa@${{ secrets.PROJECT_ID }}.iam.gserviceaccount.com
event_trigger_type: google.cloud.pubsub.topic.v1.messagePublished
Expand All @@ -106,57 +106,47 @@ jobs:
- name: 🕰️ Create Main Cloud Scheduler
run: |
for i in $(gcloud scheduler jobs list --location=us-central1 --uri); do
gcloud scheduler jobs delete $i --quiet
done
gcloud scheduler jobs create pubsub $SCHEDULE_NAME \
--description="$SCHEDULE_DESCRIPTION" \
--schedule="$SCHEDULE_CRON" \
--time-zone=America/Denver \
--location=us-central1 \
--topic=$SCHEDULE_NAME-topic \
--message-body='{"run": "now"}' \
--quiet
- name: 📥 Create Validator PubSub topic
run: |
if [ ! "$(gcloud pubsub topics list | grep $VALIDATOR_SCHEDULE_NAME-topic)" ]; then
gcloud pubsub topics create $VALIDATOR_SCHEDULE_NAME-topic --quiet
if [ ! "$(gcloud scheduler jobs list --location=us-central1 | grep $SCHEDULE_NAME)" ]; then
gcloud scheduler jobs create pubsub $SCHEDULE_NAME \
--description="$SCHEDULE_DESCRIPTION" \
--schedule="$SCHEDULE_CRON" \
--time-zone=America/Denver \
--location=us-central1 \
--topic=$SCHEDULE_NAME-topic \
--message-body='facility updates' \
--quiet
else
gcloud scheduler jobs update pubsub $SCHEDULE_NAME \
--description="$SCHEDULE_DESCRIPTION" \
--schedule="$SCHEDULE_CRON" \
--time-zone=America/Denver \
--location=us-central1 \
--topic=$SCHEDULE_NAME-topic \
--message-body='facility updates' \
--quiet
fi
- name: 🚀 Deploy Validator Cloud Function
id: deploy-validator
uses: google-github-actions/deploy-cloud-functions@v3
timeout-minutes: 15
with:
name: wmrc-skid-validator
runtime: python311
entry_point: validate
source_dir: src/wmrc
service_account: cloud-function-sa@${{ secrets.PROJECT_ID }}.iam.gserviceaccount.com
event_trigger_type: google.cloud.pubsub.topic.v1.messagePublished
event_trigger_pubsub_topic: projects/${{ secrets.PROJECT_ID }}/topics/${{ env.VALIDATOR_SCHEDULE_NAME }}-topic
memory: ${{ env.CLOUD_FUNCTION_MEMORY }}
service_timeout: ${{ env.CLOUD_FUNCTION_RUN_TIMEOUT }}
environment_variables: STORAGE_BUCKET=${{secrets.STORAGE_BUCKET}}
secrets: |
/secrets/app/secrets.json=${{secrets.PROJECT_ID}}/skid-secrets
max_instance_count: 1
event_trigger_retry: false

- name: 🕰️ Create Validator Cloud Scheduler
run: |
for i in $(gcloud scheduler jobs list --location=us-central1 --uri); do
gcloud scheduler jobs delete $i --quiet
done
gcloud scheduler jobs create pubsub $VALIDATOR_SCHEDULE_NAME \
--description="$VALIDATOR_SCHEDULE_DESCRIPTION" \
--schedule="$VALIDATOR_SCHEDULE_CRON" \
--time-zone=America/Denver \
--location=us-central1 \
--topic=$VALIDATOR_SCHEDULE_NAME-topic \
--message-body='{"run": "now"}' \
--quiet
if [ ! "$(gcloud scheduler jobs list --location=us-central1 | grep $VALIDATOR_SCHEDULE_NAME)" ]; then
gcloud scheduler jobs create pubsub $VALIDATOR_SCHEDULE_NAME \
--description="$VALIDATOR_SCHEDULE_DESCRIPTION" \
--schedule="$VALIDATOR_SCHEDULE_CRON" \
--time-zone=America/Denver \
--location=us-central1 \
--topic=$SCHEDULE_NAME-topic \
--message-body='validate' \
--quiet
else
gcloud scheduler jobs update pubsub $VALIDATOR_SCHEDULE_NAME \
--description="$VALIDATOR_SCHEDULE_DESCRIPTION" \
--schedule="$VALIDATOR_SCHEDULE_CRON" \
--time-zone=America/Denver \
--location=us-central1 \
--topic=$SCHEDULE_NAME-topic \
--message-body='validate' \
--quiet
fi
deploy-prod:
name: Deploy to GCF - prod
Expand Down
68 changes: 49 additions & 19 deletions src/wmrc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
Run the wmrc script as a cloud function.
"""
import base64
import json
import logging
import sys
Expand All @@ -12,21 +13,24 @@
from types import SimpleNamespace

import arcgis
import functions_framework
import google.auth
import pandas as pd
from arcgis.features import GeoAccessor, GeoSeriesAccessor # noqa: F401
from cloudevents.http import CloudEvent
from palletjack import extract, load, transform
from supervisor.message_handlers import SendGridHandler
from supervisor.models import MessageDetails, Supervisor

#: This makes it work when calling with just `python <file>`/installing via pip and in the gcf framework, where
#: the relative imports fail because of how it's calling the function.
try:
from . import config, helpers, summarize, version, yearly
from . import config, helpers, summarize, validate, version, yearly
except ImportError:
import config
import helpers
import summarize
import validate
import version
import yearly

Expand Down Expand Up @@ -415,37 +419,63 @@ def _load_salesforce_data(self) -> helpers.SalesForceRecords:
return salesforce_records


def main(event, context): # pylint: disable=unused-argument
# def main(event, context): # pylint: disable=unused-argument
# """Entry point for Google Cloud Function triggered by pub/sub event

# Args:
# event (dict): The dictionary with data specific to this type of
# event. The `@type` field maps to
# `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
# The `data` field maps to the PubsubMessage data
# in a base64-encoded string. The `attributes` field maps
# to the PubsubMessage attributes if any is present.
# context (google.cloud.functions.Context): Metadata of triggering event
# including `event_id` which maps to the PubsubMessage
# messageId, `timestamp` which maps to the PubsubMessage
# publishTime, `event_type` which maps to
# `google.pubsub.topic.publish`, and `resource` which is
# a dictionary that describes the service API endpoint
# pubsub.googleapis.com, the triggering topic's name, and
# the triggering event type
# `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
# Returns:
# None. The output is written to Cloud Logging.
# """

# #: This function must be called 'main' to act as the Google Cloud Function entry point. It must accept the two
# #: arguments listed, but doesn't have to do anything with them (I haven't used them in anything yet).

# #: Call process() and any other functions you want to be run as part of the skid here.
# wmrc_skid = Skid()
# wmrc_skid.process()


@functions_framework.cloud_event
def subscribe(cloud_event: CloudEvent) -> None:
"""Entry point for Google Cloud Function triggered by pub/sub event
Args:
event (dict): The dictionary with data specific to this type of
event. The `@type` field maps to
cloud_event (CloudEvent): The CloudEvent object with data specific to this type of
event. The `type` field maps to
`type.googleapis.com/google.pubsub.v1.PubsubMessage`.
The `data` field maps to the PubsubMessage data
in a base64-encoded string. The `attributes` field maps
to the PubsubMessage attributes if any is present.
context (google.cloud.functions.Context): Metadata of triggering event
including `event_id` which maps to the PubsubMessage
messageId, `timestamp` which maps to the PubsubMessage
publishTime, `event_type` which maps to
`google.pubsub.topic.publish`, and `resource` which is
a dictionary that describes the service API endpoint
pubsub.googleapis.com, the triggering topic's name, and
the triggering event type
`type.googleapis.com/google.pubsub.v1.PubsubMessage`.
Returns:
None. The output is written to Cloud Logging.
"""

#: This function must be called 'main' to act as the Google Cloud Function entry point. It must accept the two
#: arguments listed, but doesn't have to do anything with them (I haven't used them in anything yet).
#: This function must be called 'subscribe' to act as the Google Cloud Function entry point. It must accept the
#: CloudEvent object as the only argument.

#: Call process() and any other functions you want to be run as part of the skid here.
wmrc_skid = Skid()
wmrc_skid.process()
if base64.b64decode(cloud_event.data["message"]["data"]).decode() == "facility updates":
wmrc_skid = Skid()
wmrc_skid.process()
if base64.b64decode(cloud_event.data["message"]["data"]).decode() == "validate":
validate.run_validations()


#: Putting this here means you can call the file via `python main.py` and it will run. Useful for pre-GCF testing.
if __name__ == "__main__":
main(1, 2) #: Just some junk args to satisfy the signature needed for Cloud Functions
wmrc_skid = Skid()
wmrc_skid.process()

0 comments on commit bed3ea7

Please sign in to comment.