[Feature] Allow use of unique_tmp_table_suffix flag with Iceberg tables. #688

Open
1 task done
benbeckingsale opened this issue Jul 18, 2024 · 4 comments
Labels
enhancement New feature or request

Comments

@benbeckingsale

Is this your first time submitting a feature request?

  • I have searched the existing issues, and I could not find an existing issue for this feature

Describe the feature

I would like to enable the use of the unique_tmp_table_suffix param when using Iceberg tables with a merge strategy.
Currently this flag only has an effect when using Hive tables with an insert_overwrite strategy.

Describe alternatives you've considered

Using a different temp_schema per run to avoid table name collisions – but it is much more desirable to keep all temp tables in the same schema.

Who will this benefit?

This should reduce the risk of temp table name collision for anyone using parallel DAGs/processes to write to the same model (and using Iceberg with a merge strategy).

In my case, I have DAGs A and B which write non-overlapping data to the same incremental model (DAG A writes recent data and more frequently; B writes older data and less frequently). If A and B overlap such that the temp table created by A still exists while B tries to create its own, DAG B will fail due to name collision.

DAG A -> temp_schema.model_name__dbt_tmp
DAG B -> temp_schema.model_name__dbt_tmp

If unique_tmp_table_suffix were supported in this case, the table name collision could be avoided.

DAG A -> temp_schema.model_name__018dc4
DAG B -> temp_schema.model_name__bbc3a9
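
To make the collision concrete, here is a minimal Python sketch of the two naming schemes. The helper `tmp_table_name` and the UUID-based suffix format are assumptions made for illustration only; they are not taken from the adapter's code, and the real suffix may look different from the six-character examples above.

```python
import uuid


def tmp_table_name(temp_schema: str, model: str, unique_suffix: bool = False) -> str:
    """Build a temp table name (hypothetical helper, not the adapter's API)."""
    if unique_suffix:
        # Assumed format: the default suffix plus the hex form of a random UUID.
        suffix = "__dbt_tmp_" + uuid.uuid4().hex
    else:
        suffix = "__dbt_tmp"
    return f"{temp_schema}.{model}{suffix}"


# Two overlapping runs with the fixed suffix resolve to the same table name.
dag_a = tmp_table_name("temp_schema", "model_name")
dag_b = tmp_table_name("temp_schema", "model_name")
collides = dag_a == dag_b

# With a per-run unique suffix the names differ, so the runs do not collide.
dag_a_unique = tmp_table_name("temp_schema", "model_name", unique_suffix=True)
dag_b_unique = tmp_table_name("temp_schema", "model_name", unique_suffix=True)
```

With the fixed suffix, whichever run starts second finds the table already present; with a per-run suffix each run gets its own temp table in the same schema.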

Are you interested in contributing this feature?

Yes

Anything else?

Similar to:

@benbeckingsale benbeckingsale added the enhancement New feature or request label Jul 18, 2024
@nicor88
Contributor

nicor88 commented Jul 18, 2024

I believe that this type of feature could be implemented and it does make sense in some cases.

That said, I'm not in favor of adding such a feature, because it can lead to weird data issues: allowing this use case could leave users with inconsistent data.

Reflecting a bit more on this, it could be a nice feature to have for some cases:

  • concurrent insert in different partitions
  • concurrent merge on different partitions
  • delete and insert in different partitions

Here are some edge cases that could lead to ambiguous situations:

  • parallel merge operations that act on the same row
  • delete+insert operations that act on the same rows or partitions
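
One way to tell the safe cases from the ambiguous ones is to ask whether the concurrent runs touch disjoint sets of partitions. A minimal sketch, assuming each run can list the partition values it writes to; the function name and the hour-string partition values are hypothetical:

```python
def runs_overlap(partitions_a: set[str], partitions_b: set[str]) -> bool:
    """Return True if two runs write to at least one common partition."""
    return not partitions_a.isdisjoint(partitions_b)


# Concurrent merge on different partitions: no shared partition.
safe = runs_overlap({"2024-07-18-10"}, {"2024-07-18-09"})

# Two runs that both rewrite the same hour: the outcome depends on ordering.
ambiguous = runs_overlap({"2024-07-18-10", "2024-07-18-09"}, {"2024-07-18-09"})
```

This check is only at partition granularity; it says nothing about how the engine resolves conflicting commits when the sets do overlap.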

That said, @benbeckingsale, feel free to propose a feature for it.

@benbeckingsale
Author

Hi @nicor88 and thanks for your patience.

Just to explain my case a little more in terms of your answer above –

I'm using dbt to incrementally update a model whose input is a 'raw' time-series table (a source updated by an external streaming process which often receives late data) and whose output is an aggregated time-series.

Both input and output tables are partitioned by hour. The end goal is to run the model for the last 'x' hour partitions (2 currently). I'm using a DAG per partition so that the model can be run more frequently for the most recent partition and less frequently for older ones.

The DAG runs might overlap – hence the table collision issue – but the data written by each will not, since each DAG run is confined to a single partition in SQL. In other words this would be a 'concurrent merge on different partitions'.
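
As a rough illustration of the per-partition layout described here, the following sketch lists the last 'x' hour partitions and assigns one run to each. The function name, the `%Y-%m-%d-%H` partition format, and the `dag_N` labels are assumptions for the example, not details of the actual setup.

```python
from datetime import datetime, timedelta


def last_hour_partitions(now: datetime, hours: int = 2) -> list[str]:
    """Return the most recent `hours` hour-partition values, newest first."""
    current = now.replace(minute=0, second=0, microsecond=0)
    return [
        (current - timedelta(hours=i)).strftime("%Y-%m-%d-%H")
        for i in range(hours)
    ]


partitions = last_hour_partitions(datetime(2024, 7, 18, 10, 42))

# One DAG run per partition; each run filters its SQL to exactly one value,
# so overlapping runs never write to the same partition.
runs = {f"dag_{i}": partition for i, partition in enumerate(partitions)}
```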

This is a very common use case in my world, so I'd be very interested to know whether you can recommend alternative ways of doing this with dbt-athena, perhaps using a single DAG?

This feature is still needed regardless, so I will aim to contribute soon.

@nicor88
Contributor

nicor88 commented Aug 6, 2024

@benbeckingsale Sorry for the late reply, I was OoO.
If you act on different partitions then it should be fine, and yes, unique_tmp_table_suffix is necessary for concurrent operations.
What comes to mind for your use case is a simple merge, since you act on different hour partitions, or alternatively an incremental model in append mode with a delete to force a delete+insert pattern on the specific partitions you are dealing with. I do the latter often, but not concurrently.
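
The delete+insert pattern on a single partition could look roughly like the statements built below. This is only a sketch of the shape of the SQL, not how the adapter implements it; the helper, table names, and partition column are hypothetical, and values are interpolated directly purely for readability.

```python
def delete_insert_statements(
    target: str, source: str, partition_col: str, partition_value: str
) -> list[str]:
    """Build a delete followed by an insert, both confined to one partition."""
    # Hypothetical helper: real code should not interpolate untrusted values.
    predicate = f"{partition_col} = '{partition_value}'"
    return [
        f"DELETE FROM {target} WHERE {predicate}",
        f"INSERT INTO {target} SELECT * FROM {source} WHERE {predicate}",
    ]


statements = delete_insert_statements(
    target="analytics.hourly_agg",      # assumed target table
    source="temp_schema.hourly_agg_new",  # assumed staging table
    partition_col="hour_partition",     # assumed partition column
    partition_value="2024-07-18-10",
)
```

Because both statements carry the same partition predicate, a run for one hour leaves every other hour untouched.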

Out of curiosity, why do you want to run this operation for the last x hours in parallel? Are you dealing with data so big that parallelism is required? Can't you process the last x hours with one query?

@teoria

teoria commented Nov 18, 2024

Hi @nicor88,
I have the same need.
I built a log monitoring system in which each system saves JSON files to S3 buckets.
Every day an Airflow DAG runs dbt models that build an incremental Iceberg table.
So when I need an Airflow backfill, many dbt model runs are triggered at the same time,
and the current setup creates the temp table with the same name for each of them. :(

My workaround:

I added a call to adapter.generate_unique_temporary_table_suffix() in the incremental.sql file:

  {% if unique_tmp_table_suffix == True and table_type == 'iceberg' %}
    {% set tmp_table_suffix = adapter.generate_unique_temporary_table_suffix() %}
  {% endif %}

and now I can execute many concurrent DAG runs.
