feat: add multiple triton backend (#1314)
* fix: improve worker task dispatch

Before, we had 2 high priority workers (listening to queues high-1 and
high-2 respectively) and 2 workers that each listened both to one high
priority queue (high-3 or high-4) and to the low priority queue.

It's beneficial for all workers to listen to the low priority queue (but
with a lower priority), so that low priority jobs get completed faster
when there are no high priority tasks (see the sketch after this list).

* refactor: allow to specify Triton URI in CLI

* fix: fix CLI command run-object-detection

* feat: add a new CLI command to run category detection on the full DB

* fix: remove --model-name parameter from categorize CLI command

* fix: fix mypy error in unit test

* fix: fix issue and warning with tests
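
The dispatch change in the first item is easiest to see in rq terms. This is a minimal sketch, not Robotoff's actual worker code (`python -m robotoff run-worker` wraps rq): an rq worker drains its queues in the order they are listed, so a worker given `robotoff-high-1 robotoff-low` only picks up low priority jobs once its high priority queue is empty.

```python
from redis import Redis
from rq import Queue, Worker

redis_conn = Redis()
# Queue order defines priority: earlier queues are always checked first,
# so "robotoff-low" is only consulted when "robotoff-high-1" is empty.
queues = [
    Queue("robotoff-high-1", connection=redis_conn),
    Queue("robotoff-low", connection=redis_conn),
]
Worker(queues, connection=redis_conn).work()
```

With all four workers configured this way, the low priority queue has up to four consumers whenever the high priority queues are idle, instead of the previous two.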
raphael0202 authored Feb 13, 2024
1 parent cf12304 commit a7eab33
Showing 17 changed files with 278 additions and 83 deletions.
14 changes: 7 additions & 7 deletions Makefile
@@ -93,7 +93,7 @@ livecheck:

log:
@echo "🥫 Reading logs (docker-compose) …"
-	${DOCKER_COMPOSE} logs -f api scheduler worker_high_1 worker_high_2 worker_low_1 worker_low_2
+	${DOCKER_COMPOSE} logs -f api scheduler worker_1 worker_2 worker_3 worker_4

#------------#
# Management #
@@ -152,10 +152,10 @@ init-elasticsearch:

launch-burst-worker:
ifdef queues
-	${DOCKER_COMPOSE} run --rm -d --no-deps worker_high_1 python -m robotoff run-worker ${queues} --burst
+	${DOCKER_COMPOSE} run --rm -d --no-deps worker_1 python -m robotoff run-worker ${queues} --burst
# Only launch burst worker on low priority queue if queue is not specified
else
-	${DOCKER_COMPOSE} run --rm -d --no-deps worker_high_1 python -m robotoff run-worker robotoff-low --burst
+	${DOCKER_COMPOSE} run --rm -d --no-deps worker_1 python -m robotoff run-worker robotoff-low --burst
endif

#------------#
@@ -204,26 +204,26 @@ health:
i18n-compile:
@echo "🥫 Compiling translations …"
# Note it's important to have --no-deps, to avoid launching a concurrent postgres instance
-	${DOCKER_COMPOSE} run --rm --entrypoint bash --no-deps worker_high_1 -c "cd i18n && . compile.sh"
+	${DOCKER_COMPOSE} run --rm --entrypoint bash --no-deps worker_1 -c "cd i18n && . compile.sh"

unit-tests:
@echo "🥫 Running tests …"
# run tests in worker to have more memory
# also, change project name to run in isolation
-	${DOCKER_COMPOSE_TEST} run --rm worker_high_1 poetry run pytest --cov-report xml --cov=robotoff tests/unit
+	${DOCKER_COMPOSE_TEST} run --rm worker_1 poetry run pytest --cov-report xml --cov=robotoff tests/unit

integration-tests:
@echo "🥫 Running integration tests …"
# run tests in worker to have more memory
# also, change project name to run in isolation
-	${DOCKER_COMPOSE_TEST} run --rm worker_high_1 poetry run pytest -vv --cov-report xml --cov=robotoff --cov-append tests/integration
+	${DOCKER_COMPOSE_TEST} run --rm worker_1 poetry run pytest -vv --cov-report xml --cov=robotoff --cov-append tests/integration
( ${DOCKER_COMPOSE_TEST} down -v || true )

# interactive testings
# usage: make pytest args='test/unit/my-test.py --pdb'
pytest: guard-args
@echo "🥫 Running test: ${args}"
-	${DOCKER_COMPOSE_TEST} run --rm worker_high_1 poetry run pytest ${args}
+	${DOCKER_COMPOSE_TEST} run --rm worker_1 poetry run pytest ${args}

#------------#
# Production #
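For context: the `--burst` flag passed to `run-worker` above presumably maps to rq's burst mode, in which the worker processes everything currently queued and then exits instead of blocking for new jobs, i.e. the equivalent of calling `work(burst=True)` on the worker from the sketch above. That is what makes it suited to one-off catch-up runs.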
25 changes: 13 additions & 12 deletions docker-compose.yml
@@ -70,26 +70,27 @@ services:
- postgres
- redis

-  worker_high_1:
+  worker_1:
     <<: *robotoff-worker
-    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_high_1
-    command: python -m robotoff run-worker robotoff-high-1
+    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_1
+    # Each worker listens to a single high priority queue and to the low priority queue.
+    # As the low priority queue comes last, it will only be processed if there are no high
+    # priority tasks.
+    command: python -m robotoff run-worker robotoff-high-1 robotoff-low

-  worker_high_2:
+  worker_2:
     <<: *robotoff-worker
-    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_high_2
-    command: python -m robotoff run-worker robotoff-high-2
+    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_2
+    command: python -m robotoff run-worker robotoff-high-2 robotoff-low

-  worker_low_1:
+  worker_3:
     <<: *robotoff-worker
-    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_low_1
-    # Each worker (whether it's a high or low priority worker) listens to a
-    # single high priority queue
+    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_3
     command: python -m robotoff run-worker robotoff-high-3 robotoff-low

-  worker_low_2:
+  worker_4:
     <<: *robotoff-worker
-    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_low_2
+    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_4
     command: python -m robotoff run-worker robotoff-high-4 robotoff-low

scheduler:
8 changes: 4 additions & 4 deletions docker/dev.yml
@@ -36,13 +36,13 @@ x-robotoff-dev: &robotoff-dev
services:
api:
<<: *robotoff-dev
-  worker_high_1:
+  worker_1:
     <<: *robotoff-dev
-  worker_high_2:
+  worker_2:
     <<: *robotoff-dev
-  worker_low_1:
+  worker_3:
     <<: *robotoff-dev
-  worker_low_2:
+  worker_4:
     <<: *robotoff-dev

scheduler:
89 changes: 80 additions & 9 deletions robotoff/cli/main.py
@@ -4,7 +4,6 @@
import typer

from robotoff.types import (
-    NeuralCategoryClassifierModel,
    ObjectDetectionModel,
    PredictionType,
    ProductIdentifier,
@@ -132,11 +131,11 @@ def categorize(
        ServerType.off, help="Server type of the product"
    ),
    deepest_only: bool = False,
-    model_name: NeuralCategoryClassifierModel = typer.Option(
-        NeuralCategoryClassifierModel.keras_image_embeddings_3_0,
-        help="name of the model to use",
-    ),
    threshold: Optional[float] = typer.Option(0.5, help="detection threshold to use"),
+    triton_uri: Optional[str] = typer.Option(
+        None,
+        help="URI of the Triton server to use. If not provided, the default value from settings is used.",
+    ),
) -> None:
"""Predict product categories based on the neural category classifier.
@@ -164,7 +163,11 @@ def categorize(
    predictions, _ = CategoryClassifier(
        get_taxonomy(TaxonomyType.category.name, offline=True)
    ).predict(
-        product, product_id, deepest_only, threshold=threshold, model_name=model_name
+        product,
+        product_id,
+        deepest_only,
+        threshold=threshold,
+        triton_uri=triton_uri,
    )

if predictions:
@@ -357,6 +360,70 @@ def import_images_in_db(
)


+@app.command()
+def run_category_prediction(
+    triton_uri: Optional[str] = typer.Option(
+        None,
+        help="URI of the Triton Inference Server to use. If not provided, the default value from settings is used.",
+    ),
+    limit: Optional[int] = typer.Option(
+        None, help="Maximum number of jobs to launch (default: all)"
+    ),
+):
+    """Launch category prediction jobs on all products without category
+    predictions in DB."""
+    import tqdm
+    from openfoodfacts.dataset import ProductDataset
+
+    from robotoff.models import Prediction, db
+    from robotoff.settings import DATASET_DIR
+    from robotoff.utils import get_logger
+    from robotoff.workers.queues import enqueue_job, low_queue
+    from robotoff.workers.tasks.product_updated import add_category_insight_job
+
+    logger = get_logger()
+    # Download the latest dump of the dataset, cache it in DATASET_DIR
+    ds = ProductDataset(force_download=True, download_newer=True, cache_dir=DATASET_DIR)
+
+    # The category detector only works for food products
+    server_type = ServerType.off
+
+    logger.info("Fetching products with category predictions already in DB...")
+    with db:
+        barcode_with_categories = set(
+            barcode
+            for (barcode,) in Prediction.select(Prediction.barcode)
+            .distinct()
+            .where(
+                Prediction.server_type == server_type.name,
+                Prediction.type == PredictionType.category.name,
+            )
+            .tuples()
+        )
+    logger.info(
+        "%d products with category predictions already in DB",
+        len(barcode_with_categories),
+    )
+    seen: set[str] = set()
+    added = 0
+    for product in tqdm.tqdm(ds, desc="products"):
+        barcode = product.get("code")
+        if not barcode or barcode in seen or barcode in barcode_with_categories:
+            continue
+        seen.add(barcode)
+        # Enqueue a job to predict categories for this product
+        enqueue_job(
+            add_category_insight_job,
+            low_queue,
+            job_kwargs={"result_ttl": 0},
+            product_id=ProductIdentifier(barcode, server_type),
+            triton_uri=triton_uri,
+        )
+        added += 1
+        # `limit` caps the number of jobs launched, not the exclusion query above
+        if limit is not None and added >= limit:
+            break
+
+    logger.info("%d jobs added", added)


@app.command()
def run_object_detection_model(
server_type: ServerType = typer.Option(
@@ -375,6 +442,10 @@ def run_object_detection_model(
"for the specified model.",
),
limit: Optional[int] = typer.Option(None, help="Maximum numbers of job to launch"),
triton_uri: Optional[str] = typer.Option(
None,
help="URI of the Triton Inference Server to use. If not provided, the default value from settings is used.",
),
):
"""Launch object detection model jobs on all missing images (images
without an ImagePrediction item for this model) in DB."""
@@ -415,7 +486,7 @@ def run_object_detection_model(
else:
with db:
query = (
-                ImageModel.select(ImageModel.barcode, ImageModel.id)
+                ImageModel.select(ImageModel.barcode, ImageModel.image_id)
.join(
ImagePrediction,
JOIN.LEFT_OUTER,
@@ -425,8 +496,7 @@
),
)
                .where(
-                    ImageModel.server_type
-                    == server_type.name
+                    (ImageModel.server_type == server_type.name)
                    & ImagePrediction.model_name.is_null()
                    & (ImageModel.deleted == False),  # noqa: E712
                )
@@ -451,6 +521,7 @@ def run_object_detection_model(
                job_kwargs={"result_ttl": 0},
                product_id=ProductIdentifier(barcode, server_type),
                image_url=image_url,
+                triton_uri=triton_uri,
            )


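Two notes on this file's changes. First, assuming Typer's default conversion of underscores to hyphens in command names, the new command is invoked as `python -m robotoff run-category-prediction [--triton-uri HOST:PORT] [--limit N]`; it enqueues one `add_category_insight_job` per product on the low priority queue, which the reworked workers drain whenever their high priority queues are idle.

Second, the parenthesization change in the `.where()` clause of `run_object_detection_model` is a real fix rather than a style tweak: `&` binds tighter than `==` in Python, and peewee overloads both operators to build SQL. A minimal sketch with assumed stand-in models (not Robotoff's actual schema) shows the pitfall:

```python
from peewee import BooleanField, CharField, Model, SqliteDatabase

db = SqliteDatabase(":memory:")


class Image(Model):
    server_type = CharField()
    deleted = BooleanField(default=False)

    class Meta:
        database = db


# Misgrouped: Python parses this as
#   Image.server_type == ("off" & Image.deleted.is_null())
# because & has higher precedence than ==; peewee silently builds the
# wrong SQL filter instead of raising an error.
wrong = Image.server_type == "off" & Image.deleted.is_null()

# Correct: parenthesize the comparison before combining conditions with &.
right = (Image.server_type == "off") & Image.deleted.is_null()
```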
8 changes: 6 additions & 2 deletions robotoff/insights/extraction.py
@@ -52,6 +52,7 @@ def run_object_detection_model(
    image_model: ImageModel,
    threshold: float = 0.1,
    return_null_if_exist: bool = True,
+    triton_uri: str | None = None,
) -> Optional[ImagePrediction]:
    """Run an object detection model and save the results in the
    `image_prediction` table.
@@ -67,7 +68,10 @@
        `image` table)
    :param threshold: the minimum object score above which we keep the object
        data
+    :param return_null_if_exist: if True, return None if the image prediction
+        already exists in DB
+    :param triton_uri: URI of the Triton Inference Server, defaults to
+        None. If not provided, the default value from settings is used.
:return: return None if the image does not exist in DB, or the created
`ImagePrediction` otherwise
"""
@@ -82,7 +86,7 @@

timestamp = datetime.datetime.utcnow()
    results = ObjectDetectionModelRegistry.get(model_name.value).detect_from_image(
-        image, output_image=False
+        image, output_image=False, triton_uri=triton_uri
    )
data = results.to_json(threshold=threshold)
max_confidence = max((item["score"] for item in data), default=None)
10 changes: 6 additions & 4 deletions robotoff/prediction/category/__init__.py
@@ -1,5 +1,3 @@
-from typing import Optional
-
from robotoff.taxonomy import TaxonomyType, get_taxonomy
from robotoff.types import JSONType, NeuralCategoryClassifierModel, ProductIdentifier

@@ -10,9 +8,10 @@ def predict_category(
    product: dict,
    product_id: ProductIdentifier,
    deepest_only: bool,
-    threshold: Optional[float] = None,
-    neural_model_name: Optional[NeuralCategoryClassifierModel] = None,
+    threshold: float | None = None,
+    neural_model_name: NeuralCategoryClassifierModel | None = None,
    clear_cache: bool = False,
+    triton_uri: str | None = None,
) -> JSONType:
"""Predict categories for a product using neural model.
@@ -30,6 +29,8 @@
        prediction
    :param clear_cache: if True, clear ingredient processing cache of neural
        model before returning results
+    :param triton_uri: URI of the Triton Inference Server, defaults to
+        None. If not provided, the default value from settings is used.
"""
taxonomy = get_taxonomy(TaxonomyType.category.name)
predictions, debug = CategoryClassifier(taxonomy).predict(
@@ -39,6 +40,7 @@
        threshold,
        neural_model_name,
        clear_cache=clear_cache,
+        triton_uri=triton_uri,
    )
return {
"neural": {
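The typing changes here and in `category_classifier.py` below swap `Optional[X]` for the PEP 604 spelling `X | None`, which is why the now-unused `from typing import Optional` import is dropped; the two spellings are equivalent, with `X | None` usable in annotations on Python 3.10+ (or earlier with `from __future__ import annotations`).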
11 changes: 7 additions & 4 deletions robotoff/prediction/category/neural/category_classifier.py
@@ -1,4 +1,4 @@
-from typing import Any, Optional
+from typing import Any

import numpy as np

@@ -66,9 +66,10 @@ def predict(
        product: dict,
        product_id: ProductIdentifier,
        deepest_only: bool = False,
-        threshold: Optional[float] = None,
-        model_name: Optional[NeuralCategoryClassifierModel] = None,
+        threshold: float | None = None,
+        model_name: NeuralCategoryClassifierModel | None = None,
        clear_cache: bool = False,
+        triton_uri: str | None = None,
    ) -> tuple[list[Prediction], JSONType]:
"""Return an unordered list of category predictions for the given
product and additional debug information.
@@ -122,6 +123,8 @@ def predict(
            default.
        :param clear_cache: if True, clear ingredient processing cache before
            returning results
+        :param triton_uri: URI of the Triton Inference Server, defaults to
+            None. If not provided, the default value from settings is used.
"""
logger.debug("predicting category with model %s", model_name)

@@ -142,7 +145,7 @@
)

        # Only generate image embeddings if it's required by the model
-        triton_stub = get_triton_inference_stub()
+        triton_stub = get_triton_inference_stub(triton_uri)

# We check whether image embeddings were provided as input
if "image_embeddings" in product:
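The thread-through of `triton_uri` ends at `get_triton_inference_stub(triton_uri)`. The real helper lives in Robotoff's Triton module and is not shown in this diff; a hypothetical sketch of the pattern (the function body and default value are assumptions, not from the commit) is a gRPC channel created from the given URI, with a settings fallback when none is passed:

```python
import grpc
from tritonclient.grpc import service_pb2_grpc

# Assumed default; the actual value comes from Robotoff's settings module.
DEFAULT_TRITON_URI = "localhost:8001"


def get_triton_inference_stub(triton_uri: str | None = None):
    """Return a Triton gRPC inference stub for the given host:port URI."""
    channel = grpc.insecure_channel(triton_uri or DEFAULT_TRITON_URI)
    return service_pb2_grpc.GRPCInferenceServiceStub(channel)
```

Passing the URI explicitly is what lets the new CLI options target one Triton backend among several, which is the point of the PR title.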
robotoff/prediction/category/neural/keras_category_classifier_3_0.py
@@ -1,3 +1,4 @@
+import time
from typing import Literal, Optional

import numpy as np
@@ -168,9 +169,15 @@ def generate_image_embeddings(
    )

    if non_null_image_by_ids:
+        start_time = time.monotonic()
        computed_embeddings_by_id = _generate_image_embeddings(
            non_null_image_by_ids, stub
        )
+        logger.debug(
+            "Computed %d embeddings in %.2f seconds",
+            len(computed_embeddings_by_id),
+            time.monotonic() - start_time,
+        )
        # Make sure all image IDs are in image table
        refresh_images_in_db(product_id, product.get("images", {}))
        # Save embeddings in embeddings.image_embeddings table for
@@ -267,7 +274,9 @@ def predict(

    inputs = generate_inputs_dict(product, ocr_texts, image_embeddings)
    debug = generate_debug_dict(model_name, threshold, inputs)
+    start_time = time.monotonic()
    scores, labels = _predict(inputs, model_name, stub)
+    logger.debug("Predicted categories in %.2f seconds", time.monotonic() - start_time)
    indices = np.argsort(-scores)

    category_predictions: list[tuple[str, float, Optional[NeighborPredictionType]]] = []
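A small detail worth noting in the timing instrumentation above: `time.monotonic()` is used rather than `time.time()`, since a monotonic clock is unaffected by system clock adjustments, so the difference between two readings is a reliable elapsed duration for the debug logs.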