Skip to content

Commit

Permalink
Add task management using TaskIQ
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Jul 25, 2024
1 parent 3914060 commit 3352cc0
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 22 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,5 @@ ENV/
*.swo

pyrightconfig.json

logs/
222 changes: 202 additions & 20 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@ astropy = "^6.0.0"
astroplan = "^0.9.1"
polars = "^1.0.0"
redis = {version = "^5.0.3", extras = ["hiredis"]}
httpx = ">=0.24.0"
aiocache = "^0.12.2"
httpx = "^0"
aiocache = "^0"
lvmgort = "^1.0.0b1"
cachetools = "^5.4.0"
taskiq = {extras = ["reload"], version = "^0"}
taskiq-redis = "^1"
taskiq-aio-pika = "^0"
taskiq-fastapi = "^0"

[tool.poetry.group.dev.dependencies]
ipython = ">=8.0.0"
Expand All @@ -58,6 +62,14 @@ env.SECRET_KEY = "33744caf930b8c695ec39221dd158e9c5fda13d0d19d1417ec71cf189aad65
args = [
{ name = "port", default = "8888" }
]
deps = ['taskiq']

[tool.poe.tasks.taskiq]
shell = "taskiq worker -r lvmapi.app:broker lvmapi.tasks --log-level $log_level &"
args = [
{ name = "log-level", default = "ERROR" }
]

[tool.ruff]
line-length = 88
target-version = 'py312'
Expand Down
13 changes: 13 additions & 0 deletions src/lvmapi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

from __future__ import annotations

import taskiq_fastapi
from fastapi import FastAPI

from lvmapi import auth
from lvmapi.broker import broker, broker_shutdown, broker_startup
from lvmapi.routers import (
alerts,
enclosure,
Expand All @@ -19,12 +21,14 @@
overwatcher,
slack,
spectrographs,
tasks,
telescopes,
weather,
)


app = FastAPI()

app.include_router(auth.router)
app.include_router(telescopes.router)
app.include_router(spectrographs.router)
Expand All @@ -35,6 +39,15 @@
app.include_router(macros.router)
app.include_router(enclosure.router)
app.include_router(alerts.router)
app.include_router(tasks.router)


# Lifecycle events for the broker.
app.add_event_handler("startup", broker_startup)
app.add_event_handler("shutdown", broker_shutdown)

# Integration with FastAPI.
taskiq_fastapi.init(broker, "lvmapi.app:app")


@app.get("/")
Expand Down
34 changes: 34 additions & 0 deletions src/lvmapi/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2024-07-25
# @Filename: broker.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend


__all__ = ["broker"]


async def broker_startup():
"""Start broker on startup."""

if not broker.is_worker_process:
await broker.startup()


async def broker_shutdown():
"""Shut down broker."""

if not broker.is_worker_process:
await broker.shutdown()


# TaskIQ broker.
backend = RedisAsyncResultBackend("redis://localhost")
broker = AioPikaBroker().with_result_backend(backend)
Loading

0 comments on commit 3352cc0

Please sign in to comment.