Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Load _history tables from temp table #29

Merged
merged 2 commits into from
Nov 8, 2024

Conversation

rymarczy
Copy link
Contributor

@rymarczy rymarczy commented Nov 8, 2024

This change modifies the loading of _history table.

Rather than load a CSV directly into the _histroy table, we will first load records into the _load table and then INSERT into the _history table with a ON CONFLICT query.

This will make _history table loads idempotent, in case the same history record is loaded more than once.

cdc_df = dataframe_from_merged_csv(merge_csv, dfm_object)

key_columns = [col["name"].lower() for col in self.etl_status.last_schema if col["primaryKeyPos"] > 0]
# Load records into _history table
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update comment here.
q: is _load a new type of table at this point? we need to communicate to folks who are using the database what it is, so they can ignore if need be.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is just a table to load stuff, maybe truncating at the end as well? I want it to be obvious that it shouldn't be used by data analysts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's sort of a "temp" table. It gets created by the dmap_import application when it starts to load a QLIK table and then gets dropped at the end. So DB users would only ever see it while the ETL operations is actively running.

@@ -81,7 +81,7 @@ def get_cdc_gz_csvs(etl_status: TableStatus, table: str) -> List[str]:
cdc_csvs = s3_list_cdc_gz_objects(S3_ARCHIVE, snapshot_prefix, min_ts=etl_status.last_cdc_ts)

# filter error files from table folder
for csv_file in s3_list_cdc_gz_objects(S3_ERROR, table, min_ts=etl_status.last_cdc_ts):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the fix for the S3 Permission error we were seeing as well

@rymarczy rymarczy merged commit a02fdea into main Nov 8, 2024
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants