
Commit

fix issue where returning was failing due to psycopg crazy formatting issues
ddxv committed Oct 3, 2024
1 parent 67399a1 commit a153068
Showing 2 changed files with 26 additions and 30 deletions.
adscrawler/app_stores/apple.py (2 changes: 1 addition & 1 deletion)
@@ -47,14 +47,14 @@ def get_app_ids_with_retry(
                 timeout=10,
             )
         except Exception as e:
-            retries += 1
             if retries == max_retries:
                 raise e
             delay = min(base_delay * (2**retries) + random.uniform(0, 1), max_delay)
             logger.warning(
                 f"Attempt {retries} failed. Retrying in {delay:.2f} seconds..."
             )
             time.sleep(delay)
+            retries += 1
     if len(app_ids) == 0:
         logger.info(
             f"Collection: {coll_value}, category: {cat_value} failed to load apps!"
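The hunk above moves `retries += 1` from the top of the except block to after `time.sleep(delay)`, so the attempt counter only advances once the backoff delay for the failed attempt has actually been served. A minimal, self-contained sketch of the same capped exponential backoff pattern follows; the `call_with_retry` wrapper, the `fetch` callable, and the default limits are hypothetical stand-ins, not values taken from the repository.

```python
import random
import time
from collections.abc import Callable


def call_with_retry(
    fetch: Callable[[], list[str]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> list[str]:
    """Retry `fetch` with capped exponential backoff plus jitter (illustrative sketch)."""
    retries = 0
    while True:
        try:
            return fetch()
        except Exception:
            if retries == max_retries:
                raise
            # Wait base_delay * 2**retries seconds plus up to 1s of jitter,
            # never more than max_delay, before the next attempt.
            delay = min(base_delay * (2**retries) + random.uniform(0, 1), max_delay)
            time.sleep(delay)
            retries += 1
```

Under these assumptions the first retry waits about `base_delay` seconds, each later retry roughly doubles the wait plus up to a second of jitter, and no single wait exceeds `max_delay`.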
adscrawler/queries.py (54 changes: 25 additions & 29 deletions)
@@ -11,6 +11,7 @@
 
 logger = get_logger(__name__)
 
+
 def upsert_df(
     df: pd.DataFrame,
     table_name: str,
@@ -42,23 +43,22 @@ def upsert_df(
 
     raw_conn = database_connection.engine.raw_connection()
 
-    if 'crawled_date' in df.columns and df['crawled_date'].isna().all():
-        df['crawled_date'] = pd.to_datetime(df['crawled_date']).dt.date
-        df['crawled_date'] = None
-    if 'release_date' in df.columns and df['release_date'].isna().all():
-        df['release_date']= None
+    if "crawled_date" in df.columns and df["crawled_date"].isna().all():
+        df["crawled_date"] = pd.to_datetime(df["crawled_date"]).dt.date
+        df["crawled_date"] = None
+    if "release_date" in df.columns and df["release_date"].isna().all():
+        df["release_date"] = None
 
     all_columns = list(set(key_columns + insert_columns))
     table_identifier = Identifier(table_name)
     if schema:
-        table_identifier = Composed([Identifier(schema), SQL('.'), table_identifier])
-
-    columns = SQL(', ').join(map(Identifier, all_columns))
-    placeholders = SQL(', ').join(SQL('%s') for _ in all_columns)
-    conflict_columns = SQL(', ').join(map(Identifier, key_columns))
-    update_set = SQL(', ').join(
-        SQL('{0} = EXCLUDED.{0}').format(Identifier(col))
-        for col in all_columns
+        table_identifier = Composed([Identifier(schema), SQL("."), table_identifier])
+
+    columns = SQL(", ").join(map(Identifier, all_columns))
+    placeholders = SQL(", ").join(SQL("%s") for _ in all_columns)
+    conflict_columns = SQL(", ").join(map(Identifier, key_columns))
+    update_set = SQL(", ").join(
+        SQL("{0} = EXCLUDED.{0}").format(Identifier(col)) for col in all_columns
     )
 
     # Upsert query without RETURNING clause
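The identifiers and placeholders above are composed with psycopg's `sql` objects rather than f-strings, so table and column names are quoted safely. Below is a rough sketch of how the (partially elided) upsert statement is likely assembled under that approach; the schema, table, and column names are invented for illustration, and the INSERT template itself is an assumption, since the diff only shows its `.format(...)` arguments.

```python
from psycopg.sql import SQL, Composed, Identifier  # psycopg2.sql exposes the same names

# Hypothetical key/insert columns, standing in for the function arguments.
key_columns = ["store", "store_id"]
insert_columns = ["store", "store_id", "name"]
all_columns = list(set(key_columns + insert_columns))

# schema.table rendered as safely quoted identifiers.
table_identifier = Composed([Identifier("public"), SQL("."), Identifier("store_apps")])
columns = SQL(", ").join(map(Identifier, all_columns))
placeholders = SQL(", ").join(SQL("%s") for _ in all_columns)
conflict_columns = SQL(", ").join(map(Identifier, key_columns))
update_set = SQL(", ").join(
    SQL("{0} = EXCLUDED.{0}").format(Identifier(col)) for col in all_columns
)

# The upsert template is elided in the diff; it presumably looks something like this.
upsert_query = SQL(
    "INSERT INTO {table} ({columns}) VALUES ({placeholders}) "
    "ON CONFLICT ({conflict_columns}) DO UPDATE SET {update_set}"
).format(
    table=table_identifier,
    columns=columns,
    placeholders=placeholders,
    conflict_columns=conflict_columns,
    update_set=update_set,
)
```

Composing the statement this way keeps user-supplied table and column names out of the SQL text until the driver has quoted them.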
@@ -72,34 +72,32 @@ def upsert_df(
         columns=columns,
         placeholders=placeholders,
         conflict_columns=conflict_columns,
-        update_set=update_set
+        update_set=update_set,
     )
 
-    where_conditions = SQL(' AND ').join(
-        SQL('{0} = %s').format(Identifier(col))
-        for col in key_columns
+    sel_where_conditions = SQL(" AND ").join(
+        SQL("{} = ANY(%s)").format(Identifier(col)) for col in key_columns
     )
 
     select_query = SQL("""
        SELECT * FROM {table}
        WHERE {where_conditions}
-    """).format(
-        table=table_identifier,
-        where_conditions=where_conditions
-    )
+    """).format(table=table_identifier, where_conditions=sel_where_conditions)
     if log:
         logger.info(f"Upsert query: {upsert_query.as_string(raw_conn)}")
         logger.info(f"Select query: {select_query.as_string(raw_conn)}")
 
     with raw_conn.cursor() as cur:
         # Perform upsert
-        data = [tuple(row) for row in df[all_columns].itertuples(index=False, name=None)]
+        data = [
+            tuple(row) for row in df[all_columns].itertuples(index=False, name=None)
+        ]
         cur.executemany(upsert_query, data)
 
         # Fetch affected rows if required
         if return_rows:
-            select_params = [row[all_columns.index(col)] for row in data for col in key_columns]
-            cur.execute(select_query, select_params)
+            where_values = [df[col].tolist() for col in key_columns]
+            cur.execute(select_query, where_values)
             result = cur.fetchall()
             column_names = [desc[0] for desc in cur.description]
             return_df = pd.DataFrame(result, columns=column_names)
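This hunk is the substance of the commit: the old code built one `{col} = %s` condition per key column but then flattened the values of every row into a single parameter list, so the parameter count no longer matched the placeholders whenever more than one row was upserted. The new code binds one Python list per key column to `{col} = ANY(%s)`, letting a single `cur.execute` fetch back all affected rows. A small sketch of the two parameter shapes, using an invented DataFrame:

```python
import pandas as pd

df = pd.DataFrame(
    {"store": [1, 1, 2], "store_id": ["a", "b", "c"], "name": ["A", "B", "C"]}
)
key_columns = ["store", "store_id"]

# Old shape: per-row values flattened into one list, while the query only had
# two placeholders ("store" = %s AND "store_id" = %s) -> parameter count mismatch.
old_params = [1, "a", 1, "b", 2, "c"]

# New shape: one list per key column, bound to
# "store" = ANY(%s) AND "store_id" = ANY(%s) in a single execute call.
where_values = [df[col].tolist() for col in key_columns]
print(where_values)  # [[1, 1, 2], ['a', 'b', 'c']]
```

One trade-off worth noting: with a composite key, the ANY form matches every combination of the listed values rather than the exact (store, store_id) pairs, so it can return a superset of the upserted rows when key values overlap across rows.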
@@ -110,7 +108,6 @@ def upsert_df(
     return return_df
 
 
-
 def query_developers(
     database_connection: PostgresCon,
     store: int,
@@ -147,7 +144,6 @@ def query_developers(
     return df
 
 
-
 def query_store_id_map(
     database_connection: PostgresCon,
     store: int | None = None,
@@ -319,11 +315,11 @@ def query_store_apps(
             OR crawl_result IS NULL
         )
     """
-    if group == 'short':
+    if group == "short":
         installs_and_dates_str = short_group
-    elif group == 'long':
+    elif group == "long":
         installs_and_dates_str = long_group
-    elif group == 'max':
+    elif group == "max":
         installs_and_dates_str = max_group
     else:
         installs_and_dates_str = f"""(
