From 1ae4b70a1473624808d0cb1d0a415685612fceb4 Mon Sep 17 00:00:00 2001 From: Chris <1105672+firstof9@users.noreply.github.com> Date: Wed, 4 Dec 2024 16:27:41 -0700 Subject: [PATCH] fix: create function to handle amazon email addresses lists (#1028) --- .../mail_and_packages/helpers.py | 273 +++++++++--------- tests/test_helpers.py | 17 +- 2 files changed, 144 insertions(+), 146 deletions(-) diff --git a/custom_components/mail_and_packages/helpers.py b/custom_components/mail_and_packages/helpers.py index 29f06951..82ba48d1 100644 --- a/custom_components/mail_and_packages/helpers.py +++ b/custom_components/mail_and_packages/helpers.py @@ -37,7 +37,6 @@ from .const import ( AMAZON_DELIVERED, AMAZON_DELIVERED_SUBJECT, - AMAZON_EMAIL, AMAZON_EXCEPTION, AMAZON_EXCEPTION_ORDER, AMAZON_EXCEPTION_SUBJECT, @@ -444,7 +443,14 @@ def fetch( count[sensor] = update_time() else: count[sensor] = get_count( - account, sensor, False, img_out_path, hass, amazon_image_name, amazon_domain + account, + sensor, + False, + img_out_path, + hass, + amazon_image_name, + amazon_domain, + amazon_fwds, )[ATTR_COUNT] data.update(count) @@ -931,6 +937,7 @@ def get_count( hass: Optional[HomeAssistant] = None, amazon_image_name: Optional[str] = None, amazon_domain: Optional[str] = None, + amazon_fwds: Optional[str] = None, ) -> dict: """Get Package Count. @@ -946,7 +953,7 @@ def get_count( # Return Amazon delivered info if sensor_type == AMAZON_DELIVERED: result[ATTR_COUNT] = amazon_search( - account, image_path, hass, amazon_image_name, amazon_domain + account, image_path, hass, amazon_image_name, amazon_domain, amazon_fwds ) result[ATTR_TRACKING] = "" return result @@ -1105,6 +1112,7 @@ def amazon_search( hass: HomeAssistant, amazon_image_name: str, amazon_domain: str, + fwds: str = None, ) -> int: """Find Amazon Delivered email. @@ -1115,32 +1123,26 @@ def amazon_search( subjects = AMAZON_DELIVERED_SUBJECT today = get_formatted_date() count = 0 - domains = amazon_domain.split() _LOGGER.debug("Cleaning up amazon images...") cleanup_images(f"{image_path}amazon/") - for domain in domains: - for subject in subjects: - email_address = [] - for address in AMAZON_EMAIL: - email_address.append(address + domain) - _LOGGER.debug("Amazon email search address: %s", str(email_address)) + address_list = amazon_email_addresses(fwds, amazon_domain) + _LOGGER.debug("Amazon email list: %s", str(address_list)) - (server_response, data) = email_search( - account, email_address, today, subject - ) + for subject in subjects: + (server_response, data) = email_search(account, address_list, today, subject) - if server_response == "OK" and data[0] is not None: - count += len(data[0].split()) - _LOGGER.debug("Amazon delivered email(s) found: %s", count) - get_amazon_image( - data[0], - account, - image_path, - hass, - amazon_image_name, - ) + if server_response == "OK" and data[0] is not None: + count += len(data[0].split()) + _LOGGER.debug("Amazon delivered email(s) found: %s", count) + get_amazon_image( + data[0], + account, + image_path, + hass, + amazon_image_name, + ) if count == 0: _LOGGER.debug("No Amazon deliveries found.") @@ -1317,34 +1319,20 @@ def amazon_exception( tfmt = get_formatted_date() count = 0 info = {} - domains = the_domain.split() - if isinstance(fwds, list): - for fwd in fwds: - if fwd and fwd != '""' and fwd not in domains: - domains.append(fwd) - _LOGGER.debug("Amazon email adding %s to list", str(fwd)) - _LOGGER.debug("Amazon domains to be checked: %s", str(domains)) + address_list = amazon_email_addresses(fwds, the_domain) + _LOGGER.debug("Amazon email list: %s", str(address_list)) - for domain in domains: - if "@" in domain: - email_address = domain.strip('"') - _LOGGER.debug("Amazon email search address: %s", str(email_address)) - else: - email_address = [] - email_address.append(f"{AMAZON_EMAIL}{domain}") - _LOGGER.debug("Amazon email search address: %s", str(email_address)) - - (server_response, sdata) = email_search( - account, email_address, tfmt, AMAZON_EXCEPTION_SUBJECT - ) + (server_response, sdata) = email_search( + account, address_list, tfmt, AMAZON_EXCEPTION_SUBJECT + ) - if server_response == "OK": - count += len(sdata[0].split()) - _LOGGER.debug("Found %s Amazon exceptions", count) - order_numbers = get_tracking(sdata[0], account, AMAZON_PATTERN) - for order in order_numbers: - order_number.append(order) + if server_response == "OK": + count += len(sdata[0].split()) + _LOGGER.debug("Found %s Amazon exceptions", count) + order_numbers = get_tracking(sdata[0], account, AMAZON_PATTERN) + for order in order_numbers: + order_number.append(order) info[ATTR_COUNT] = count info[ATTR_ORDER] = order_number @@ -1377,6 +1365,31 @@ def amazon_date_format(arrive_date: str, lang: str) -> tuple: return (arrive_date, "%A, %B %d") +def amazon_email_addresses( + fwds: Optional[str] = None, the_domain: str = None +) -> list | None: + """Return Amazon email addresses in list format.""" + domains = [] + domains.extend(_process_amazon_forwards(fwds)) + the_domain = the_domain.split() + domains.extend(the_domain) + value = [] + + for domain in domains: + if "@" in domain: + email_address = domain.strip('"') + value.append(email_address) + else: + email_address = [] + addresses = AMAZON_SHIPMENT_TRACKING + for address in addresses: + email_address.append(f"{address}@{domain}") + value.extend(email_address) + + _LOGGER.debug("Amazon email search addresses: %s", str(value)) + return value + + def get_items( account: Type[imaplib.IMAP4_SSL], param: str = None, @@ -1395,107 +1408,91 @@ def get_items( tfmt = past_date.strftime("%d-%b-%Y") deliveries_today = [] order_number = [] - domains = [] - domains.extend(_process_amazon_forwards(fwds)) - the_domain = the_domain.split() - domains.extend(the_domain) - _LOGGER.debug("Amazon email list: %s", str(domains)) + address_list = amazon_email_addresses(fwds, the_domain) + _LOGGER.debug("Amazon email list: %s", str(address_list)) - for domain in domains: - if "@" in domain: - email_address = domain.strip('"') - _LOGGER.debug("Amazon email search address: %s", str(email_address)) - else: - email_address = [] - addresses = AMAZON_SHIPMENT_TRACKING - for address in addresses: - email_address.append(f"{address}@{domain}") - _LOGGER.debug("Amazon email search address: %s", str(email_address)) - - (server_response, sdata) = email_search(account, email_address, tfmt) - - if server_response == "OK": - mail_ids = sdata[0] - id_list = mail_ids.split() - _LOGGER.debug("Amazon emails found: %s", str(len(id_list))) - for i in id_list: - data = email_fetch(account, i, "(RFC822)")[1] - for response_part in data: - if isinstance(response_part, tuple): - msg = email.message_from_bytes(response_part[1]) - - _LOGGER.debug("Email Multipart: %s", str(msg.is_multipart())) - _LOGGER.debug("Content Type: %s", str(msg.get_content_type())) - - # Get order number from subject line - encoding = decode_header(msg["subject"])[0][1] - if encoding is not None: - email_subject = decode_header(msg["subject"])[0][0].decode( - encoding, "ignore" - ) - else: - email_subject = decode_header(msg["subject"])[0][0] + (server_response, sdata) = email_search(account, address_list, tfmt) - if not isinstance(email_subject, str): - _LOGGER.debug("Converting subject to string.") - email_subject = email_subject.decode("utf-8", "ignore") + if server_response == "OK": + mail_ids = sdata[0] + id_list = mail_ids.split() + _LOGGER.debug("Amazon emails found: %s", str(len(id_list))) + for i in id_list: + data = email_fetch(account, i, "(RFC822)")[1] + for response_part in data: + if isinstance(response_part, tuple): + msg = email.message_from_bytes(response_part[1]) - _LOGGER.debug("Amazon Subject: %s", str(email_subject)) - pattern = re.compile(r"[0-9]{3}-[0-9]{7}-[0-9]{7}") + _LOGGER.debug("Email Multipart: %s", str(msg.is_multipart())) + _LOGGER.debug("Content Type: %s", str(msg.get_content_type())) - # Don't add the same order number twice - if ( - (found := pattern.findall(email_subject)) - and len(found) > 0 - and found[0] not in order_number - ): - order_number.append(found[0]) - - try: - if msg.is_multipart(): - email_msg = quopri.decodestring(str(msg.get_payload(0))) - else: - email_msg = quopri.decodestring(str(msg.get_payload())) - except Exception as err: - _LOGGER.debug( - "Problem decoding email message: %s", str(err) - ) - _LOGGER.error("Unable to process this email. Skipping.") - continue - email_msg = email_msg.decode("utf-8", "ignore") + # Get order number from subject line + encoding = decode_header(msg["subject"])[0][1] + if encoding is not None: + email_subject = decode_header(msg["subject"])[0][0].decode( + encoding, "ignore" + ) + else: + email_subject = decode_header(msg["subject"])[0][0] - # Check message body for order number - if ( - (found := pattern.findall(email_msg)) - and len(found) > 0 - and found[0] not in order_number - ): - order_number.append(found[0]) + if not isinstance(email_subject, str): + _LOGGER.debug("Converting subject to string.") + email_subject = email_subject.decode("utf-8", "ignore") - for search in AMAZON_TIME_PATTERN: - _LOGGER.debug("Looking for: %s", search) - if search not in email_msg: - continue + _LOGGER.debug("Amazon Subject: %s", str(email_subject)) + pattern = re.compile(r"[0-9]{3}-[0-9]{7}-[0-9]{7}") - start = email_msg.find(search) + len(search) - end = amazon_date_search(email_msg) + # Don't add the same order number twice + if ( + (found := pattern.findall(email_subject)) + and len(found) > 0 + and found[0] not in order_number + ): + order_number.append(found[0]) - arrive_date = email_msg[start:end].replace(">", "").strip() - _LOGGER.debug("First pass: %s", arrive_date) - arrive_date = arrive_date.split(" ") - arrive_date = arrive_date[0:3] - arrive_date = " ".join(arrive_date).strip() + try: + if msg.is_multipart(): + email_msg = quopri.decodestring(str(msg.get_payload(0))) + else: + email_msg = quopri.decodestring(str(msg.get_payload())) + except Exception as err: + _LOGGER.debug("Problem decoding email message: %s", str(err)) + _LOGGER.error("Unable to process this email. Skipping.") + continue + email_msg = email_msg.decode("utf-8", "ignore") + + # Check message body for order number + if ( + (found := pattern.findall(email_msg)) + and len(found) > 0 + and found[0] not in order_number + ): + order_number.append(found[0]) + + for search in AMAZON_TIME_PATTERN: + _LOGGER.debug("Looking for: %s", search) + if search not in email_msg: + continue + + start = email_msg.find(search) + len(search) + end = amazon_date_search(email_msg) + + arrive_date = email_msg[start:end].replace(">", "").strip() + _LOGGER.debug("First pass: %s", arrive_date) + arrive_date = arrive_date.split(" ") + arrive_date = arrive_date[0:3] + arrive_date = " ".join(arrive_date).strip() - # Get the date object - dateobj = dateparser.parse(arrive_date) + # Get the date object + dateobj = dateparser.parse(arrive_date) - if ( - dateobj is not None - and dateobj.day == datetime.date.today().day - and dateobj.month == datetime.date.today().month - ): - deliveries_today.append("Amazon Order") + if ( + dateobj is not None + and dateobj.day == datetime.date.today().day + and dateobj.month == datetime.date.today().month + ): + deliveries_today.append("Amazon Order") value = None if param == "count": diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 2f565eb5..b4b87660 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -711,7 +711,7 @@ async def test_royal_out_for_delivery(hass, mock_imap_royal_out_for_delivery): async def test_amazon_shipped_count(hass, mock_imap_amazon_shipped, caplog): result = get_items(mock_imap_amazon_shipped, "count", the_domain="amazon.com") assert ( - "Amazon email search address: ['shipment-tracking@amazon.com', 'conferma-spedizione@amazon.com', 'confirmar-envio@amazon.com', 'versandbestaetigung@amazon.com', 'confirmation-commande@amazon.com', 'verzending-volgen@amazon.com', 'update-bestelling@amazon.com']" + "Amazon email search addresses: ['shipment-tracking@amazon.com', 'conferma-spedizione@amazon.com', 'confirmar-envio@amazon.com', 'versandbestaetigung@amazon.com', 'confirmation-commande@amazon.com', 'verzending-volgen@amazon.com', 'update-bestelling@amazon.com']" in caplog.text ) assert result == 1 @@ -820,7 +820,7 @@ async def test_amazon_search_delivered( ) await hass.async_block_till_done() assert ( - "Amazon email search address: ['order-update@amazon.com', 'update-bestelling@amazon.com', 'versandbestaetigung@amazon.com']" + "Amazon email search addresses: ['shipment-tracking@amazon.com', 'conferma-spedizione@amazon.com', 'confirmar-envio@amazon.com', 'versandbestaetigung@amazon.com', 'confirmation-commande@amazon.com', 'verzending-volgen@amazon.com', 'update-bestelling@amazon.com']" in caplog.text ) assert result == 9 @@ -1071,9 +1071,7 @@ async def test_image_file_name( @pytest.mark.asyncio async def test_amazon_exception(hass, mock_imap_amazon_exception, caplog): - result = amazon_exception( - mock_imap_amazon_exception, ['""'], the_domain="amazon.com" - ) + result = amazon_exception(mock_imap_amazon_exception, the_domain="amazon.com") assert result["order"] == ["123-1234567-1234567"] assert result["count"] == 1 @@ -1082,9 +1080,9 @@ async def test_amazon_exception(hass, mock_imap_amazon_exception, caplog): ["testemail@fakedomain.com"], the_domain="amazon.com", ) - assert result["count"] == 2 + assert result["count"] == 1 assert ( - "Amazon domains to be checked: ['amazon.com', 'testemail@fakedomain.com']" + "Amazon email list: ['shipment-tracking@amazon.com', 'conferma-spedizione@amazon.com', 'confirmar-envio@amazon.com', 'versandbestaetigung@amazon.com', 'confirmation-commande@amazon.com', 'verzending-volgen@amazon.com', 'update-bestelling@amazon.com']" in caplog.text ) @@ -1144,6 +1142,9 @@ async def test_amazon_shipped_fwd(hass, mock_imap_amazon_fwd, caplog): result = get_items( mock_imap_amazon_fwd, "order", fwds="testuser@test.com", the_domain="amazon.com" ) - assert "Amazon email list: ['testuser@test.com', 'amazon.com']" in caplog.text + assert ( + "Amazon email list: ['testuser@test.com', 'shipment-tracking@amazon.com', 'conferma-spedizione@amazon.com', 'confirmar-envio@amazon.com', 'versandbestaetigung@amazon.com', 'confirmation-commande@amazon.com', 'verzending-volgen@amazon.com', 'update-bestelling@amazon.com']" + in caplog.text + ) assert result == ["123-1234567-1234567"] assert "First pass: Tuesday, January 11" in caplog.text