-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
helpers.py
120 lines (94 loc) · 3.58 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from pymongo import MongoClient
import os
from bson import ObjectId
dbclient = MongoClient(os.getenv("MONGO_URI"))
database = dbclient[os.getenv("DATABASE")]
collection = database[os.getenv("COLLECTION")]
PRODUCTS = database[os.getenv("PRODUCTS")]
async def fetch_all_products(user_id):
try:
cursor = collection.find({"user_id": user_id})
products = list(cursor)
global_products = []
for product in products:
global_product = PRODUCTS.find_one({"_id": product.get("product_id")})
global_product["product_id"] = product.get("_id")
global_products.append(global_product)
return global_products
except Exception as e:
print(f"Error fetching products: {str(e)}")
return []
async def fetch_one_product(_id):
try:
product = collection.find_one({"_id": ObjectId(_id)})
global_product = PRODUCTS.find_one({"_id": product.get("product_id")})
return global_product
except Exception as e:
print(f"Error fetching product: {str(e)}")
return None
async def add_new_product(user_id, product_name, product_url, initial_price):
try:
existing_global_product = PRODUCTS.find_one({"product_name": product_name})
if not existing_global_product:
global_new_product = {
"product_name": product_name,
"url": product_url,
"price": initial_price,
"previous_price": initial_price,
"upper": initial_price,
"lower": initial_price,
}
insert_result = PRODUCTS.insert_one(global_new_product)
existing_global_product = {"_id": insert_result.inserted_id}
existing_product = collection.find_one(
{"user_id": user_id, "product_id": existing_global_product["_id"]}
)
if existing_product:
print("Product already exists.")
return existing_product["_id"]
new_local_product = {
"user_id": user_id,
"product_id": existing_global_product["_id"],
}
result = collection.insert_one(new_local_product)
print("Product added successfully.")
return result.inserted_id
except Exception as e:
print(f"Error adding product: {str(e)}")
return None
async def update_product_price(id, new_price):
try:
global_product = PRODUCTS.find_one(
{"_id": id},
)
if global_product:
upper_price = global_product.get("upper", new_price)
lower_price = global_product.get("lower", new_price)
if new_price > upper_price:
upper_price = new_price
elif new_price < lower_price:
lower_price = new_price
PRODUCTS.update_one(
{"_id": id},
{
"$set": {
"price": new_price,
"upper": upper_price,
"lower": lower_price,
}
},
)
print("Global product prices updated successfully.")
except Exception as e:
print(f"Error updating product price: {str(e)}")
async def delete_one(_id, user_id):
try:
product = collection.find_one({"_id": ObjectId(_id)})
if product and product.get("user_id") == int(user_id):
collection.delete_one({"_id": ObjectId(_id)})
return True
else:
return None
except Exception as e:
print(f"Error deleting product: {str(e)}")
return None