From 3352cc0d40f2d36814fbd0d41533493b6a421d3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Thu, 25 Jul 2024 18:08:48 +0000 Subject: [PATCH] Add task management using TaskIQ --- .gitignore | 2 + poetry.lock | 222 ++++++++++++++++++++++++++++++++---- pyproject.toml | 16 ++- src/lvmapi/app.py | 13 +++ src/lvmapi/broker.py | 34 ++++++ src/lvmapi/routers/tasks.py | 57 +++++++++ src/lvmapi/tasks.py | 45 ++++++++ 7 files changed, 367 insertions(+), 22 deletions(-) create mode 100644 src/lvmapi/broker.py create mode 100644 src/lvmapi/routers/tasks.py create mode 100644 src/lvmapi/tasks.py diff --git a/.gitignore b/.gitignore index 2692f95..d10834b 100644 --- a/.gitignore +++ b/.gitignore @@ -115,3 +115,5 @@ ENV/ *.swo pyrightconfig.json + +logs/ diff --git a/poetry.lock b/poetry.lock index d10a893..58abf46 100644 --- a/poetry.lock +++ b/poetry.lock @@ -931,6 +931,16 @@ files = [ {file = "frozenlist-1.4.1.tar.gz", hash = "sha256:c037a86e8513059a2613aaba4d817bb90b9d9b6b69aace3ce9c877e8c8ed402b"}, ] +[[package]] +name = "gitignore-parser" +version = "0.1.11" +description = "A spec-compliant gitignore parser for Python 3.5+" +optional = false +python-versions = "*" +files = [ + {file = "gitignore_parser-0.1.11.tar.gz", hash = "sha256:fa10fde48b44888eeefac096f53bcdad9b87a4ffd7db788558dbdf71ff3bc9db"}, +] + [[package]] name = "google-auth" version = "2.32.0" @@ -1193,6 +1203,25 @@ files = [ {file = "idna-3.7.tar.gz", hash = "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"}, ] +[[package]] +name = "importlib-metadata" +version = "8.2.0" +description = "Read metadata from Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "importlib_metadata-8.2.0-py3-none-any.whl", hash = "sha256:11901fa0c2f97919b288679932bb64febaeacf289d18ac84dd68cb2e74213369"}, + {file = "importlib_metadata-8.2.0.tar.gz", hash = "sha256:72e8d4399996132204f9a16dcc751af254a48f8d1b20b9ff0f98d4a8f901e73d"}, +] + +[package.dependencies] +zipp = ">=0.5" + +[package.extras] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +perf = ["ipython"] +test = ["flufl.flake8", "importlib-resources (>=1.3)", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy", "pytest-perf (>=0.9.2)", "pytest-ruff (>=0.2.1)"] + [[package]] name = "influxdb-client" version = "1.44.0" @@ -2051,6 +2080,16 @@ files = [ {file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"}, ] +[[package]] +name = "pycron" +version = "3.0.0" +description = "Simple cron-like parser, which determines if current datetime matches conditions." +optional = false +python-versions = ">=3.5" +files = [ + {file = "pycron-3.0.0.tar.gz", hash = "sha256:b916044e3e8253d5409c68df3ac64a3472c4e608dab92f40e8f595e5d3acb3de"}, +] + [[package]] name = "pydantic" version = "2.8.2" @@ -2606,29 +2645,29 @@ pyasn1 = ">=0.1.3" [[package]] name = "ruff" -version = "0.5.4" +version = "0.5.5" description = "An extremely fast Python linter and code formatter, written in Rust." optional = false python-versions = ">=3.7" files = [ - {file = "ruff-0.5.4-py3-none-linux_armv6l.whl", hash = "sha256:82acef724fc639699b4d3177ed5cc14c2a5aacd92edd578a9e846d5b5ec18ddf"}, - {file = "ruff-0.5.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:da62e87637c8838b325e65beee485f71eb36202ce8e3cdbc24b9fcb8b99a37be"}, - {file = "ruff-0.5.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:e98ad088edfe2f3b85a925ee96da652028f093d6b9b56b76fc242d8abb8e2059"}, - {file = "ruff-0.5.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4c55efbecc3152d614cfe6c2247a3054cfe358cefbf794f8c79c8575456efe19"}, - {file = "ruff-0.5.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:f9b85eaa1f653abd0a70603b8b7008d9e00c9fa1bbd0bf40dad3f0c0bdd06793"}, - {file = "ruff-0.5.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0cf497a47751be8c883059c4613ba2f50dd06ec672692de2811f039432875278"}, - {file = "ruff-0.5.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:09c14ed6a72af9ccc8d2e313d7acf7037f0faff43cde4b507e66f14e812e37f7"}, - {file = "ruff-0.5.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:628f6b8f97b8bad2490240aa84f3e68f390e13fabc9af5c0d3b96b485921cd60"}, - {file = "ruff-0.5.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3520a00c0563d7a7a7c324ad7e2cde2355733dafa9592c671fb2e9e3cd8194c1"}, - {file = "ruff-0.5.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:93789f14ca2244fb91ed481456f6d0bb8af1f75a330e133b67d08f06ad85b516"}, - {file = "ruff-0.5.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:029454e2824eafa25b9df46882f7f7844d36fd8ce51c1b7f6d97e2615a57bbcc"}, - {file = "ruff-0.5.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:9492320eed573a13a0bc09a2957f17aa733fff9ce5bf00e66e6d4a88ec33813f"}, - {file = "ruff-0.5.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:a6e1f62a92c645e2919b65c02e79d1f61e78a58eddaebca6c23659e7c7cb4ac7"}, - {file = "ruff-0.5.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:768fa9208df2bec4b2ce61dbc7c2ddd6b1be9fb48f1f8d3b78b3332c7d71c1ff"}, - {file = "ruff-0.5.4-py3-none-win32.whl", hash = "sha256:e1e7393e9c56128e870b233c82ceb42164966f25b30f68acbb24ed69ce9c3a4e"}, - {file = "ruff-0.5.4-py3-none-win_amd64.whl", hash = "sha256:58b54459221fd3f661a7329f177f091eb35cf7a603f01d9eb3eb11cc348d38c4"}, - {file = "ruff-0.5.4-py3-none-win_arm64.whl", hash = "sha256:bd53da65f1085fb5b307c38fd3c0829e76acf7b2a912d8d79cadcdb4875c1eb7"}, - {file = "ruff-0.5.4.tar.gz", hash = "sha256:2795726d5f71c4f4e70653273d1c23a8182f07dd8e48c12de5d867bfb7557eed"}, + {file = "ruff-0.5.5-py3-none-linux_armv6l.whl", hash = "sha256:605d589ec35d1da9213a9d4d7e7a9c761d90bba78fc8790d1c5e65026c1b9eaf"}, + {file = "ruff-0.5.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:00817603822a3e42b80f7c3298c8269e09f889ee94640cd1fc7f9329788d7bf8"}, + {file = "ruff-0.5.5-py3-none-macosx_11_0_arm64.whl", hash = "sha256:187a60f555e9f865a2ff2c6984b9afeffa7158ba6e1eab56cb830404c942b0f3"}, + {file = "ruff-0.5.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fe26fc46fa8c6e0ae3f47ddccfbb136253c831c3289bba044befe68f467bfb16"}, + {file = "ruff-0.5.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:4ad25dd9c5faac95c8e9efb13e15803cd8bbf7f4600645a60ffe17c73f60779b"}, + {file = "ruff-0.5.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f70737c157d7edf749bcb952d13854e8f745cec695a01bdc6e29c29c288fc36e"}, + {file = "ruff-0.5.5-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:cfd7de17cef6ab559e9f5ab859f0d3296393bc78f69030967ca4d87a541b97a0"}, + {file = "ruff-0.5.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a09b43e02f76ac0145f86a08e045e2ea452066f7ba064fd6b0cdccb486f7c3e7"}, + {file = "ruff-0.5.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d0b856cb19c60cd40198be5d8d4b556228e3dcd545b4f423d1ad812bfdca5884"}, + {file = "ruff-0.5.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3687d002f911e8a5faf977e619a034d159a8373514a587249cc00f211c67a091"}, + {file = "ruff-0.5.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ac9dc814e510436e30d0ba535f435a7f3dc97f895f844f5b3f347ec8c228a523"}, + {file = "ruff-0.5.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:af9bdf6c389b5add40d89b201425b531e0a5cceb3cfdcc69f04d3d531c6be74f"}, + {file = "ruff-0.5.5-py3-none-musllinux_1_2_i686.whl", hash = "sha256:d40a8533ed545390ef8315b8e25c4bb85739b90bd0f3fe1280a29ae364cc55d8"}, + {file = "ruff-0.5.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:cab904683bf9e2ecbbe9ff235bfe056f0eba754d0168ad5407832928d579e7ab"}, + {file = "ruff-0.5.5-py3-none-win32.whl", hash = "sha256:696f18463b47a94575db635ebb4c178188645636f05e934fdf361b74edf1bb2d"}, + {file = "ruff-0.5.5-py3-none-win_amd64.whl", hash = "sha256:50f36d77f52d4c9c2f1361ccbfbd09099a1b2ea5d2b2222c586ab08885cf3445"}, + {file = "ruff-0.5.5-py3-none-win_arm64.whl", hash = "sha256:3191317d967af701f1b73a31ed5788795936e423b7acce82a2b63e26eb3e89d6"}, + {file = "ruff-0.5.5.tar.gz", hash = "sha256:cc5516bdb4858d972fbc31d246bdb390eab8df1a26e2353be2dbc0c2d7f5421a"}, ] [[package]] @@ -2791,6 +2830,94 @@ files = [ [package.dependencies] pbr = ">=2.0.0,<2.1.0 || >2.1.0" +[[package]] +name = "taskiq" +version = "0.11.6" +description = "Distributed task queue with full async support" +optional = false +python-versions = "<4.0.0,>=3.8.1" +files = [ + {file = "taskiq-0.11.6-py3-none-any.whl", hash = "sha256:6a994aa1d44cd2bc683e8e5b8376b9ac12d2288a0e2825efe9b977d27f3023b0"}, + {file = "taskiq-0.11.6.tar.gz", hash = "sha256:97826018bd6ccd46ca7e41549d22d054f7fe9c069c15803586b027407adb6ec4"}, +] + +[package.dependencies] +anyio = ">=3" +gitignore-parser = {version = ">=0,<1", optional = true, markers = "extra == \"reload\""} +importlib-metadata = "*" +packaging = ">=19" +pycron = ">=3.0.0,<4.0.0" +pydantic = ">=1.0,<=3.0" +pytz = "*" +taskiq_dependencies = ">=1.3.1,<2" +typing-extensions = ">=3.10.0.0" +watchdog = {version = ">=2.1.9,<3.0.0", optional = true, markers = "extra == \"reload\""} + +[package.extras] +cbor = ["cbor2 (>=5.4.6,<6.0.0)"] +metrics = ["prometheus_client (>=0,<1)"] +msgpack = ["msgpack (>=1.0.7,<2.0.0)"] +orjson = ["orjson (>=3.9.9,<4.0.0)"] +reload = ["gitignore-parser (>=0,<1)", "watchdog (>=2.1.9,<3.0.0)"] +uv = ["uvloop (>=0.16.0,<1)"] +zmq = ["pyzmq (>=23.2.0,<24.0.0)"] + +[[package]] +name = "taskiq-aio-pika" +version = "0.4.0" +description = "RabbitMQ broker for taskiq" +optional = false +python-versions = ">=3.8.1,<4.0.0" +files = [ + {file = "taskiq_aio_pika-0.4.0-py3-none-any.whl", hash = "sha256:3d96acf8ee5d9170bdc3f726b7a2f9d29f58e0fbc760fb13a50f40eeaa1368a3"}, + {file = "taskiq_aio_pika-0.4.0.tar.gz", hash = "sha256:9295e911ad2c808e10571adee262dcfe51344a2aebba0fbc89249e666bbe44a1"}, +] + +[package.dependencies] +aio-pika = ">=9.0,<10.0" +taskiq = ">=0.6.0,<1" + +[[package]] +name = "taskiq-dependencies" +version = "1.5.3" +description = "FastAPI like dependency injection implementation" +optional = false +python-versions = "<4.0.0,>=3.8.1" +files = [ + {file = "taskiq_dependencies-1.5.3-py3-none-any.whl", hash = "sha256:ffa81997b8d6f4be0b2e08280b241dc3cecd9b49ff1fd259688e244d977cadd4"}, + {file = "taskiq_dependencies-1.5.3.tar.gz", hash = "sha256:ee52d01e6683cafbeb0a0b0e4abf58b2d304d7db0769e024fe0a0e8bcedadc57"}, +] + +[[package]] +name = "taskiq-fastapi" +version = "0.3.1" +description = "FastAPI integration for taskiq" +optional = false +python-versions = ">=3.8.1,<4.0.0" +files = [ + {file = "taskiq_fastapi-0.3.1-py3-none-any.whl", hash = "sha256:1a1305824726b153e418312f332f16f915bd3f1ee4e10f6708cd9891ed06148c"}, + {file = "taskiq_fastapi-0.3.1.tar.gz", hash = "sha256:08412205102b2cadf8b664a811dca0ffa44c9376cf6cef85162dac1dc46c8d01"}, +] + +[package.dependencies] +fastapi = ">=0.93.0" +taskiq = ">=0.8.0,<1" + +[[package]] +name = "taskiq-redis" +version = "1.0.0" +description = "Redis integration for taskiq" +optional = false +python-versions = "<4.0.0,>=3.8.1" +files = [ + {file = "taskiq_redis-1.0.0-py3-none-any.whl", hash = "sha256:867949594e63402bdb8378fcc9f1e4e1a18360cb7d3da30fe06d8c33cb74822f"}, + {file = "taskiq_redis-1.0.0.tar.gz", hash = "sha256:45b2ac312a61725b4a5bdc5595f160d83986eac21eb5bc080ede5e6272d87d61"}, +] + +[package.dependencies] +redis = ">=5,<6" +taskiq = ">=0.11.1,<1" + [[package]] name = "tomli" version = "2.0.1" @@ -2946,6 +3073,46 @@ files = [ docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"] test = ["Cython (>=0.29.36,<0.30.0)", "aiohttp (==3.9.0b0)", "aiohttp (>=3.8.1)", "flake8 (>=5.0,<6.0)", "mypy (>=0.800)", "psutil", "pyOpenSSL (>=23.0.0,<23.1.0)", "pycodestyle (>=2.9.0,<2.10.0)"] +[[package]] +name = "watchdog" +version = "2.3.1" +description = "Filesystem events monitoring" +optional = false +python-versions = ">=3.6" +files = [ + {file = "watchdog-2.3.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:d1f1200d4ec53b88bf04ab636f9133cb703eb19768a39351cee649de21a33697"}, + {file = "watchdog-2.3.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:564e7739abd4bd348aeafbf71cc006b6c0ccda3160c7053c4a53b67d14091d42"}, + {file = "watchdog-2.3.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:95ad708a9454050a46f741ba5e2f3468655ea22da1114e4c40b8cbdaca572565"}, + {file = "watchdog-2.3.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:a073c91a6ef0dda488087669586768195c3080c66866144880f03445ca23ef16"}, + {file = "watchdog-2.3.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:aa8b028750b43e80eea9946d01925168eeadb488dfdef1d82be4b1e28067f375"}, + {file = "watchdog-2.3.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:964fd236cd443933268ae49b59706569c8b741073dbfd7ca705492bae9d39aab"}, + {file = "watchdog-2.3.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:91fd146d723392b3e6eb1ac21f122fcce149a194a2ba0a82c5e4d0ee29cd954c"}, + {file = "watchdog-2.3.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:efe3252137392a471a2174d721e1037a0e6a5da7beb72a021e662b7000a9903f"}, + {file = "watchdog-2.3.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:85bf2263290591b7c5fa01140601b64c831be88084de41efbcba6ea289874f44"}, + {file = "watchdog-2.3.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:8f2df370cd8e4e18499dd0bfdef476431bcc396108b97195d9448d90924e3131"}, + {file = "watchdog-2.3.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:ea5d86d1bcf4a9d24610aa2f6f25492f441960cf04aed2bd9a97db439b643a7b"}, + {file = "watchdog-2.3.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:6f5d0f7eac86807275eba40b577c671b306f6f335ba63a5c5a348da151aba0fc"}, + {file = "watchdog-2.3.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5b848c71ef2b15d0ef02f69da8cc120d335cec0ed82a3fa7779e27a5a8527225"}, + {file = "watchdog-2.3.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:0d9878be36d2b9271e3abaa6f4f051b363ff54dbbe7e7df1af3c920e4311ee43"}, + {file = "watchdog-2.3.1-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:4cd61f98cb37143206818cb1786d2438626aa78d682a8f2ecee239055a9771d5"}, + {file = "watchdog-2.3.1-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:3d2dbcf1acd96e7a9c9aefed201c47c8e311075105d94ce5e899f118155709fd"}, + {file = "watchdog-2.3.1-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:03f342a9432fe08107defbe8e405a2cb922c5d00c4c6c168c68b633c64ce6190"}, + {file = "watchdog-2.3.1-py3-none-manylinux2014_aarch64.whl", hash = "sha256:7a596f9415a378d0339681efc08d2249e48975daae391d58f2e22a3673b977cf"}, + {file = "watchdog-2.3.1-py3-none-manylinux2014_armv7l.whl", hash = "sha256:0e1dd6d449267cc7d6935d7fe27ee0426af6ee16578eed93bacb1be9ff824d2d"}, + {file = "watchdog-2.3.1-py3-none-manylinux2014_i686.whl", hash = "sha256:7a1876f660e32027a1a46f8a0fa5747ad4fcf86cb451860eae61a26e102c8c79"}, + {file = "watchdog-2.3.1-py3-none-manylinux2014_ppc64.whl", hash = "sha256:2caf77ae137935c1466f8cefd4a3aec7017b6969f425d086e6a528241cba7256"}, + {file = "watchdog-2.3.1-py3-none-manylinux2014_ppc64le.whl", hash = "sha256:53f3e95081280898d9e4fc51c5c69017715929e4eea1ab45801d5e903dd518ad"}, + {file = "watchdog-2.3.1-py3-none-manylinux2014_s390x.whl", hash = "sha256:9da7acb9af7e4a272089bd2af0171d23e0d6271385c51d4d9bde91fe918c53ed"}, + {file = "watchdog-2.3.1-py3-none-manylinux2014_x86_64.whl", hash = "sha256:8a4d484e846dcd75e96b96d80d80445302621be40e293bfdf34a631cab3b33dc"}, + {file = "watchdog-2.3.1-py3-none-win32.whl", hash = "sha256:a74155398434937ac2780fd257c045954de5b11b5c52fc844e2199ce3eecf4cf"}, + {file = "watchdog-2.3.1-py3-none-win_amd64.whl", hash = "sha256:5defe4f0918a2a1a4afbe4dbb967f743ac3a93d546ea4674567806375b024adb"}, + {file = "watchdog-2.3.1-py3-none-win_ia64.whl", hash = "sha256:4109cccf214b7e3462e8403ab1e5b17b302ecce6c103eb2fc3afa534a7f27b96"}, + {file = "watchdog-2.3.1.tar.gz", hash = "sha256:d9f9ed26ed22a9d331820a8432c3680707ea8b54121ddcc9dc7d9f2ceeb36906"}, +] + +[package.extras] +watchmedo = ["PyYAML (>=3.10)"] + [[package]] name = "watchfiles" version = "0.22.0" @@ -3244,7 +3411,22 @@ files = [ idna = ">=2.0" multidict = ">=4.0" +[[package]] +name = "zipp" +version = "3.19.2" +description = "Backport of pathlib-compatible object wrapper for zip files" +optional = false +python-versions = ">=3.8" +files = [ + {file = "zipp-3.19.2-py3-none-any.whl", hash = "sha256:f091755f667055f2d02b32c53771a7a6c8b47e1fdbc4b72a8b9072b3eef8015c"}, + {file = "zipp-3.19.2.tar.gz", hash = "sha256:bf1dcf6450f873a13e952a29504887c89e6de7506209e5b1bcc3460135d4de19"}, +] + +[package.extras] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"] + [metadata] lock-version = "2.0" python-versions = "^3.11,<4" -content-hash = "58e1129496f1bb8632dd7638b55d8dca17b5ebfbcdb485053f1cc05ef1d135fe" +content-hash = "872a2766c5e644560eef6a3d2837564826f08a59b0707b70d98909c81757b85f" diff --git a/pyproject.toml b/pyproject.toml index c44f0e6..7058195 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,10 +39,14 @@ astropy = "^6.0.0" astroplan = "^0.9.1" polars = "^1.0.0" redis = {version = "^5.0.3", extras = ["hiredis"]} -httpx = ">=0.24.0" -aiocache = "^0.12.2" +httpx = "^0" +aiocache = "^0" lvmgort = "^1.0.0b1" cachetools = "^5.4.0" +taskiq = {extras = ["reload"], version = "^0"} +taskiq-redis = "^1" +taskiq-aio-pika = "^0" +taskiq-fastapi = "^0" [tool.poetry.group.dev.dependencies] ipython = ">=8.0.0" @@ -58,6 +62,14 @@ env.SECRET_KEY = "33744caf930b8c695ec39221dd158e9c5fda13d0d19d1417ec71cf189aad65 args = [ { name = "port", default = "8888" } ] +deps = ['taskiq'] + +[tool.poe.tasks.taskiq] +shell = "taskiq worker -r lvmapi.app:broker lvmapi.tasks --log-level $log_level &" +args = [ + { name = "log-level", default = "ERROR" } +] + [tool.ruff] line-length = 88 target-version = 'py312' diff --git a/src/lvmapi/app.py b/src/lvmapi/app.py index f332937..8537777 100644 --- a/src/lvmapi/app.py +++ b/src/lvmapi/app.py @@ -8,9 +8,11 @@ from __future__ import annotations +import taskiq_fastapi from fastapi import FastAPI from lvmapi import auth +from lvmapi.broker import broker, broker_shutdown, broker_startup from lvmapi.routers import ( alerts, enclosure, @@ -19,12 +21,14 @@ overwatcher, slack, spectrographs, + tasks, telescopes, weather, ) app = FastAPI() + app.include_router(auth.router) app.include_router(telescopes.router) app.include_router(spectrographs.router) @@ -35,6 +39,15 @@ app.include_router(macros.router) app.include_router(enclosure.router) app.include_router(alerts.router) +app.include_router(tasks.router) + + +# Lifecycle events for the broker. +app.add_event_handler("startup", broker_startup) +app.add_event_handler("shutdown", broker_shutdown) + +# Integration with FastAPI. +taskiq_fastapi.init(broker, "lvmapi.app:app") @app.get("/") diff --git a/src/lvmapi/broker.py b/src/lvmapi/broker.py new file mode 100644 index 0000000..386cd51 --- /dev/null +++ b/src/lvmapi/broker.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2024-07-25 +# @Filename: broker.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +from taskiq_aio_pika import AioPikaBroker +from taskiq_redis import RedisAsyncResultBackend + + +__all__ = ["broker"] + + +async def broker_startup(): + """Start broker on startup.""" + + if not broker.is_worker_process: + await broker.startup() + + +async def broker_shutdown(): + """Shut down broker.""" + + if not broker.is_worker_process: + await broker.shutdown() + + +# TaskIQ broker. +backend = RedisAsyncResultBackend("redis://localhost") +broker = AioPikaBroker().with_result_backend(backend) diff --git a/src/lvmapi/routers/tasks.py b/src/lvmapi/routers/tasks.py new file mode 100644 index 0000000..53f6974 --- /dev/null +++ b/src/lvmapi/routers/tasks.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2024-07-25 +# @Filename: tasks.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +from typing import Any + +from fastapi import APIRouter +from pydantic import BaseModel +from taskiq_redis.exceptions import ResultIsMissingError + +from lvmapi.broker import broker + + +class TaskResult(BaseModel): + """A model to represent the result of a task.""" + + is_err: bool + is_ready: bool + log: str | None = None + return_value: Any | None = None + execution_time: float | None = None + labels: dict | None = None + error: str | None = None + + +router = APIRouter(prefix="/tasks", tags=["tasks"]) + + +@router.get("/{task_id}/ready") +async def task_ready(task_id: str) -> bool: + """Returns whether a tasks has finished running.""" + + return await broker.result_backend.is_result_ready(task_id) + + +@router.get("/{task_id}/result") +async def task_result(task_id: str): + """Returns whether a tasks has finished running.""" + + try: + result = await broker.result_backend.get_result(task_id) + except ResultIsMissingError: + return TaskResult( + is_ready=False, + is_err=True, + error="Task not found or result not ready.", + ) + + error = result.error if result.error is None else str(result.error) + + return TaskResult(is_ready=True, **result.dict(exclude={"error"}), error=error) diff --git a/src/lvmapi/tasks.py b/src/lvmapi/tasks.py new file mode 100644 index 0000000..88c7456 --- /dev/null +++ b/src/lvmapi/tasks.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2024-07-24 +# @Filename: tasks.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +from typing import Literal + +from lvmapi.app import broker +from lvmapi.tools.gort import get_gort_client + + +@broker.task() +async def move_dome_task(direction: Literal["open", "close"], force: bool = False): + """Opens/closes the dome. + + Uses GORT to ensure that the telescopes are parked before moving the dome. + + """ + + from lvmapi.app import app + + async with get_gort_client(app) as gort: + if direction == "open": + await gort.enclosure.open() + else: + await gort.enclosure.close(force=force) + + return True + + +@broker.task() +async def shutdown_task(): + """Shuts down the system.""" + + from lvmapi.app import app + + async with get_gort_client(app) as gort: + await gort.shutdown(park_telescopes=True) + + return True