
Draft: dbt dag generator - Experimental #15

Draft · wants to merge 16 commits into main
Conversation

MeltyBot

Migrated from GitLab: https://gitlab.com/meltano/files-airflow/-/merge_requests/8

Originally created by @pnadolny13 on 2022-03-04 20:00:16


I demoed this in https://www.youtube.com/watch?v=pNGJ96HOioM and there's a working sample of it in https://github.com/pnadolny13/meltano_example_implementations.

Airflow DAGs are created from dbt manifest.json dependencies: each dbt model becomes a task in Airflow, and each dbt source is assumed to be fed by a Meltano elt sync job with the same tap name (i.e. the dbt source tap_gitlab translates to `meltano elt tap-gitlab target-x`).

The dag_definition.yml file is where selections are defined. The structure is open to change, but right now it looks like:

```yaml
dags:
  generator_configs:
    default_target: "target-postgres"
    stream_level: true
    dbt:
      source_tap_mapping:
        tap_gitlab.commits: tap-gitlab.commits
  dag_definitions:
    full_dag:
      # Weekends - Full DAG once a day end to end
      generator: dbt
      interval: '0 0 * * 6,0'
      selection_rule: '*'
    example_select:
      generator: dbt
      interval: '0 */2 * * 1-5'
      selection_rule: '+fact_example'
```

The generator runs the selection rule using `meltano invoke dbt ls --select {selection_rule}` to generate the list of models and sources to consider. Then it loops through the manifest.json and builds the Airflow DAG based only on the models in that list.
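For illustration, here is a minimal sketch of that two-step flow. The function names are assumptions, not the PR's actual code, and the `dbt ls` output parsing is simplified:

```python
import json
import subprocess

def select_nodes(project_dir: str, selection_rule: str) -> set:
    """Run `meltano invoke dbt ls --select <rule>` and collect the output.

    Assumes dbt ls prints one selected node per line; real output may need
    extra parsing depending on dbt's --output setting.
    """
    result = subprocess.run(
        ["meltano", "invoke", "dbt", "ls", "--select", selection_rule],
        cwd=project_dir, capture_output=True, text=True, check=True,
    )
    return {line.strip() for line in result.stdout.splitlines() if line.strip()}

def filter_manifest(manifest_path: str, selected: set) -> dict:
    """Keep only the manifest models that appear in the dbt ls selection."""
    with open(manifest_path) as f:
        manifest = json.load(f)
    return {
        node_id: node
        for node_id, node in manifest.get("nodes", {}).items()
        # dbt ls lines look like "project.model"; match the trailing name.
        if any(sel.split(".")[-1] == node["name"] for sel in selected)
    }
```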

I chose to split the generator logic out into separate classes so we can have many generator options, one of which uses dbt. Because each generator class implements the same interface, they can be swapped in/out, or multiple can be called, based on the definition yaml file. In this case we have a dbt generator and a Meltano generator (using schedules). A sketch of the interface follows.
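A rough sketch of what that split could look like; the class and method names are assumptions, not the PR's actual API:

```python
from abc import ABC, abstractmethod

class DagGenerator(ABC):
    """Each generator takes one entry from dag_definition.yml and yields
    the Airflow DAGs it builds for that entry."""

    @abstractmethod
    def create_dags(self, name: str, definition: dict):
        ...

class DbtDagGenerator(DagGenerator):
    def create_dags(self, name, definition):
        ...  # build one task per model from manifest.json dependencies

class MeltanoDagGenerator(DagGenerator):
    def create_dags(self, name, definition):
        ...  # build DAGs from the `meltano schedule list` output

# The `generator:` key in each dag definition picks the implementation:
GENERATORS = {"dbt": DbtDagGenerator, "meltano": MeltanoDagGenerator}
```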

Challenges/Assumptions:

  • The dbt source needs to match the Meltano tap. I convert underscores to hyphens by default, but the dag definition file has an option, dags.generator_configs.dbt.source_tap_mapping, which overrides the default mapping between a source name and the tap it references. If stream maps are used to alias streams (e.g. "commits" to "gitlab_commits") and the dbt model is named "gitlab_commits", you will need to map these explicitly. See the sketch after this list.
  • Similarly, the Singer target is set using the dag definition: dags.generator_configs.default_target.
  • There's an option, dags.generator_configs.stream_level, which sets whether to run the full tap sync or to attempt a stream-level sync based on the source name. If stream-level syncs aren't working quite right, you can easily turn that off and let the bookmarking handle running a sync many times.
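To make the mapping and stream-level rules above concrete, here is a hedged sketch. The helper names are illustrative, and the `meltano elt --select <stream>` flag for stream-level syncs is an assumption about how the narrowing is done:

```python
def resolve_tap_stream(source: str, stream: str, mapping: dict) -> tuple:
    """Explicit source_tap_mapping entries win (e.g. "tap_gitlab.commits"
    -> "tap-gitlab.commits"); otherwise underscores become hyphens."""
    override = mapping.get(f"{source}.{stream}")
    if override:
        tap, _, tap_stream = override.partition(".")
        return tap, tap_stream
    return source.replace("_", "-"), stream

def elt_command(source: str, stream: str, cfg: dict) -> str:
    """Build the Meltano command for a dbt source task.

    `cfg` is the dags.generator_configs block from dag_definition.yml.
    """
    tap, tap_stream = resolve_tap_stream(
        source, stream, cfg.get("dbt", {}).get("source_tap_mapping", {})
    )
    cmd = f"meltano elt {tap} {cfg['default_target']}"
    if cfg.get("stream_level"):
        # Assumed flag: narrow the sync to one stream. With
        # stream_level: false the full tap sync runs instead.
        cmd += f" --select {tap_stream}"
    return cmd
```

For example, `elt_command("tap_gitlab", "commits", configs)` with the yaml above would yield `meltano elt tap-gitlab target-postgres --select commits`.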

Decisions:

  • I ended up pulling the DAG generation logic out so that we can have multiple implementations. As long as an implementation abides by the interface, it should work. The implementation defines how a DAG is created and then yields task pairs that depend on each other (TODO: we might not need to do this); see the first sketch after this list.
  • I'm choosing to use a precompiled "generator_cache" which includes the Meltano schedules json, the dbt manifest, and the results of the selection rules defined in the dag definition. The purpose of the cache is:
  1. to allow the Airflow webserver to run without Meltano installed; the Squared repo, for example, has a Meltano container and an Airflow container, so the webserver would fail if it tried to run Meltano commands.
  2. to reduce load on the Airflow webserver when it parses the dag generator file, which happens every 30 seconds by default. Without a cache, every 30 seconds we would run meltano schedule list and all of the meltano dbt ls commands, which is redundant unless something changed.
    For now the user is required to run generator_cache_builder.py to build the cache; I imagine this will be done in CI as a build artifact (see the cache sketch after this list).
  • I abstracted the operator creation into the new base class so that all of the operators can toggle between BaseOperator and KubernetesOperator (or whatever else) depending on the environment. This lets us run locally and in prod with just a config change (see the operator sketch after this list).
  • The ability to add custom steps to a dbt generated DAG, so you can do something after a specific model completes (i.e. Squared's publish/reverse ETL step after a dbt model is done). See the README for details on how to do this.
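To illustrate the "yields task pairs" idea from the first decision, here is a minimal wiring helper. The name and signature are assumptions, not the PR's actual code:

```python
def wire_dag(dag, pairs, make_task):
    """Connect (upstream_id, downstream_id) pairs yielded by a generator."""
    tasks = {}

    def get(task_id):
        # Create each operator exactly once; Airflow rejects duplicate
        # task_ids within a DAG.
        if task_id not in tasks:
            tasks[task_id] = make_task(dag, task_id)
        return tasks[task_id]

    for up_id, down_id in pairs:
        get(up_id) >> get(down_id)  # upstream must finish before downstream
    return tasks
```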
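Next, a sketch of what the generator_cache_builder.py step could produce, based only on the description above; the `--format=json` flag usage and the manifest path are assumptions and may differ per project:

```python
import json
import shutil
import subprocess

def build_cache(project_dir: str, dag_definition: dict, cache_dir: str) -> None:
    """Precompute everything the webserver would otherwise ask Meltano for."""

    def run(*args: str) -> str:
        return subprocess.run(
            args, cwd=project_dir, capture_output=True, text=True, check=True
        ).stdout

    cache = {
        # Cache the schedules so the webserver never shells out to Meltano.
        "schedules": json.loads(run("meltano", "schedule", "list", "--format=json")),
        # One dbt ls result per dbt-generated dag definition.
        "selections": {
            name: run("meltano", "invoke", "dbt", "ls",
                      "--select", d["selection_rule"]).splitlines()
            for name, d in dag_definition["dags"]["dag_definitions"].items()
            if d["generator"] == "dbt"
        },
    }
    # Assumed manifest location; adjust to wherever your dbt target dir lives.
    shutil.copy(
        f"{project_dir}/.meltano/transformers/dbt/target/manifest.json",
        f"{cache_dir}/manifest.json",
    )
    with open(f"{cache_dir}/generator_cache.json", "w") as f:
        json.dump(cache, f, indent=2)
```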
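Finally, the operator toggle. The description mentions BaseOperator and KubernetesOperator; this sketch assumes a BashOperator/KubernetesPodOperator pair with an environment variable as the switch, all illustrative rather than the PR's actual config:

```python
import os

from airflow.operators.bash import BashOperator

def make_task(dag, task_id: str, command: str):
    """Return whichever operator type the environment asks for.

    OPERATOR_TYPE is an assumed config switch, not the PR's actual setting.
    """
    if os.getenv("OPERATOR_TYPE", "bash") == "kubernetes":
        # Imported lazily so local runs don't need the cncf provider installed.
        from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
            KubernetesPodOperator,
        )
        return KubernetesPodOperator(
            dag=dag,
            task_id=task_id,
            name=task_id,
            namespace="airflow",             # assumed namespace
            image="meltano-project:latest",  # assumed project image
            cmds=["/bin/bash", "-c", command],
        )
    return BashOperator(dag=dag, task_id=task_id, bash_command=command)
```

Running locally just means leaving OPERATOR_TYPE unset; the generated DAGs are otherwise identical in both environments.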

Also check out the README for more details.

@aaronsteers aaronsteers marked this pull request as draft June 2, 2022 19:25