Skip to content

Commit

Permalink
fix: create function to handle amazon email addresses lists (#1028)
Browse files Browse the repository at this point in the history
  • Loading branch information
firstof9 authored Dec 4, 2024
1 parent de0247f commit 1ae4b70
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 146 deletions.
273 changes: 135 additions & 138 deletions custom_components/mail_and_packages/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from .const import (
AMAZON_DELIVERED,
AMAZON_DELIVERED_SUBJECT,
AMAZON_EMAIL,
AMAZON_EXCEPTION,
AMAZON_EXCEPTION_ORDER,
AMAZON_EXCEPTION_SUBJECT,
Expand Down Expand Up @@ -444,7 +443,14 @@ def fetch(
count[sensor] = update_time()
else:
count[sensor] = get_count(
account, sensor, False, img_out_path, hass, amazon_image_name, amazon_domain
account,
sensor,
False,
img_out_path,
hass,
amazon_image_name,
amazon_domain,
amazon_fwds,
)[ATTR_COUNT]

data.update(count)
Expand Down Expand Up @@ -931,6 +937,7 @@ def get_count(
hass: Optional[HomeAssistant] = None,
amazon_image_name: Optional[str] = None,
amazon_domain: Optional[str] = None,
amazon_fwds: Optional[str] = None,
) -> dict:
"""Get Package Count.
Expand All @@ -946,7 +953,7 @@ def get_count(
# Return Amazon delivered info
if sensor_type == AMAZON_DELIVERED:
result[ATTR_COUNT] = amazon_search(
account, image_path, hass, amazon_image_name, amazon_domain
account, image_path, hass, amazon_image_name, amazon_domain, amazon_fwds
)
result[ATTR_TRACKING] = ""
return result
Expand Down Expand Up @@ -1105,6 +1112,7 @@ def amazon_search(
hass: HomeAssistant,
amazon_image_name: str,
amazon_domain: str,
fwds: str = None,
) -> int:
"""Find Amazon Delivered email.
Expand All @@ -1115,32 +1123,26 @@ def amazon_search(
subjects = AMAZON_DELIVERED_SUBJECT
today = get_formatted_date()
count = 0
domains = amazon_domain.split()

_LOGGER.debug("Cleaning up amazon images...")
cleanup_images(f"{image_path}amazon/")

for domain in domains:
for subject in subjects:
email_address = []
for address in AMAZON_EMAIL:
email_address.append(address + domain)
_LOGGER.debug("Amazon email search address: %s", str(email_address))
address_list = amazon_email_addresses(fwds, amazon_domain)
_LOGGER.debug("Amazon email list: %s", str(address_list))

(server_response, data) = email_search(
account, email_address, today, subject
)
for subject in subjects:
(server_response, data) = email_search(account, address_list, today, subject)

if server_response == "OK" and data[0] is not None:
count += len(data[0].split())
_LOGGER.debug("Amazon delivered email(s) found: %s", count)
get_amazon_image(
data[0],
account,
image_path,
hass,
amazon_image_name,
)
if server_response == "OK" and data[0] is not None:
count += len(data[0].split())
_LOGGER.debug("Amazon delivered email(s) found: %s", count)
get_amazon_image(
data[0],
account,
image_path,
hass,
amazon_image_name,
)

if count == 0:
_LOGGER.debug("No Amazon deliveries found.")
Expand Down Expand Up @@ -1317,34 +1319,20 @@ def amazon_exception(
tfmt = get_formatted_date()
count = 0
info = {}
domains = the_domain.split()
if isinstance(fwds, list):
for fwd in fwds:
if fwd and fwd != '""' and fwd not in domains:
domains.append(fwd)
_LOGGER.debug("Amazon email adding %s to list", str(fwd))

_LOGGER.debug("Amazon domains to be checked: %s", str(domains))
address_list = amazon_email_addresses(fwds, the_domain)
_LOGGER.debug("Amazon email list: %s", str(address_list))

for domain in domains:
if "@" in domain:
email_address = domain.strip('"')
_LOGGER.debug("Amazon email search address: %s", str(email_address))
else:
email_address = []
email_address.append(f"{AMAZON_EMAIL}{domain}")
_LOGGER.debug("Amazon email search address: %s", str(email_address))

(server_response, sdata) = email_search(
account, email_address, tfmt, AMAZON_EXCEPTION_SUBJECT
)
(server_response, sdata) = email_search(
account, address_list, tfmt, AMAZON_EXCEPTION_SUBJECT
)

if server_response == "OK":
count += len(sdata[0].split())
_LOGGER.debug("Found %s Amazon exceptions", count)
order_numbers = get_tracking(sdata[0], account, AMAZON_PATTERN)
for order in order_numbers:
order_number.append(order)
if server_response == "OK":
count += len(sdata[0].split())
_LOGGER.debug("Found %s Amazon exceptions", count)
order_numbers = get_tracking(sdata[0], account, AMAZON_PATTERN)
for order in order_numbers:
order_number.append(order)

info[ATTR_COUNT] = count
info[ATTR_ORDER] = order_number
Expand Down Expand Up @@ -1377,6 +1365,31 @@ def amazon_date_format(arrive_date: str, lang: str) -> tuple:
return (arrive_date, "%A, %B %d")


def amazon_email_addresses(
fwds: Optional[str] = None, the_domain: str = None
) -> list | None:
"""Return Amazon email addresses in list format."""
domains = []
domains.extend(_process_amazon_forwards(fwds))
the_domain = the_domain.split()
domains.extend(the_domain)
value = []

for domain in domains:
if "@" in domain:
email_address = domain.strip('"')
value.append(email_address)
else:
email_address = []
addresses = AMAZON_SHIPMENT_TRACKING
for address in addresses:
email_address.append(f"{address}@{domain}")
value.extend(email_address)

_LOGGER.debug("Amazon email search addresses: %s", str(value))
return value


def get_items(
account: Type[imaplib.IMAP4_SSL],
param: str = None,
Expand All @@ -1395,107 +1408,91 @@ def get_items(
tfmt = past_date.strftime("%d-%b-%Y")
deliveries_today = []
order_number = []
domains = []
domains.extend(_process_amazon_forwards(fwds))
the_domain = the_domain.split()
domains.extend(the_domain)

_LOGGER.debug("Amazon email list: %s", str(domains))
address_list = amazon_email_addresses(fwds, the_domain)
_LOGGER.debug("Amazon email list: %s", str(address_list))

for domain in domains:
if "@" in domain:
email_address = domain.strip('"')
_LOGGER.debug("Amazon email search address: %s", str(email_address))
else:
email_address = []
addresses = AMAZON_SHIPMENT_TRACKING
for address in addresses:
email_address.append(f"{address}@{domain}")
_LOGGER.debug("Amazon email search address: %s", str(email_address))

(server_response, sdata) = email_search(account, email_address, tfmt)

if server_response == "OK":
mail_ids = sdata[0]
id_list = mail_ids.split()
_LOGGER.debug("Amazon emails found: %s", str(len(id_list)))
for i in id_list:
data = email_fetch(account, i, "(RFC822)")[1]
for response_part in data:
if isinstance(response_part, tuple):
msg = email.message_from_bytes(response_part[1])

_LOGGER.debug("Email Multipart: %s", str(msg.is_multipart()))
_LOGGER.debug("Content Type: %s", str(msg.get_content_type()))

# Get order number from subject line
encoding = decode_header(msg["subject"])[0][1]
if encoding is not None:
email_subject = decode_header(msg["subject"])[0][0].decode(
encoding, "ignore"
)
else:
email_subject = decode_header(msg["subject"])[0][0]
(server_response, sdata) = email_search(account, address_list, tfmt)

if not isinstance(email_subject, str):
_LOGGER.debug("Converting subject to string.")
email_subject = email_subject.decode("utf-8", "ignore")
if server_response == "OK":
mail_ids = sdata[0]
id_list = mail_ids.split()
_LOGGER.debug("Amazon emails found: %s", str(len(id_list)))
for i in id_list:
data = email_fetch(account, i, "(RFC822)")[1]
for response_part in data:
if isinstance(response_part, tuple):
msg = email.message_from_bytes(response_part[1])

_LOGGER.debug("Amazon Subject: %s", str(email_subject))
pattern = re.compile(r"[0-9]{3}-[0-9]{7}-[0-9]{7}")
_LOGGER.debug("Email Multipart: %s", str(msg.is_multipart()))
_LOGGER.debug("Content Type: %s", str(msg.get_content_type()))

# Don't add the same order number twice
if (
(found := pattern.findall(email_subject))
and len(found) > 0
and found[0] not in order_number
):
order_number.append(found[0])

try:
if msg.is_multipart():
email_msg = quopri.decodestring(str(msg.get_payload(0)))
else:
email_msg = quopri.decodestring(str(msg.get_payload()))
except Exception as err:
_LOGGER.debug(
"Problem decoding email message: %s", str(err)
)
_LOGGER.error("Unable to process this email. Skipping.")
continue
email_msg = email_msg.decode("utf-8", "ignore")
# Get order number from subject line
encoding = decode_header(msg["subject"])[0][1]
if encoding is not None:
email_subject = decode_header(msg["subject"])[0][0].decode(
encoding, "ignore"
)
else:
email_subject = decode_header(msg["subject"])[0][0]

# Check message body for order number
if (
(found := pattern.findall(email_msg))
and len(found) > 0
and found[0] not in order_number
):
order_number.append(found[0])
if not isinstance(email_subject, str):
_LOGGER.debug("Converting subject to string.")
email_subject = email_subject.decode("utf-8", "ignore")

for search in AMAZON_TIME_PATTERN:
_LOGGER.debug("Looking for: %s", search)
if search not in email_msg:
continue
_LOGGER.debug("Amazon Subject: %s", str(email_subject))
pattern = re.compile(r"[0-9]{3}-[0-9]{7}-[0-9]{7}")

start = email_msg.find(search) + len(search)
end = amazon_date_search(email_msg)
# Don't add the same order number twice
if (
(found := pattern.findall(email_subject))
and len(found) > 0
and found[0] not in order_number
):
order_number.append(found[0])

arrive_date = email_msg[start:end].replace(">", "").strip()
_LOGGER.debug("First pass: %s", arrive_date)
arrive_date = arrive_date.split(" ")
arrive_date = arrive_date[0:3]
arrive_date = " ".join(arrive_date).strip()
try:
if msg.is_multipart():
email_msg = quopri.decodestring(str(msg.get_payload(0)))
else:
email_msg = quopri.decodestring(str(msg.get_payload()))
except Exception as err:
_LOGGER.debug("Problem decoding email message: %s", str(err))
_LOGGER.error("Unable to process this email. Skipping.")
continue
email_msg = email_msg.decode("utf-8", "ignore")

# Check message body for order number
if (
(found := pattern.findall(email_msg))
and len(found) > 0
and found[0] not in order_number
):
order_number.append(found[0])

for search in AMAZON_TIME_PATTERN:
_LOGGER.debug("Looking for: %s", search)
if search not in email_msg:
continue

start = email_msg.find(search) + len(search)
end = amazon_date_search(email_msg)

arrive_date = email_msg[start:end].replace(">", "").strip()
_LOGGER.debug("First pass: %s", arrive_date)
arrive_date = arrive_date.split(" ")
arrive_date = arrive_date[0:3]
arrive_date = " ".join(arrive_date).strip()

# Get the date object
dateobj = dateparser.parse(arrive_date)
# Get the date object
dateobj = dateparser.parse(arrive_date)

if (
dateobj is not None
and dateobj.day == datetime.date.today().day
and dateobj.month == datetime.date.today().month
):
deliveries_today.append("Amazon Order")
if (
dateobj is not None
and dateobj.day == datetime.date.today().day
and dateobj.month == datetime.date.today().month
):
deliveries_today.append("Amazon Order")

value = None
if param == "count":
Expand Down
Loading

0 comments on commit 1ae4b70

Please sign in to comment.