-
Notifications
You must be signed in to change notification settings - Fork 0
/
scrape_products.py
82 lines (71 loc) · 2.03 KB
/
scrape_products.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import requests
import glom
import sqlite3
from datetime import datetime
from airflow.decorators import task
from airflow import DAG
from settings import OXY_USERNAME, OXY_PASSWORD, DB_FILE_PATH
@task(task_id="get_products")
def get_products():
    """Return every product URL stored in the ``products`` table.

    Opens the SQLite database at ``DB_FILE_PATH``, selects the ``url`` column
    of all rows in ``products`` and returns the values as a list.

    Returns:
        list: One entry per row of ``products``, holding that row's ``url``.
    """
    # Function-scope import so this block does not depend on the file header.
    from contextlib import closing

    # A sqlite3 connection is not closed when it goes out of scope on every
    # interpreter; closing() releases the file handle deterministically, even
    # if the query raises.
    with closing(sqlite3.connect(DB_FILE_PATH)) as conn:
        rows = conn.execute("SELECT url FROM products").fetchall()

    # Each row is a 1-tuple: (url,).
    return [row[0] for row in rows]
@task
def scrape_products(product):
    """Scrape the price of one product page and record it in SQLite.

    Sends ``product`` (a URL) to the Oxylabs realtime endpoint with parsing
    instructions that extract the text of ``<p class="price_color">`` and
    convert it to an amount. When the response reports parse status 12000,
    the price is inserted into ``product_prices`` in the database at
    ``DB_FILE_PATH``; for any other status nothing is stored.

    Args:
        product: URL of the product page to scrape.

    Returns:
        None.

    Raises:
        Whatever ``requests``, ``glom`` or ``sqlite3`` raise on a failed
        request, an unexpected response shape, or a database error; nothing
        is caught here.
    """
    # Function-scope import so this block does not depend on the file header.
    from contextlib import closing

    print(f"Scraping {product}")

    payload = {
        "source": "universal",
        "url": product,
        "parse": True,
        "parsing_instructions": {
            "price": {
                "_fns": [
                    {
                        "_fn": "xpath_one",
                        "_args": [
                            "//p[@class=\"price_color\"]/text()"
                        ],
                    },
                    {
                        "_fn": "amount_from_string",
                    },
                ],
            },
        },
    }

    response = requests.post(
        url="https://realtime.oxylabs.io/v1/queries",
        auth=(OXY_USERNAME, OXY_PASSWORD),
        json=payload,
    ).json()

    parsing_status_code = glom.glom(response, "results.0.content.parse_status_code")
    # NOTE(review): 12000 is treated as the only success value -- presumably
    # the API's "parsed successfully" code; confirm against the Oxylabs docs.
    if parsing_status_code != 12000:
        return

    price = glom.glom(response, "results.0.content.price")

    # closing() guarantees the connection is released even if the INSERT fails.
    with closing(sqlite3.connect(DB_FILE_PATH)) as conn:
        # Bound parameters instead of string interpolation: the URL and the
        # scraped value are external data, and a single quote in either one
        # would otherwise break (or rewrite) the statement.
        # The price is bound as text because the original statement wrote it
        # as a quoted literal; the column's affinity then converts it exactly
        # as before.
        conn.execute(
            "INSERT INTO product_prices (product_url, price) VALUES (?, ?)",
            (product, str(price)),
        )
        conn.commit()

    print(f"Price {price} added for {product}")
# Daily DAG: read the product URLs, then run one mapped scrape_products task
# instance per URL.
with DAG(
    dag_id="scrape_products",
    description="Scraper products.",
    tags=["webinar"],
    default_args={"owner": "airflow"},
    # Cron expression: every day at 07:00.
    schedule_interval="0 7 * * *",
    start_date=datetime(2022, 6, 6, 13),
    # Do not back-fill the runs between start_date and now.
    catchup=False,
) as dag:
    # expand() fans out over the list returned by get_products.
    scrape_products.expand(product=get_products())