diff --git a/.github/workflows/gitleaks.yml b/.github/workflows/gitleaks.yml new file mode 100644 index 0000000..8d4cc50 --- /dev/null +++ b/.github/workflows/gitleaks.yml @@ -0,0 +1,19 @@ +name: gitleaks +on: + pull_request: + push: + workflow_dispatch: + schedule: + - cron: "0 4 * * *" # run once a day at 4 AM +jobs: + scan: + name: gitleaks + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - uses: gitleaks/gitleaks-action@v2 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index 7d79401..caa70c4 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} diff --git a/README.md b/README.md index 5fe1aa9..ee7d0a3 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ QSPyLib is a bundle of API wrappers for various amateur radio-related sites, including QRZ, LOTW, eQSL, and ClubLog. -It is currently in development and should be considered unstable version-to-version while the version number is still 0.x.x. +QSPyLib is in active development; that said, major version numbers should maintain API stability. If you need absolute stability of the API, fix your version against the major. Issues and pull requests are welcome, and should be made on the [GitHub repository](https://github.com/jaytotheay/qspy). @@ -32,12 +32,14 @@ This will generate a .whl and tar.gz, which you can then install locally. ## What works right now? -As of v0.0.1: +As of v1.0.0: -* The LotW module is, in theory, finished -- no doubt something will come up about how it's not actually practical and needs more work. -* The eQSL module has most of the functionality of eQSL's API, but is incredibly unpolished and needs more work. -* The QRZ module exists; the Logbook API is currently only supported for FETCH operations, and the XML Interface is not supported yet. -* The ClubLog module is on-hold pending the ready-status of the other modules. +* The LotW module is, in theory, finished -- you can download QSOs in bulk or by criteria, check DXCC credit, get a list of users and their date of last upload, and upload a log. +* The eQSL module has most of the functionality of eQSL's API, but is a bit unpolished -- at present, you can fetch inboxes and outboxes, get AG lists, get member lists, get last upload data for users, verify an eQSL, and retrieve the eQSL card graphic for a QSL. +* The QRZ module is done; for logs, we support fetching logbooks, checking logbook statuses, and inserting and deleting records. For the XML API, we support looking up a callsign's data and looking up a DXCC's data. +* The ClubLog module only supports grabbing logbooks from ClubLog at the moment. + +Everything has been tested to work when done "correctly" and simply; no doubt some edge case will pop up, or some failure state won't throw a good error. Please open an issue for *any* problems you come across, no matter how minor, even if it's just an exception that isn't descriptive. ## How do I use it? @@ -60,8 +62,8 @@ Other functions of APIs are generally available, like checking if an eQSL is ver ```py from qspylib import eqsl -confirmed, raw_result = eqsl.verify_eqsl('N5UP', 'TEST', '160m', 'SSB', '01/01/2000') +confirmed, raw_result = eqsl.eQSLClient.verify_eqsl('N5UP', 'TEST', '160m', 'SSB', '01/01/2000') ``` -This will return a tuple; here, `confirmed` will be False, since this QSO is not verified on eQSL, and `raw_result` will contain any extra information eQSL provides, for instance, if it's Authenticity Guaranteed. +This will return a tuple; here, `confirmed` will be False, since this QSO is not verified on eQSL, and `raw_result` will contain any extra information eQSL provides, for instance, if it's Authenticity Guaranteed. Note that verify_eqsl is a static method of the eQSLClient class, and can be called either from an eQSLClient object, or directly from the class. Modules, functions, and classes are documented in-code via docstrings, and you can learn more by reading those docstrings; you can also read the [Read the Docs](http://qspylib.readthedocs.io/) listings for a visually pleasing guide on what the docstrings say. diff --git a/docs/source/examples.rst b/docs/source/examples.rst index 0dd846f..c79d418 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -25,10 +25,10 @@ Pulling a LotW Logbook and Printing QSLs since Last Login, Using .adi Property .. code-block:: python """This example demonstrates logging into LOTW for a user 'CA7LSIGN' and fetching their QSLs with default parameters. By default, this will only return QSLs received since the last time a logbook was fetched from LOTW. - + This example also demonstrates using the .adi property of the Logbook; this property contains a parsed dictionary of the entire .adi log as received from LotW, and you can reference any present fields as dictionary keys. - """ - + """ + >>> import qspylib >>> LOTWSession = qspylib.lotw.LOTWClient("CA7LSIGN", "password") >>> lotw_logbook = LOTWSession.fetch_logbook() @@ -42,17 +42,17 @@ Pulling a LotW Logbook and Printing QSLs since Last Login, Using .adi Property 'TE5T' >>> lotw_logbook.adi[0]["BAND"] '20M' - + Pulling a LotW Logbook and Printing QSOs since Last Login, Using .log Property ********************************************************************************* .. code-block:: python - """This example demonstrates logging into LOTW for a user 'CA7LSIGN' and fetching their QSOs since 2024-10-01, and then printing them out and grabbing the QSL index. - + """This example demonstrates logging into LOTW for a user 'CA7LSIGN' and fetching their QSOs since 2024-10-01, and then printing them out. + This example also demonstrates using the .log property of the Logbook; this property contains a list of contacts, and each contains only very limited information about each QSO (the info as seen here.) The QSL property will "unify" the QSL fields as present in ClubLog, QRZ, LoTW, and eQSL, so it is handy for comparing confirmations between sources. """ - + >>> import qspylib >>> LOTWSession = qspylib.lotw.LOTWClient('CA7LSIGN', 'password') >>> lotw_logbook = LOTWSession.fetch_logbook(qso_qsl='no', qso_qsorxsince='2024-10-01') @@ -69,4 +69,114 @@ Pulling a LotW Logbook and Printing QSOs since Last Login, Using .log Property CALL: TE5T BAND: 40M MODE: FT8 DATE: 20241003 TIME: 003700 QSL: N - CALL: TE6T BAND: 20M MODE: FT8 DATE: 20241003 TIME: 004500 QSL: N \ No newline at end of file + CALL: TE6T BAND: 20M MODE: FT8 DATE: 20241003 TIME: 004500 QSL: N + +Note that you can also use the `fetch_qsos` method to fetch all QSOs; this has a simpler optional parameter set, but takes in a datetime object for date parameters. It also defaults to fetching as much information as LOTW will return about a QSO. + +.. code-block:: python + + >>> import qspylib, datetime + >>> LOTWSession = qspylib.lotw.LOTWClient('CA7LSIGN', 'password') + >>> qsosince_datetime = datetime.datetime(2024, 10, 1) + >>> lotw_logbook = LOTWSession.fetch_qsos(qsorxsince=qsosince_datetime) + >>> for contact in lotw_logbook.log: + ... print(contact) + ... + CALL: TE1T BAND: 12M MODE: FT8 DATE: 20241003 TIME: 025300 QSL: Y + + CALL: TE2T BAND: 12M MODE: FT8 DATE: 20241003 TIME: 025500 QSL: Y + + CALL: TE3T BAND: 20M MODE: FT8 DATE: 20241003 TIME: 012100 QSL: Y + + CALL: TE4T BAND: 12M MODE: FT8 DATE: 20241003 TIME: 012900 QSL: N + + CALL: TE5T BAND: 40M MODE: FT8 DATE: 20241003 TIME: 003700 QSL: N + + CALL: TE6T BAND: 20M MODE: FT8 DATE: 20241003 TIME: 004500 QSL: N + +Pulling the inbox from eQSL, using .adi Property +************************************************ +Pulling information from a user's eQSL inbox is relatively easy with qspylib. The `fetch_inbox` method will return a Logbook object with the inbox contents. +The .adi portion of the Logbook object will contain all adif fields received from eQSL, and you can reference any present fields as dictionary keys. +Note that the eQSL divides a logbook into an inbox, and an outbox; the inbox is the QSOs that the user has received, sent for confirmation by other users. + +.. code-block:: python + + """This example demonstrates logging into eQSL for a user 'CA7LSIGN' and fetching their inbox since 2024-01-01 00:00, and then printing them out. + + This example also demonstrates using the .adi property of the Logbook; this property contains a dictionary of the entire .adi log as received from eQSL, + where each contact is one record. All the information received from eQSL will be present in the .adi portion of the Logbook object, unlike the .log portion. + """ + + >>> import qspylib + >>> eQSLSession = qspylib.eqsl.eQSLClient('CA7LSIGN', 'password') + >>> eqsl_inbox = eQSLSession.fetch_inbox(rcvd_since='202401010000') + >>> for contact in eqsl_inbox.adi: + ... print(contact) + ... + 20231105 1228 TE3T MFSK Y 40M 20240120 Y EM17nt Y E +01 FT4 100.0000 + + 20231105 1230 TE5T MFSK Y 40M 20241015 Y EM12qt Y E -08 FT4 + + >>>str(eqsl_inbox.adi[0]) + '20231105 1228 TE3T MFSK Y 40M 20240120 Y EM17nt Y E +01 FT4 100.0000 \n' + >>>str(eqsl_inbox.adi[0]['CALL']) + 'TE3T' + >>>len(eqsl_inbox.adi) + 2 + +Verify a QSL with eQSL +********************** +eQSL provides for confirming that a QSL is confirmed -- if it was confirmed on eQSL. This can be done by *any* user, not just a logged in one, given they have the proper information. + +.. code-block:: python + + """This example demonstrates confirming an eQSL took place with eQSL.""" + + >>> from qspylib import eqsl + >>> confirmed, raw_result = eqsl.eQSLClient.verify_eqsl('N5UP', 'TEST', '160m', 'SSB', '01/01/2000') + >>> confirmed + False + >>> raw_result + '\r\n\r\n\r\n\r\n\r\n\r\n\r\n Error - Result: QSO not on file\r\n \r\n \r\n ' + +In current versions of qspylib, parsing raw_result for additional information, such as authenticity guaranteed status or the error cause, is left as an exercise for the reader. + +Retrieving an eQSL Graphic +************************** +eQSL provides for retrieving the graphic for the digital QSL card corresponding to a QSO. Note that they request you only do at most, six requests per minute. + +.. code-block:: python + + "This example demonstrates retrieving an eQSL graphic for a given QSO and displaying it using PIL." + + >>> import qspylib + >>> from datetime import datetime + >>> from PIL import Image + >>> eqsl_client = qspylib.eqsl.eQSLClient("CAL7SIGN", "notarealpassword") + >>> inbox = eqsl_client.fetch_inbox_qsls() + >>> str(inbox.adi[12]) + '20230101 0730 TE5T FT8 Y 20M 20230101 Y EM12em Y E +00 \n' + >>> qso_datetime = datetime(2023, 1, 1, 7, 30) + >>> image_data = eqsl_client.retrieve_graphic("te5t", qso_datetime, "20m", "FT8") + >>> image_data + <_io.BytesIO object at 0x0000000000000000> + >>> image = Image.open(image_data) + >>> image + + >>> image.show() # this will open the actual image file, showing you the image. + +Looking up a callsign on QRZ +**************************** +QRZ allows an authenticated user to lookup certain information about a QRZ user. This information will be returned by qspylib as a dictionary that can be parsed, sharing a structure with the XML tree returned by QRZ. +.. code-block:: python + + """This example demonstrates grabbing information about a callsign from QRZ's XML API.""" + + >>> from qspylib import qrz + >>> QRZXMLSession = qrz.QRZXMLClient('TE5T', 'password', agent='sample_program/0.0.1') + >>> info = QRZXMLSession.lookup_callsign('aa7bq') + >>> info + {'QRZDatabase': {'@version': '1.34', '@xmlns': 'http://xmldata.qrz.com', 'Callsign': {'call': 'AA7BQ', 'aliases': 'N6UFT,AA7BQ/DL1,KJ6RK,AA7BQ/HR6', 'dxcc': '291', 'attn': 'AA7BQ', 'fname': 'FRED L', 'name': 'LLOYD', 'addr1': '24 W. Camelback Rd, STE A-488', 'addr2': 'Phoenix', 'state': 'AZ', 'zip': '85013', 'country': 'United States', 'lat': '33.509665', 'lon': '-112.074142', 'grid': 'DM33xm', 'county': 'Maricopa', 'ccode': '271', 'fips': '04013', 'land': 'United States', 'efdate': '2022-04-29', 'expdate': '2030-01-20', 'class': 'E', 'codes': 'HAI', 'qslmgr': 'via QRZ', 'email': 'aa7bq@qrz.com', 'u_views': '345756', 'bio': '12804', 'biodate': '2023-02-17 17:37:29', 'image': 'https://cdn-xml.qrz.com/q/aa7bq/fred1962.jpg', 'imageinfo': '636:800:90801', 'moddate': '2022-10-09 17:32:38', 'MSA': '6200', 'AreaCode': '602', 'TimeZone': 'Mountain', 'GMTOffset': '-7', 'DST': 'N', 'eqsl': '0', 'mqsl': '0', 'cqzone': '3', 'ituzone': '6', 'born': '1953', 'lotw': '0', 'user': 'AA7BQ', 'geoloc': 'user', 'name_fmt': 'FRED L LLOYD'}, 'Session': {'Key': 'nicetrykiddo', 'Count': '539', 'SubExp': 'Mon Sep 15 02:38:30 2025', 'GMTime': 'Sun Nov 24 04:22:11 2024', 'Remark': 'cpu: 0.018s'}}} + >>> info['QRZDatabase']['Callsign']['TimeZone'] + 'Mountain' \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index 7ab78d4..a0e5b34 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -26,16 +26,16 @@ QSPyLib :alt: PyPI - License -``qsyplib`` is a bundle of API wrappers for various amateur radio-related websites, including QRZ, LOTW, eQSL, and ClubLog. +``qsyplib`` is a bundle of API wrappers for various amateur radio-related sites, including QRZ, LOTW, eQSL, and ClubLog. -It is currently in development and should be considered unstable version-to-version while the version number is still 0.x.x. +QSPyLib is in active development; that said, major version numbers should maintain API stability. If you need absolute stability of the API, fix your version against the major. Issues and pull requests are welcome, and should be made on the [GitHub repository](https://github.com/JayToTheAy/QSPy). .. toctree:: :maxdepth: 2 :caption: Contents: - + modules qspylib examples diff --git a/pyproject.toml b/pyproject.toml index 0036043..2a2d152 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,20 +17,24 @@ authors = [ ] description = "A set of API wrappers for different amateur radio websites, including LotW, QRZ, eQSL, and ClubLog" readme = "README.md" -keywords = ["QRZ", "LOTW", "eQSL", "API", "amateur radio"] +keywords = ["QRZ", "LOTW", "eQSL", "ClubLog", "API", "amateur radio", "ham radio"] classifiers = [ - "Programming Language :: Python :: 3.9", + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Topic :: Communications :: Ham Radio", + "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", - "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)", - "Operating System :: OS Independent", - "Development Status :: 2 - Pre-Alpha" + "Operating System :: OS Independent", ] [project.urls] Homepage = "https://github.com/JayToTheAy/QSPy" +Documentation = "https://qspylib.readthedocs.io/en/" Issues = "https://github.com/JayToTheAy/QSPy/issues" [tool.hatch.version] diff --git a/requirements.txt b/requirements.txt index a5c00b1..afb6935 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/src/qspylib/_version.py b/src/qspylib/_version.py index bf89ba7..71bc9da 100644 --- a/src/qspylib/_version.py +++ b/src/qspylib/_version.py @@ -1,3 +1,3 @@ """unified version string""" -__version__ = "1.0.0a3" +__version__ = "1.0.0" diff --git a/src/qspylib/eqsl.py b/src/qspylib/eqsl.py index c6c38bb..efab5f0 100644 --- a/src/qspylib/eqsl.py +++ b/src/qspylib/eqsl.py @@ -3,6 +3,8 @@ # file, You can obtain one at https://mozilla.org/MPL/2.0/. """Functions and classes related to querying the eQSL API. """ +from datetime import datetime +from io import BytesIO import requests from .logbook import Logbook from ._version import __version__ @@ -66,7 +68,7 @@ def set_timeout(self, timeout: int): # actual GETs - def get_last_upload_date(self): + def get_last_upload_date(self) -> datetime: """Gets last upload date for the logged in user. Raises: @@ -74,22 +76,27 @@ def get_last_upload_date(self): HTTPError: An error occurred while trying to make a connection. Returns: - str: date of last upload for the active user. Date is formatted:\ - DD-MMM-YYYY at HH:mm UTC + datetime.datetime: datetime of last upload for the active user.\ + Contains: Year, Month, Date, Hour, Minute (UTC). """ with self.session as s: - r = s.get(self.base_url + "DisplayLastUploadDate.cfm", timeout=self.timeout) - if r.status_code == requests.codes.ok: + response = s.get( + self.base_url + "DisplayLastUploadDate.cfm", timeout=self.timeout + ) + if response.status_code == requests.codes.ok: success_txt = "Your last ADIF upload was" - if success_txt in r.text: - return r.text[r.text.index("(") + 1 : r.text.index(")")] - raise eQSLError(r.text) - raise r.raise_for_status() + if success_txt in response.text: + time_str = response.text[ + response.text.index("(") + 1 : response.text.index(")") + ] + return datetime.strptime(time_str, "%d-%b-%Y at %H:%M UTC") + raise eQSLError(response.text) + raise response.raise_for_status() def fetch_inbox( self, limit_date_lo: str = None, - limit_date_hi: str = None, # pylint: disable=R0914,R0913 + limit_date_hi: str = None, rcvd_since: str = None, confirmed_only: str = None, unconfirmed_only: str = None, @@ -145,31 +152,33 @@ def fetch_inbox( params = {k: v for k, v in params.items() if v is not None} with self.session as s: - r = s.get( + response = s.get( self.base_url + "DownloadInBox.cfm", params=params, timeout=self.timeout ) - if r.status_code == requests.codes.ok: + if response.status_code == requests.codes.ok: adif_found_txt = "Your ADIF log file has been built" adif_status = ( - r.text.index(adif_found_txt) if adif_found_txt in r.text else -1 + response.text.index(adif_found_txt) + if adif_found_txt in response.text + else -1 ) if adif_status < 0: raise eQSLError("Failed to generate ADIF.") - adif_link_start_idx = r.text.index('
  • .ADI file') + adif_link_start_idx = response.text.index('
  • .ADI file') adif_link = ( - self.base_url + r.text[adif_link_start_idx:adif_link_end_idx] + self.base_url + response.text[adif_link_start_idx:adif_link_end_idx] ) adif_response = requests.get(adif_link, timeout=self.timeout) if adif_response.status_code == requests.codes.ok: return Logbook(self.callsign, adif_response.text) - raise r.raise_for_status() - raise r.raise_for_status() + raise response.raise_for_status() + raise response.raise_for_status() def fetch_inbox_qsls( self, limit_date_lo: str = None, - limit_date_hi: str = None, # pylint: disable = R0913 + limit_date_hi: str = None, rcvd_since: str = None, archive: str = None, ham_only: str = None, @@ -218,31 +227,137 @@ def fetch_outbox(self): qspylib.logbook.Logbook: A logbook containing the user's QSOs. """ with self.session as s: - r = s.get(self.base_url + "DownloadADIF.cfm", timeout=self.timeout) - if r.status_code == requests.codes.ok: + response = s.get(self.base_url + "DownloadADIF.cfm", timeout=self.timeout) + if response.status_code == requests.codes.ok: adif_found_txt = "Your ADIF log file has been built" adif_status = ( - r.text.index(adif_found_txt) if adif_found_txt in r.text else -1 + response.text.index(adif_found_txt) + if adif_found_txt in response.text + else -1 ) if adif_status < 0: raise eQSLError("Failed to generate ADIF.") - adif_link_start_idx = r.text.index('
  • .ADI file') + adif_link_start_idx = response.text.index('
  • .ADI file') adif_link = ( - self.base_url + r.text[adif_link_start_idx:adif_link_end_idx] + self.base_url + response.text[adif_link_start_idx:adif_link_end_idx] ) adif_response = requests.get(adif_link, timeout=self.timeout) if adif_response.status_code == requests.codes.ok: return Logbook(self.callsign, adif_response.text) - raise r.raise_for_status() - raise r.raise_for_status() + raise response.raise_for_status() + raise response.raise_for_status() + + def _retrieve_graphic( + self, + callsign_from: str, + qso_year: str, + qso_month: str, + qso_day: str, + qso_hour: str, + qso_minute: str, + qso_band: str, + qso_mode: str, + timeout: int = 15, + ) -> BytesIO: + """Retrieve the graphic image for a QSO from eQSL. This is the raw\ + interface, as provided by eQSL.cc, with string parameters. + + Note: + It is recommended you use the `retrieve_graphic` method instead. + + Args: + callsign_from (str): The callsign of the sender of the eQSL + qso_year (str): YYYY OR YY format date of the QSO + qso_month (str): MM format + qso_day (str): DD format + qso_hour (str): HH format (24-hour time) + qso_minute (str): MM format + qso_band (str): 20m, 80M, 70cm, etc. (case insensitive) + qso_mode (str): Must match exactly and should be an ADIF-compatible mode + timeout (int, optional): time to connection timeout. Defaults to 15. + + Raises: + eQSLError: An error occurred interfacing with eQSL. + HTTPError: An error occurred while trying to make a connection. + + Returns: + BytesIO: A BytesIO binary stream containing the image data. This\ + can be further processed with PIL or other image libraries;\ + for instance, using PIL, Image.open(return_val) will give a PIL Image. + """ + params = { + "CallsignFrom": callsign_from, + "QSOYear": qso_year, + "QSOMonth": qso_month, + "QSODay": qso_day, + "QSOHour": qso_hour, + "QSOMinute": qso_minute, + "QSOBand": qso_band, + "QSOMode": qso_mode, + } + response = self.session.get( + self.base_url + "GeteQSL.cfm", params=params, timeout=timeout + ) + if response.status_code == requests.codes.ok: + try: + url_beg_idx = response.text.index('')
+                img_url = BytesIO: + """Retrieve the graphic image for a QSO from eQSL. This is a simplified\ + interface that uses a datetime object. + + Args: + callsign_from (str): The callsign of the sender of the eQSL + qso_datetime (datetime): Datetime containing the Year, Month, Day,\ + Hour, and Minute of the QSO. + band (str): 20m, 80M, 70cm, etc. (case insensitive) + mode (str): Must match exactly and should be an ADIF-compatible mode + timeout (int, optional): Seconds before query times out. Defaults to 15. + + Raises: + eQSLError: An error occurred interfacing with eQSL. + HTTPError: An error occurred while trying to make a connection. + + Returns: + BytesIO: A BytesIO binary stream containing the image data. This\ + can be further processed with PIL or other image libraries;\ + for instance, using PIL, Image.open(return_val) will give a PIL Image. + """ + return self._retrieve_graphic( + callsign_from, + qso_datetime.year, + qso_datetime.month, + qso_datetime.day, + qso_datetime.hour, + qso_datetime.minute, + band, + mode, + timeout, + ) # region Static Methods + @staticmethod def verify_eqsl( callsign_from: str, callsign_to: str, - qso_band: str, # pylint: disable=R0913 + qso_band: str, qso_mode: str = None, qso_date: str = None, timeout: int = 15, @@ -287,7 +402,7 @@ def verify_eqsl( ) if response.status_code == requests.codes.ok: raw_result = response.text - # TO-DO: make this a case statement + if "Result - QSO on file" in raw_result: return True, raw_result if "Parameter missing" not in raw_result: @@ -295,47 +410,6 @@ def verify_eqsl( raise eQSLError(raw_result) raise response.raise_for_status() - @staticmethod - def retrieve_graphic( - username: str, - password: str, - callsign_from: str, - qso_year: str, - qso_month: str, - qso_day: str, - qso_hour: str, - qso_minute: str, - qso_band: str, - qso_mode: str, - timeout: int = 15, - ): - """Retrieve the graphic image for a QSO from eQSL. - - Note: - Not yet implemented. - - Args: - username (str): The callsign of the recipient of the eQSL - password (str): The password of the user's account - callsign_from (str): The callsign of the sender of the eQSL - qso_year (str): YYYY OR YY format date of the QSO - qso_month (str): MM format - qso_day (str): DD format - qso_hour (str): HH format (24-hour time) - qso_minute (str): MM format - qso_band (str): 20m, 80M, 70cm, etc. (case insensitive) - qso_mode (str): Must match exactly and should be an ADIF-compatible mode - timeout (int, optional): time to connection timeout. Defaults to 15. - - Todo: - Implement this function. - - Raises: - NotImplementedError: Not yet implemented. - - """ - raise NotImplementedError - @staticmethod def get_ag_list(timeout: int = 15): """Get a list of Authenticity Guaranteed members. diff --git a/src/qspylib/logbook.py b/src/qspylib/logbook.py index 2b97390..133636b 100644 --- a/src/qspylib/logbook.py +++ b/src/qspylib/logbook.py @@ -61,6 +61,36 @@ def __eq__(self, other): return True return False + def qso_to_adif_io_qso(self) -> adif_io.QSO: + """Converts a QSO object into an adif.io QSO object. + + Returns: + adif_io.QSO: an adif.io QSO object + """ + return adif_io.QSO( + { + "CALL": self.their_call, + "BAND": self.band, + "MODE": self.mode, + "QSO_DATE": self.qso_date, + "TIME_ON": self.time_on, + "QSL_RCVD": self.qsl_rcvd, + } + ) + + def qso_to_adif_string(self) -> str: + """Converts a QSO object into an adif formatted string. + + Returns: + str: an adif formatted string + """ + return f"{self.their_call}\ + {self.band}\ + {self.mode}\ + {self.qso_date}\ + {self.time_on}\ + {self.qsl_rcvd}" + class Logbook: """A Logbook has both an adi field, holding all fields parsed from an .adi\ diff --git a/src/qspylib/lotw.py b/src/qspylib/lotw.py index 67ab074..ce3f0fc 100644 --- a/src/qspylib/lotw.py +++ b/src/qspylib/lotw.py @@ -357,9 +357,10 @@ def __split_datetime(dt: datetime): tuple[str, str]: Tuple containing the date and time, respectively. """ date, time = None, None - date = dt.strftime("%Y-%m-%d") - if ":" in dt: + if dt is not None: + date = dt.strftime("%Y-%m-%d") time = dt.strftime("%H:%M:%S") + print("got here") return date, time diff --git a/src/qspylib/qrz.py b/src/qspylib/qrz.py index b700092..fc31ca8 100644 --- a/src/qspylib/qrz.py +++ b/src/qspylib/qrz.py @@ -8,8 +8,10 @@ from collections import OrderedDict from typing import Any from urllib.parse import urlparse, parse_qs +from re import findall import requests import xmltodict +import adif_io from .logbook import Logbook from ._version import __version__ @@ -20,22 +22,33 @@ # region Exceptions -class QRZInvalidSession(Exception): +class QRZInvalidSessionError(Exception): """Error for when session is invalid.""" def __init__( self, - message="Got no session key back. This session is \ + message="Got no session key back. This session is\ invalid.", ): self.message = message super().__init__(self, message) +class QRZLogbookError(Exception): + """Error for when a logbook error occurs.""" + + def __init__( + self, + message="An error occurred interacting with the Logbook.", + ): + self.message = message + super().__init__(self, message) + + # endregion -# region Client Classes +# region Logbook Client class QRZLogbookClient: """API wrapper for accessing QRZ Logbook data.""" @@ -74,9 +87,10 @@ def fetch_logbook(self, option: str = None) -> Logbook: Raises: HTTPError: An error occurred trying to make a connection. + QRZLogbookError: An error occurred trying to interact with the logbook. Returns: - qspylib.logbook.Logbook: A logbook containing the user’s QSOs. + qspylib.logbook.Logbook: A logbook containing the user's QSOs. """ data = {"KEY": self.key, "ACTION": "FETCH", "OPTION": option} # filter down to only used params @@ -90,40 +104,12 @@ def fetch_logbook(self, option: str = None) -> Logbook: urlparse("ws://a.a/?" + html.unescape(response.text))[4], strict_parsing=True, ) - return QRZLogbookClient.__stringify(self, response_dict["ADIF"]) - - # iff we didn't manage to return from a logged in session, raise an error - raise response.raise_for_status() - - # def fetch_logbook_paged(self, per_page:int=50, option:str=None): - # - # data = { - # 'KEY': self.key, - # 'ACTION': 'FETCH', - # 'OPTION': 'MAX:' + str(per_page) + "," + option - # } - # # filter down to only used params - # response = requests.post(self.base_url, data=data, headers=self.headers) - # - # raise NotImplementedError - - # def insert_record(self, qso:adif_io.QSO, option:str=None): - # """Inserts a single QSO into the logbook corresponding to the\ - # Client's API Key. - - # Args: - # qso (adif_io.QSO): _description_ - # option (str, optional): _description_. Defaults to None. - - # Raises: - # NotImplementedError: _description_ - # """ - # data = { - # 'KEY': self.key, - # 'ACTION': 'INSERT', - # 'OPTION': option - # } - # raise NotImplementedError + if response_dict.get("RESULT")[0] == "OK": + return QRZLogbookClient.__stringify(self, response_dict["ADIF"][0]) + else: + raise QRZLogbookError(response_dict.get("REASON")[0]) + else: + raise response.raise_for_status() def delete_record(self, list_logids: list) -> dict[str, list[str]]: """Deletes log records from the logbook corresponding to the\ @@ -138,6 +124,7 @@ def delete_record(self, list_logids: list) -> dict[str, list[str]]: Raises: HTTPError: An error occurred trying to make a connection. + QRZLogbookError: An error occurred trying to interact with the logbook. Returns: dict[str, list[str]]: A dict containing the returned information\ @@ -153,10 +140,78 @@ def delete_record(self, list_logids: list) -> dict[str, list[str]]: urlparse("ws://a.a/?" + html.unescape(response.text))[4], strict_parsing=True, ) - return response_dict - # iff we didn't manage to return from a logged in session, raise an error - raise response.raise_for_status() + match_str = response_dict.get("RESULT")[0] + + if match_str == "OK": + return { + "RESULT": response_dict.get("RESULT")[0], + "COUNT": response_dict.get("COUNT")[0], + } + elif match_str == "PARTIAL": + return { + "RESULT": response_dict.get("RESULT")[0], + "COUNT": response_dict.get("COUNT")[0], + "LOGIDS": QRZLogbookClient.convert_logids_to_list( + response_dict.get("LOGIDS") + ), + } + elif match_str == "FAIL": + raise QRZLogbookError(response_dict.get("REASON")[0]) + else: + raise QRZLogbookError( + "An invalid state was reached with no known error." + ) + + else: + raise response.raise_for_status() + + def insert_record(self, adif: adif_io.QSO, option: str = None) -> list: + """Insert records into the logbook corresponding to the Client's API Key. + + Args: + adif (adif_io.QSO): adif_io.QSO object to insert into the logbook. + option (str, optional): REPLACE To automatically overwrite any existing\ + QSOs. Defaults to None. + + Raises: + QRZLogbookError: The logbook API returned an error, and the reason is included. + QRZLogbookError: An unknown condition was reached with the logbook API. + HTTPError: An error occurred trying to make a connection. + + Returns: + list: list of logids for records that were inserted or replaced. + """ + data = { + "KEY": self.key, + "ACTION": "INSERT", + "ADIF": str(adif), + "OPTION": option, + } + response = requests.post( + self.base_url, data=data, headers=self.headers, timeout=self.timeout + ) + if response.status_code == requests.codes.ok: + response_dict = parse_qs( + urlparse("ws://a.a/?" + html.unescape(response.text))[4], + strict_parsing=True, + ) + match_str = response_dict.get("RESULT")[0] + + if match_str == "OK": + return QRZLogbookClient.convert_logids_to_list( + response_dict["LOGID"][0] + ) + elif match_str == "REPLACE": + return QRZLogbookClient.convert_logids_to_list( + response_dict["LOGID"][0] + ) + elif match_str == "FAIL": + raise QRZLogbookError(str(response_dict.get("REASON")[0])) + else: + raise QRZLogbookError("Unknown error occurred.") + else: + raise response.raise_for_status() def check_status(self, list_logids: list = None) -> dict[str, list[str]]: """Gets the status of a logbook based on the API Key supplied\ @@ -169,6 +224,7 @@ def check_status(self, list_logids: list = None) -> dict[str, list[str]]: Raises: HTTPError: An error occurred trying to make a connection. + QRZLogbookError: An error occurred trying to interact with the logbook. Returns: dict[str, list[str]]: A dict containing the returned status\ @@ -176,7 +232,16 @@ def check_status(self, list_logids: list = None) -> dict[str, list[str]]: field by QRZ's API, e.g. DXCC count is 'DXCC_COUNT', confirmed\ is 'CONFIRMED', etc. """ - data = {"KEY": self.key, "ACTION": "STATUS", "LOGIDS": ",".join(list_logids)} + if list_logids is None: + data = {"KEY": self.key, "ACTION": "STATUS"} + else: + data = { + "KEY": self.key, + "ACTION": "STATUS", + "LOGIDS": ",".join( + QRZLogbookClient.convert_logids_to_list(list_logids) + ), + } response = requests.post( self.base_url, data=data, headers=self.headers, timeout=self.timeout @@ -186,23 +251,42 @@ def check_status(self, list_logids: list = None) -> dict[str, list[str]]: urlparse("ws://a.a/?" + html.unescape(response.text))[4], strict_parsing=True, ) - return response_dict - - # iff we didn't manage to return from a logged in session, raise an error - raise response.raise_for_status() + if response_dict.get("RESULT")[0] == "OK": + result = {} + for kvp in response_dict.items(): + result[kvp[0]] = kvp[1][0] + return result + else: + raise QRZLogbookError(response_dict.get("REASON")[0]) + else: + raise response.raise_for_status() ### Helpers - def __stringify(self, adi_log) -> Logbook: - # qrz_output = html.unescape(adi_log) - # start_of_log, end_of_log = qrz_output.index('ADIF=') + 5, - # qrz_output.rindex('\n\n') + 4 log_adi = ( "" + adi_log ) # adif_io expects a header, so we're giving it an end of header return Logbook(self.key, log_adi) + @staticmethod + def convert_logids_to_list(logids: str) -> list: + """When QRZ returns a list of logids, they are returned as a weird, gross\ + string. This parses that and returns an actual list of the integers. + Args: + logids (str): list of logids as generated by QRZ's API + + Returns: + list: actual list of integer logids + """ + regex = r"\d+" + return findall(regex, logids) + + +# endregion + + +# region XML Client class QRZXMLClient: """A wrapper for the QRZ XML interface. This functionality requires being logged in and maintaining a session. @@ -230,6 +314,9 @@ def __init__( them. Defaults to None. timeout (int, optional): Time in seconds to wait for a response.\ Defaults to 15. + + Raises: + QRZInvalidSessionError: An error occurred trying to instantiate a session. """ self.username = username self.password = password @@ -261,7 +348,7 @@ def _initiate_session(self): xml_dict = xmltodict.parse(response.text) key = xml_dict["QRZDatabase"]["Session"].get("Key") if not key: - raise QRZInvalidSession() + raise QRZInvalidSessionError() self.session_key = key @@ -273,7 +360,7 @@ def _verify_session(self): self.base_url, params=params, headers=self.headers, timeout=self.timeout ) if not xmltodict.parse(response.text)["QRZDatabase"]["Session"].get("Key"): - raise QRZInvalidSession() + raise QRZInvalidSessionError() def lookup_callsign(self, callsign: str) -> OrderedDict[str, Any]: """Looks up a callsign in the QRZ database. @@ -283,7 +370,7 @@ def lookup_callsign(self, callsign: str) -> OrderedDict[str, Any]: Raises: HTTPError: An error occurred trying to make a connection. - QRZInvalidSession: An error occurred trying to instantiate a session. + QRZInvalidSessionError: An error occurred trying to instantiate a session. Returns: OrderedDict[str, Any]: Data on the callsign looked up, organized as @@ -297,7 +384,7 @@ def lookup_callsign(self, callsign: str) -> OrderedDict[str, Any]: ) if response.status_code == requests.codes.ok: parsed_response = xmltodict.parse(response.text) - if not parsed_response.get("Key"): + if not parsed_response["QRZDatabase"]["Session"].get("Key"): self._initiate_session() num_retries += 1 else: @@ -305,7 +392,7 @@ def lookup_callsign(self, callsign: str) -> OrderedDict[str, Any]: else: raise response.raise_for_status() # if we didn't manage to return from a logged in session, raise an error - raise QRZInvalidSession( + raise QRZInvalidSessionError( **{"message": parsed_response["ERROR"]} if parsed_response.get("ERROR") else {} @@ -315,11 +402,12 @@ def lookup_dxcc(self, dxcc: str) -> OrderedDict[str, Any]: """Looks up a DXCC by prefix or DXCC number. Args: - dxcc (str): DXCC or prefix to lookup + dxcc (str): DXCC or prefix to lookup. Note that callsigns must be\ + uppercase, or QRZ won't recognize it. Raises: HTTPError: An error occurred trying to make a connection. - QRZInvalidSession: An error occurred trying to instantiate a session. + QRZInvalidSessionError: An error occurred trying to instantiate a session. Returns: OrderedDict[str, Any]: Data on the callsign looked up, organized as\ @@ -336,7 +424,7 @@ def lookup_dxcc(self, dxcc: str) -> OrderedDict[str, Any]: ) if response.status_code == requests.codes.ok: parsed_response = xmltodict.parse(response.text) - if not parsed_response.get("Key"): + if not parsed_response["QRZDatabase"]["Session"].get("Key"): self._initiate_session() num_retries += 1 else: @@ -344,7 +432,7 @@ def lookup_dxcc(self, dxcc: str) -> OrderedDict[str, Any]: else: raise response.raise_for_status() # if we didn't manage to return from a logged in session, raise an error - raise QRZInvalidSession( + raise QRZInvalidSessionError( **{"message": parsed_response["ERROR"]} if parsed_response.get("ERROR") else {} diff --git a/src/qspylib/requirements.txt b/src/qspylib/requirements.txt index a5c00b1..afb6935 100644 Binary files a/src/qspylib/requirements.txt and b/src/qspylib/requirements.txt differ diff --git a/src/qspylib/test_pytest.py b/src/qspylib/test_pytest.py index 1cdc721..1798fc7 100644 --- a/src/qspylib/test_pytest.py +++ b/src/qspylib/test_pytest.py @@ -10,7 +10,7 @@ import qspylib.logbook from qspylib import eqsl from qspylib import lotw -# from qspylib import qrz +from qspylib import qrz ################# @@ -207,6 +207,15 @@ def test_get_user_data(): # qrz tests # ############# -# def test_qrz_xml_with_invalid_key(): -# log_obj = qrz.QRZLogbookAPI('aaaaaaaaaaaaa') -# log = log_obj.fetch_logbook() + +def test_qrz_xml_with_invalid_login(): + """Test a bad login to the QRZ XML API.""" + with pytest.raises(qrz.QRZInvalidSessionError): + qrz.QRZXMLClient("badusername", "badpassword") + + +def test_qrz_logbook_with_invalid_key(): + """Test a bad login to the QRZ Logbook API.""" + with pytest.raises(qrz.QRZLogbookError): + qrz_logbook_obj = qrz.QRZLogbookClient("badkeythatisnotreal") + qrz_logbook_obj.fetch_logbook()