From a15306873e11d1c0d3b2b2b95964d04def85f183 Mon Sep 17 00:00:00 2001
From: james
Date: Fri, 4 Oct 2024 03:12:08 +0800
Subject: [PATCH] fix issue where returning was failing due to psycopg crazy
 formatting issues

---
 adscrawler/app_stores/apple.py |  2 +-
 adscrawler/queries.py          | 54 ++++++++++++++++------------------
 2 files changed, 26 insertions(+), 30 deletions(-)

diff --git a/adscrawler/app_stores/apple.py b/adscrawler/app_stores/apple.py
index d374b54..e0d1e13 100644
--- a/adscrawler/app_stores/apple.py
+++ b/adscrawler/app_stores/apple.py
@@ -47,7 +47,6 @@ def get_app_ids_with_retry(
                 timeout=10,
             )
         except Exception as e:
-            retries += 1
             if retries == max_retries:
                 raise e
             delay = min(base_delay * (2**retries) + random.uniform(0, 1), max_delay)
@@ -55,7 +54,8 @@ def get_app_ids_with_retry(
                 f"Attempt {retries} failed. Retrying in {delay:.2f} seconds..."
             )
             time.sleep(delay)
+            retries += 1
     if len(app_ids) == 0:
         logger.info(
             f"Collection: {coll_value}, category: {cat_value} failed to load apps!"
         )
diff --git a/adscrawler/queries.py b/adscrawler/queries.py
index ee0088e..c3f8334 100644
--- a/adscrawler/queries.py
+++ b/adscrawler/queries.py
@@ -11,6 +11,7 @@
 
 logger = get_logger(__name__)
 
+
 def upsert_df(
     df: pd.DataFrame,
     table_name: str,
@@ -42,23 +43,22 @@ def upsert_df(
 
     raw_conn = database_connection.engine.raw_connection()
 
-    if 'crawled_date' in df.columns and df['crawled_date'].isna().all():
-        df['crawled_date'] = pd.to_datetime(df['crawled_date']).dt.date
-        df['crawled_date'] = None
-    if 'release_date' in df.columns and df['release_date'].isna().all():
-        df['release_date']= None
+    if "crawled_date" in df.columns and df["crawled_date"].isna().all():
+        df["crawled_date"] = pd.to_datetime(df["crawled_date"]).dt.date
+        df["crawled_date"] = None
+    if "release_date" in df.columns and df["release_date"].isna().all():
+        df["release_date"] = None
 
     all_columns = list(set(key_columns + insert_columns))
     table_identifier = Identifier(table_name)
     if schema:
-        table_identifier = Composed([Identifier(schema), SQL('.'), table_identifier])
-
-    columns = SQL(', ').join(map(Identifier, all_columns))
-    placeholders = SQL(', ').join(SQL('%s') for _ in all_columns)
-    conflict_columns = SQL(', ').join(map(Identifier, key_columns))
-    update_set = SQL(', ').join(
-        SQL('{0} = EXCLUDED.{0}').format(Identifier(col))
-        for col in all_columns
+        table_identifier = Composed([Identifier(schema), SQL("."), table_identifier])
+
+    columns = SQL(", ").join(map(Identifier, all_columns))
+    placeholders = SQL(", ").join(SQL("%s") for _ in all_columns)
+    conflict_columns = SQL(", ").join(map(Identifier, key_columns))
+    update_set = SQL(", ").join(
+        SQL("{0} = EXCLUDED.{0}").format(Identifier(col)) for col in all_columns
     )
 
     # Upsert query without RETURNING clause
@@ -72,34 +72,32 @@ def upsert_df(
         columns=columns,
         placeholders=placeholders,
         conflict_columns=conflict_columns,
-        update_set=update_set
+        update_set=update_set,
     )
 
-    where_conditions = SQL(' AND ').join(
-        SQL('{0} = %s').format(Identifier(col))
-        for col in key_columns
+    sel_where_conditions = SQL(" AND ").join(
+        SQL("{} = ANY(%s)").format(Identifier(col)) for col in key_columns
     )
 
     select_query = SQL("""
         SELECT * FROM {table}
        WHERE {where_conditions}
-    """).format(
-        table=table_identifier,
-        where_conditions=where_conditions
-    )
+    """).format(table=table_identifier, where_conditions=sel_where_conditions)
 
     if log:
         logger.info(f"Upsert query: {upsert_query.as_string(raw_conn)}")
         logger.info(f"Select query: {select_query.as_string(raw_conn)}")
 
     with raw_conn.cursor() as cur:
         # Perform upsert
-        data = [tuple(row) for row in df[all_columns].itertuples(index=False, name=None)]
+        data = [
+            tuple(row) for row in df[all_columns].itertuples(index=False, name=None)
+        ]
         cur.executemany(upsert_query, data)
         # Fetch affected rows if required
         if return_rows:
-            select_params = [row[all_columns.index(col)] for row in data for col in key_columns]
-            cur.execute(select_query, select_params)
+            where_values = [df[col].tolist() for col in key_columns]
+            cur.execute(select_query, where_values)
             result = cur.fetchall()
             column_names = [desc[0] for desc in cur.description]
             return_df = pd.DataFrame(result, columns=column_names)
@@ -110,7 +108,6 @@ def upsert_df(
     return return_df
 
 
-
 def query_developers(
     database_connection: PostgresCon,
     store: int,
@@ -147,7 +144,6 @@ def query_developers(
     return df
 
 
-
 def query_store_id_map(
     database_connection: PostgresCon,
     store: int | None = None,
@@ -319,11 +315,11 @@ def query_store_apps(
             OR crawl_result IS NULL
         )
         """
-    if group == 'short':
+    if group == "short":
         installs_and_dates_str = short_group
-    elif group == 'long':
+    elif group == "long":
         installs_and_dates_str = long_group
-    elif group == 'max':
+    elif group == "max":
         installs_and_dates_str = max_group
     else:
         installs_and_dates_str = f"""(
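
Note on the fix: instead of re-selecting each upserted row with one "col = %s" parameter set per row, the select now matches every key column against a whole list of values with "col = ANY(%s)", so a single execute after the executemany returns all affected rows. Below is a minimal, self-contained sketch of that pattern, assuming psycopg2 (psycopg 3's psycopg.sql exposes the same SQL/Identifier composition); the DSN, table name, and sample data are placeholders, not taken from this patch.

import pandas as pd
import psycopg2
from psycopg2.sql import SQL, Identifier

# Placeholder data and key columns; in upsert_df these come from the caller.
df = pd.DataFrame({"store_id": ["app.one", "app.two"], "name": ["One", "Two"]})
key_columns = ["store_id"]

conn = psycopg2.connect("dbname=example")  # placeholder DSN

# Build "col = ANY(%s)" for each key column, joined with AND.
where_clause = SQL(" AND ").join(
    SQL("{} = ANY(%s)").format(Identifier(col)) for col in key_columns
)
select_query = SQL("SELECT * FROM {table} WHERE {where}").format(
    table=Identifier("store_apps"),  # placeholder table name
    where=where_clause,
)

with conn.cursor() as cur:
    # One Python list per key column; psycopg adapts each list to a
    # PostgreSQL array, so ANY(%s) matches all upserted keys in one query.
    where_values = [df[col].tolist() for col in key_columns]
    cur.execute(select_query, where_values)
    rows = cur.fetchall()
    column_names = [desc[0] for desc in cur.description]
    fetched = pd.DataFrame(rows, columns=column_names)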