Commit 2f20ce6
Add logging for app sources
ddxv committed Oct 17, 2023
1 parent bce9631 commit 2f20ce6
Showing 2 changed files with 57 additions and 18 deletions.
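
In short: process_scraped gains a required crawl_source argument, the insert of newly discovered apps is split out into a dedicated insert_new_apps function, and each app seen for the first time is recorded in the logging.store_app_sources table together with the source that found it.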
73 changes: 56 additions & 17 deletions adscrawler/app_stores/scrape_stores.py
@@ -56,6 +56,7 @@ def scrape_stores_frontpage(
         process_scraped(
             database_connection=database_connection,
             ranked_dicts=ranked_dicts,
+            crawl_source="scrape_frontpage_top",
             collections_map=collections_map,
             categories_map=categories_map,
             countries_map=countries_map,
@@ -68,6 +69,7 @@
         process_scraped(
             database_connection=database_connection,
             ranked_dicts=ranked_dicts,
+            crawl_source="scrape_frontpage_top",
             collections_map=collections_map,
             categories_map=categories_map,
             countries_map=countries_map,
@@ -77,20 +79,17 @@
         process_scraped(
             database_connection=database_connection,
             ranked_dicts=dicts,
+            crawl_source="scrape_rss_apkcombo",
         )
     except Exception:
         logger.exception("ApkCombo RSS feed failed")
     return


-def process_scraped(
-    database_connection: PostgresCon,
-    ranked_dicts: list[dict],
-    collections_map: pd.DataFrame | None = None,
-    categories_map: pd.DataFrame | None = None,
-    countries_map: pd.DataFrame | None = None,
+def insert_new_apps(
+    dicts: list[dict], database_connection: PostgresCon, crawl_source: str
 ) -> None:
-    df = pd.DataFrame(ranked_dicts)
+    df = pd.DataFrame(dicts)
     all_scraped_ids = df["store_id"].unique().tolist()
     existing_ids_map = query_store_id_map(
         database_connection, store_ids=all_scraped_ids
@@ -99,16 +98,52 @@ def process_scraped(
     new_apps_df = df[~(df["store_id"].isin(existing_store_ids))][
         ["store", "store_id"]
     ].drop_duplicates()
-    if not new_apps_df.empty:
-        logger.info(f"Scrape store: insert new apps to db {new_apps_df.shape=}")
+
+    if new_apps_df.empty:
+        logger.info(f"Scrape {crawl_source=} no new apps")
+        return
+    else:
+        logger.info(
+            f"Scrape {crawl_source=} insert new apps to db {new_apps_df.shape=}"
+        )
         insert_columns = ["store", "store_id"]
-        upsert_df(
+        inserted_apps: pd.DataFrame = upsert_df(
             table_name="store_apps",
             insert_columns=insert_columns,
             df=new_apps_df,
             key_columns=insert_columns,
             database_connection=database_connection,
+            return_rows=True,
         )
+        if inserted_apps is not None and not inserted_apps.empty:
+            inserted_apps["crawl_source"] = crawl_source
+            inserted_apps = inserted_apps.rename(columns={"id": "store_app"})
+            insert_columns = ["store", "store_app"]
+            upsert_df(
+                table_name="store_app_sources",
+                insert_columns=insert_columns,
+                df=inserted_apps,
+                key_columns=insert_columns,
+                database_connection=database_connection,
+                schema="logging",
+            )
+
+
+def process_scraped(
+    database_connection: PostgresCon,
+    ranked_dicts: list[dict],
+    crawl_source: str,
+    collections_map: pd.DataFrame | None = None,
+    categories_map: pd.DataFrame | None = None,
+    countries_map: pd.DataFrame | None = None,
+) -> None:
+    insert_new_apps(
+        database_connection=database_connection,
+        dicts=ranked_dicts,
+        crawl_source=crawl_source,
+    )
+    df = pd.DataFrame(ranked_dicts)
+    all_scraped_ids = df["store_id"].unique().tolist()
     if "rank" not in df.columns or countries_map is None:
         return
     logger.info("Store rankings start")
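
The mechanic that makes the logging work is upsert_df(..., return_rows=True): the upsert into store_apps hands back only the rows it actually inserted, and just those rows are written to logging.store_app_sources. upsert_df is the project's own helper and its internals are not part of this diff; below is a minimal sketch of the insert-and-return pattern it presumably wraps, assuming SQLAlchemy connectivity and a unique constraint on (store, store_id):

import pandas as pd
from sqlalchemy import text
from sqlalchemy.engine import Engine


def upsert_returning_new(df: pd.DataFrame, engine: Engine) -> pd.DataFrame:
    # Hypothetical stand-in for upsert_df(..., return_rows=True),
    # not the project's actual helper.
    new_rows = []
    with engine.begin() as conn:
        for rec in df.to_dict(orient="records"):
            # DO NOTHING on conflict: already-known apps return no row,
            # so the result contains first-time discoveries only.
            result = conn.execute(
                text(
                    "INSERT INTO store_apps (store, store_id) "
                    "VALUES (:store, :store_id) "
                    "ON CONFLICT (store, store_id) DO NOTHING "
                    "RETURNING id, store, store_id"
                ),
                rec,
            )
            new_rows.extend(result.mappings().all())
    return pd.DataFrame(new_rows)

Because only first-time inserts come back, an app keeps the crawl_source that originally discovered it; later crawls of the same app insert nothing and therefore log nothing.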
@@ -177,7 +212,11 @@ def crawl_developers_for_new_store_ids(
         developer_ids = [unquote_plus(x) for x in developer_ids]
         apps_df = crawl_google_developers(developer_ids, store_ids)
         if not apps_df.empty:
-            crawl_stores_for_app_details(apps_df, database_connection)
+            process_scraped(
+                database_connection=database_connection,
+                ranked_dicts=apps_df.to_dict(orient="records"),
+                crawl_source="crawl_developers",
+            )
         dev_df = pd.DataFrame(
             [
                 {
@@ -211,12 +250,12 @@
             )

         if not apps_df.empty:
-            apps_df = clean_scraped_df(df=apps_df, store=store)
-            save_apps_df(
-                apps_df,
-                database_connection,
-                update_developer=False,
-                country="us",
+            process_scraped(
+                database_connection=database_connection,
+                ranked_dicts=apps_df[["store", "store_id"]].to_dict(
+                    orient="records"
+                ),
+                crawl_source="crawl_developers",
             )
         dev_df = pd.DataFrame(
             [
2 changes: 1 addition & 1 deletion pg-ddl/db.sql
@@ -628,7 +628,7 @@ CREATE TABLE logging.store_app_sources (
     crawl_source text NULL,
     CONSTRAINT store_app_sources_pk PRIMARY KEY (store, store_app),
     CONSTRAINT store_app_sources_store_fk FOREIGN KEY (store) REFERENCES public.stores(id),
-    CONSTRAINT store_app_sources_app_fk FOREIGN KEY (store_app) REFERENCES public.store_apps(id)
+    CONSTRAINT store_app_sources_app_fk FOREIGN KEY (store_app) REFERENCES public.store_apps(id) ON DELETE CASCADE
 );

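The one-line schema change adds ON DELETE CASCADE to the store_app foreign key, so deleting an app from public.store_apps now also removes its provenance row rather than failing on the constraint. A hypothetical inspection query (the DSN is a placeholder) showing what the new logging gives you:

import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection string; substitute real credentials.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/dbname")

# How many apps did each crawl source discover first?
per_source = pd.read_sql(
    """
    SELECT crawl_source, count(*) AS apps_discovered
    FROM logging.store_app_sources
    GROUP BY crawl_source
    ORDER BY apps_discovered DESC;
    """,
    con=engine,
)
print(per_source)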
