diff --git a/Makefile b/Makefile index c67e87874c..1f42f81742 100644 --- a/Makefile +++ b/Makefile @@ -93,7 +93,7 @@ livecheck: log: @echo "🥫 Reading logs (docker-compose) …" - ${DOCKER_COMPOSE} logs -f api scheduler worker_high_1 worker_high_2 worker_low_1 worker_low_2 + ${DOCKER_COMPOSE} logs -f api scheduler worker_1 worker_2 worker_3 worker_4 #------------# # Management # @@ -152,10 +152,10 @@ init-elasticsearch: launch-burst-worker: ifdef queues - ${DOCKER_COMPOSE} run --rm -d --no-deps worker_high_1 python -m robotoff run-worker ${queues} --burst + ${DOCKER_COMPOSE} run --rm -d --no-deps worker_1 python -m robotoff run-worker ${queues} --burst # Only launch burst worker on low priority queue if queue is not specified else - ${DOCKER_COMPOSE} run --rm -d --no-deps worker_high_1 python -m robotoff run-worker robotoff-low --burst + ${DOCKER_COMPOSE} run --rm -d --no-deps worker_1 python -m robotoff run-worker robotoff-low --burst endif #------------# @@ -204,26 +204,26 @@ health: i18n-compile: @echo "🥫 Compiling translations …" # Note it's important to have --no-deps, to avoid launching a concurrent postgres instance - ${DOCKER_COMPOSE} run --rm --entrypoint bash --no-deps worker_high_1 -c "cd i18n && . compile.sh" + ${DOCKER_COMPOSE} run --rm --entrypoint bash --no-deps worker_1 -c "cd i18n && . compile.sh" unit-tests: @echo "🥫 Running tests …" # run tests in worker to have more memory # also, change project name to run in isolation - ${DOCKER_COMPOSE_TEST} run --rm worker_high_1 poetry run pytest --cov-report xml --cov=robotoff tests/unit + ${DOCKER_COMPOSE_TEST} run --rm worker_1 poetry run pytest --cov-report xml --cov=robotoff tests/unit integration-tests: @echo "🥫 Running integration tests …" # run tests in worker to have more memory # also, change project name to run in isolation - ${DOCKER_COMPOSE_TEST} run --rm worker_high_1 poetry run pytest -vv --cov-report xml --cov=robotoff --cov-append tests/integration + ${DOCKER_COMPOSE_TEST} run --rm worker_1 poetry run pytest -vv --cov-report xml --cov=robotoff --cov-append tests/integration ( ${DOCKER_COMPOSE_TEST} down -v || true ) # interactive testings # usage: make pytest args='test/unit/my-test.py --pdb' pytest: guard-args @echo "🥫 Running test: ${args} …" - ${DOCKER_COMPOSE_TEST} run --rm worker_high_1 poetry run pytest ${args} + ${DOCKER_COMPOSE_TEST} run --rm worker_1 poetry run pytest ${args} #------------# # Production # diff --git a/docker-compose.yml b/docker-compose.yml index 20ebf9a997..f97d098905 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -70,26 +70,27 @@ services: - postgres - redis - worker_high_1: + worker_1: <<: *robotoff-worker - container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_high_1 - command: python -m robotoff run-worker robotoff-high-1 + container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_1 + # Each worker listens to a single high priority queue and to the low priority queue. + # As the low priority queue comes last, it will only be processed if there are no high + # priority tasks. + command: python -m robotoff run-worker robotoff-high-1 robotoff-low - worker_high_2: + worker_2: <<: *robotoff-worker - container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_high_2 - command: python -m robotoff run-worker robotoff-high-2 + container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_2 + command: python -m robotoff run-worker robotoff-high-2 robotoff-low - worker_low_1: + worker_3: <<: *robotoff-worker - container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_low_1 - # Each worker (whether it's a high or low priority worker) listens to a - # single high priority queue + container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_3 command: python -m robotoff run-worker robotoff-high-3 robotoff-low - worker_low_2: + worker_4: <<: *robotoff-worker - container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_low_2 + container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_4 command: python -m robotoff run-worker robotoff-high-4 robotoff-low scheduler: diff --git a/docker/dev.yml b/docker/dev.yml index e90de3fa1e..66536137c4 100644 --- a/docker/dev.yml +++ b/docker/dev.yml @@ -36,13 +36,13 @@ x-robotoff-dev: &robotoff-dev services: api: <<: *robotoff-dev - worker_high_1: + worker_1: <<: *robotoff-dev - worker_high_2: + worker_2: <<: *robotoff-dev - worker_low_1: + worker_3: <<: *robotoff-dev - worker_low_2: + worker_4: <<: *robotoff-dev scheduler: diff --git a/robotoff/cli/main.py b/robotoff/cli/main.py index d4d24c8190..1cf1e14ecd 100644 --- a/robotoff/cli/main.py +++ b/robotoff/cli/main.py @@ -4,7 +4,6 @@ import typer from robotoff.types import ( - NeuralCategoryClassifierModel, ObjectDetectionModel, PredictionType, ProductIdentifier, @@ -132,11 +131,11 @@ def categorize( ServerType.off, help="Server type of the product" ), deepest_only: bool = False, - model_name: NeuralCategoryClassifierModel = typer.Option( - NeuralCategoryClassifierModel.keras_image_embeddings_3_0, - help="name of the model to use", - ), threshold: Optional[float] = typer.Option(0.5, help="detection threshold to use"), + triton_uri: Optional[str] = typer.Option( + None, + help="URI of the Triton server to use. If not provided, the default value from settings is used.", + ), ) -> None: """Predict product categories based on the neural category classifier. @@ -164,7 +163,11 @@ def categorize( predictions, _ = CategoryClassifier( get_taxonomy(TaxonomyType.category.name, offline=True) ).predict( - product, product_id, deepest_only, threshold=threshold, model_name=model_name + product, + product_id, + deepest_only, + threshold=threshold, + triton_uri=triton_uri, ) if predictions: @@ -357,6 +360,70 @@ def import_images_in_db( ) +@app.command() +def run_category_prediction( + triton_uri: Optional[str] = typer.Option( + None, + help="URI of the Triton Inference Server to use. If not provided, the default value from settings is used.", + ), + limit: Optional[int] = typer.Option( + None, help="Maximum numbers of job to launch (default: all)" + ), +): + """Launch category prediction jobs on all products without categories in + DB.""" + import tqdm + from openfoodfacts.dataset import ProductDataset + + from robotoff.models import Prediction, db + from robotoff.settings import DATASET_DIR + from robotoff.utils import get_logger + from robotoff.workers.queues import enqueue_job, low_queue + from robotoff.workers.tasks.product_updated import add_category_insight_job + + logger = get_logger() + # Download the latest dump of the dataset, cache it in DATASET_DIR + ds = ProductDataset(force_download=True, download_newer=True, cache_dir=DATASET_DIR) + + # The category detector only works for food products + server_type = ServerType.off + + logger.info("Fetching products without categories in DB...") + with db: + barcode_with_categories = set( + barcode + for (barcode,) in Prediction.select(Prediction.barcode) + .distinct() + .where( + Prediction.server_type == server_type.name, + Prediction.type == PredictionType.category.name, + ) + .tuples() + .limit(limit) + ) + logger.info( + "%d products with categories already in DB", len(barcode_with_categories) + ) + seen: set[str] = set() + added = 0 + for product in tqdm.tqdm(ds, desc="products"): + barcode = product.get("code") + if not barcode or barcode in seen or barcode in barcode_with_categories: + continue + seen.add(barcode) + # Enqueue a job to predict category for this product + enqueue_job( + add_category_insight_job, + low_queue, + job_kwargs={"result_ttl": 0}, + product_id=ProductIdentifier(barcode, server_type), + triton_uri=triton_uri, + ) + added += 1 + + logger.info("%d jobs added", added) + + @app.command() def run_object_detection_model( server_type: ServerType = typer.Option( @@ -375,6 +442,10 @@ def run_object_detection_model( "for the specified model.", ), limit: Optional[int] = typer.Option(None, help="Maximum numbers of job to launch"), + triton_uri: Optional[str] = typer.Option( + None, + help="URI of the Triton Inference Server to use. If not provided, the default value from settings is used.", + ), ): """Launch object detection model jobs on all missing images (images without an ImagePrediction item for this model) in DB.""" @@ -415,7 +486,7 @@ def run_object_detection_model( else: with db: query = ( - ImageModel.select(ImageModel.barcode, ImageModel.id) + ImageModel.select(ImageModel.barcode, ImageModel.image_id) .join( ImagePrediction, JOIN.LEFT_OUTER, @@ -425,8 +496,7 @@ def run_object_detection_model( ), ) .where( - ImageModel.server_type - == server_type.name + (ImageModel.server_type == server_type.name) & ImagePrediction.model_name.is_null() & (ImageModel.deleted == False), # noqa: E712 ) @@ -451,6 +521,7 @@ def run_object_detection_model( job_kwargs={"result_ttl": 0}, product_id=ProductIdentifier(barcode, server_type), image_url=image_url, + triton_uri=triton_uri, ) diff --git a/robotoff/insights/extraction.py b/robotoff/insights/extraction.py index d975b14e4f..3f0919a030 100644 --- a/robotoff/insights/extraction.py +++ b/robotoff/insights/extraction.py @@ -52,6 +52,7 @@ def run_object_detection_model( image_model: ImageModel, threshold: float = 0.1, return_null_if_exist: bool = True, + triton_uri: str | None = None, ) -> Optional[ImagePrediction]: """Run a model detection model and save the results in the `image_prediction` table. @@ -67,7 +68,10 @@ def run_object_detection_model( `image` table) :param threshold: the minimum object score above which we keep the object data - + :param return_null_if_exist: if True, return None if the image prediction + already exists in DB + :param triton_uri: URI of the Triton Inference Server, defaults to + None. If not provided, the default value from settings is used. :return: return None if the image does not exist in DB, or the created `ImagePrediction` otherwise """ @@ -82,7 +86,7 @@ def run_object_detection_model( timestamp = datetime.datetime.utcnow() results = ObjectDetectionModelRegistry.get(model_name.value).detect_from_image( - image, output_image=False + image, output_image=False, triton_uri=triton_uri ) data = results.to_json(threshold=threshold) max_confidence = max((item["score"] for item in data), default=None) diff --git a/robotoff/prediction/category/__init__.py b/robotoff/prediction/category/__init__.py index e03cbba220..705c3b18ff 100644 --- a/robotoff/prediction/category/__init__.py +++ b/robotoff/prediction/category/__init__.py @@ -1,5 +1,3 @@ -from typing import Optional - from robotoff.taxonomy import TaxonomyType, get_taxonomy from robotoff.types import JSONType, NeuralCategoryClassifierModel, ProductIdentifier @@ -10,9 +8,10 @@ def predict_category( product: dict, product_id: ProductIdentifier, deepest_only: bool, - threshold: Optional[float] = None, - neural_model_name: Optional[NeuralCategoryClassifierModel] = None, + threshold: float | None = None, + neural_model_name: NeuralCategoryClassifierModel | None = None, clear_cache: bool = False, + triton_uri: str | None = None, ) -> JSONType: """Predict categories for a product using neural model. @@ -30,6 +29,8 @@ def predict_category( prediction :param clear_cache: if True, clear ingredient processing cache of neural model before returning results + :param triton_uri: URI of the Triton Inference Server, defaults to + None. If not provided, the default value from settings is used. """ taxonomy = get_taxonomy(TaxonomyType.category.name) predictions, debug = CategoryClassifier(taxonomy).predict( @@ -39,6 +40,7 @@ def predict_category( threshold, neural_model_name, clear_cache=clear_cache, + triton_uri=triton_uri, ) return { "neural": { diff --git a/robotoff/prediction/category/neural/category_classifier.py b/robotoff/prediction/category/neural/category_classifier.py index e38aa19ed0..85897f2952 100644 --- a/robotoff/prediction/category/neural/category_classifier.py +++ b/robotoff/prediction/category/neural/category_classifier.py @@ -1,4 +1,4 @@ -from typing import Any, Optional +from typing import Any import numpy as np @@ -66,9 +66,10 @@ def predict( product: dict, product_id: ProductIdentifier, deepest_only: bool = False, - threshold: Optional[float] = None, - model_name: Optional[NeuralCategoryClassifierModel] = None, + threshold: float | None = None, + model_name: NeuralCategoryClassifierModel | None = None, clear_cache: bool = False, + triton_uri: str | None = None, ) -> tuple[list[Prediction], JSONType]: """Return an unordered list of category predictions for the given product and additional debug information. @@ -122,6 +123,8 @@ def predict( default. :param clear_cache: if True, clear ingredient processing cache before returning results + :param triton_uri: URI of the Triton Inference Server, defaults to + None. If not provided, the default value from settings is used. """ logger.debug("predicting category with model %s", model_name) @@ -142,7 +145,7 @@ def predict( ) # Only generate image embeddings if it's required by the model - triton_stub = get_triton_inference_stub() + triton_stub = get_triton_inference_stub(triton_uri) # We check whether image embeddings were provided as input if "image_embeddings" in product: diff --git a/robotoff/prediction/category/neural/keras_category_classifier_3_0/__init__.py b/robotoff/prediction/category/neural/keras_category_classifier_3_0/__init__.py index d51484f5c4..accec91755 100644 --- a/robotoff/prediction/category/neural/keras_category_classifier_3_0/__init__.py +++ b/robotoff/prediction/category/neural/keras_category_classifier_3_0/__init__.py @@ -1,3 +1,4 @@ +import time from typing import Literal, Optional import numpy as np @@ -168,9 +169,15 @@ def generate_image_embeddings( ) if non_null_image_by_ids: + start_time = time.monotonic() computed_embeddings_by_id = _generate_image_embeddings( non_null_image_by_ids, stub ) + logger.debug( + "Computed %d embeddings in %.2f seconds", + len(computed_embeddings_by_id), + time.monotonic() - start_time, + ) # Make sure all image IDs are in image table refresh_images_in_db(product_id, product.get("images", {})) # Save embeddings in embeddings.image_embeddings table for @@ -267,7 +274,9 @@ def predict( inputs = generate_inputs_dict(product, ocr_texts, image_embeddings) debug = generate_debug_dict(model_name, threshold, inputs) + start_time = time.monotonic() scores, labels = _predict(inputs, model_name, stub) + logger.debug("Predicted categories in %.2f seconds", time.monotonic() - start_time) indices = np.argsort(-scores) category_predictions: list[tuple[str, float, Optional[NeighborPredictionType]]] = [] diff --git a/robotoff/prediction/ingredient_list/__init__.py b/robotoff/prediction/ingredient_list/__init__.py index ab5c1edcda..f8e9854c3b 100644 --- a/robotoff/prediction/ingredient_list/__init__.py +++ b/robotoff/prediction/ingredient_list/__init__.py @@ -11,7 +11,7 @@ from robotoff import settings from robotoff.prediction.ingredient_list.postprocess import detect_additional_mentions from robotoff.prediction.langid import LanguagePrediction, predict_lang_batch -from robotoff.triton import get_triton_inference_stub +from robotoff.triton import GRPCInferenceServiceStub, get_triton_inference_stub from robotoff.utils import http_session from .transformers_pipeline import AggregationStrategy, TokenClassificationPipeline @@ -75,10 +75,11 @@ class IngredientPredictionOutput: def predict_from_ocr( - input_ocr: Union[str, OCRResult], + input_ocr: str | OCRResult, aggregation_strategy: AggregationStrategy = AggregationStrategy.FIRST, predict_lang: bool = True, model_version: str = "1", + triton_uri: str | None = None, ) -> IngredientPredictionOutput: """Predict ingredient lists from an OCR. @@ -89,6 +90,8 @@ def predict_from_ocr( `IngredientPredictionAggregatedEntity`. This flag is ignored if `aggregation_strategy` is `NONE`. :param model_version: version of the model model to use, defaults to "1" + :param triton_uri: URI of the Triton Inference Server, defaults to None. If + not provided, the default value from settings is used. :return: the `IngredientPredictionOutput` """ ocr_result: OCRResult @@ -102,8 +105,9 @@ def predict_from_ocr( if not text: return IngredientPredictionOutput(entities=[], text=text) # type: ignore + triton_stub = get_triton_inference_stub(triton_uri) predictions = predict_batch( - [text], aggregation_strategy, predict_lang, model_version + [text], triton_stub, aggregation_strategy, predict_lang, model_version ) prediction = predictions[0] @@ -131,6 +135,7 @@ def get_tokenizer(model_dir: Path) -> PreTrainedTokenizerBase: def predict_batch( texts: list[str], + triton_stub: GRPCInferenceServiceStub, aggregation_strategy: AggregationStrategy = AggregationStrategy.FIRST, predict_lang: bool = True, model_version: str = "1", @@ -138,6 +143,7 @@ def predict_batch( """Predict ingredient lists from a batch of texts using the NER model. :param texts: a list of strings + :param triton_stub: the Triton gRPC inference service stub :param aggregation_strategy: the aggregation strategy to use, defaults to AggregationStrategy.FIRST. See the HuggingFace documentation: https://huggingface.co/docs/transformers/main_classes/pipelines#transformers.TokenClassificationPipeline @@ -162,6 +168,7 @@ def predict_batch( batch_encoding.input_ids, batch_encoding.attention_mask, "ingredient-ner", + triton_stub=triton_stub, model_version=model_version, ) pipeline = TokenClassificationPipeline(tokenizer, INGREDIENT_ID2LABEL) @@ -234,6 +241,7 @@ def send_ner_infer_request( input_ids: np.ndarray, attention_mask: np.ndarray, model_name: str, + triton_stub: GRPCInferenceServiceStub, model_version: str = "1", ) -> np.ndarray: """Send a NER infer request to the Triton inference server. @@ -248,9 +256,8 @@ def send_ner_infer_request( :param model_version: version of the model model to use, defaults to "1" :return: the predicted logits """ - stub = get_triton_inference_stub() request = build_triton_request(input_ids, attention_mask, model_name, model_version) - response = stub.ModelInfer(request) + response = triton_stub.ModelInfer(request) num_tokens = response.outputs[0].shape[1] num_labels = response.outputs[0].shape[2] return np.frombuffer( diff --git a/robotoff/prediction/object_detection/core.py b/robotoff/prediction/object_detection/core.py index 53c446cd9c..fe908ae2fa 100644 --- a/robotoff/prediction/object_detection/core.py +++ b/robotoff/prediction/object_detection/core.py @@ -1,5 +1,6 @@ import dataclasses import pathlib +import time from typing import Optional import numpy as np @@ -104,11 +105,23 @@ def __init__(self, name: str, label_names: list[str]): self.label_names = label_names def detect_from_image( - self, image: Image.Image, output_image: bool = False + self, + image: Image.Image, + output_image: bool = False, + triton_uri: str | None = None, ) -> ObjectDetectionRawResult: + """Run object detection model on an image. + + :param image: the input Pillow image + :param output_image: if True, the image with boxes and labels is + returned in the result + :param triton_uri: URI of the Triton Inference Server, defaults to + None. If not provided, the default value from settings is used. + :return: the detection result + """ resized_image = resize_image(image, settings.OBJECT_DETECTION_IMAGE_MAX_SIZE) image_array = convert_image_to_array(resized_image) - grpc_stub = get_triton_inference_stub() + grpc_stub = get_triton_inference_stub(triton_uri) request = service_pb2.ModelInferRequest() request.model_name = self.name @@ -129,7 +142,11 @@ def detect_from_image( request.outputs.extend([output]) request.raw_input_contents.extend([image_array.tobytes()]) + start_time = time.monotonic() response = grpc_stub.ModelInfer(request) + logger.debug( + "Inference time for %s: %s", self.name, time.monotonic() - start_time + ) if len(response.outputs) != 4: raise Exception(f"expected 4 output, got {len(response.outputs)}") diff --git a/robotoff/triton.py b/robotoff/triton.py index 3e4a0a7422..e44229968e 100644 --- a/robotoff/triton.py +++ b/robotoff/triton.py @@ -7,6 +7,7 @@ from PIL import Image from transformers import CLIPImageProcessor from tritonclient.grpc import service_pb2, service_pb2_grpc +from tritonclient.grpc.service_pb2_grpc import GRPCInferenceServiceStub from robotoff import settings from robotoff.utils import get_logger @@ -21,8 +22,18 @@ @functools.cache -def get_triton_inference_stub() -> service_pb2_grpc.GRPCInferenceServiceStub: - channel = grpc.insecure_channel(settings.TRITON_URI) +def get_triton_inference_stub( + triton_uri: str | None = None, +) -> GRPCInferenceServiceStub: + """Return a gRPC stub for Triton Inference Server. + + If `triton_uri` is not provided, the default value from settings is used. + + :param triton_uri: URI of the Triton Inference Server, defaults to None + :return: gRPC stub for Triton Inference Server + """ + triton_uri = triton_uri or settings.TRITON_URI + channel = grpc.insecure_channel(triton_uri) return service_pb2_grpc.GRPCInferenceServiceStub(channel) @@ -67,13 +78,13 @@ def generate_clip_embedding_request(images: list[Image.Image]): return request -def generate_clip_embedding(images: list[Image.Image]) -> np.ndarray: +def generate_clip_embedding( + images: list[Image.Image], triton_stub: GRPCInferenceServiceStub +) -> np.ndarray: embedding_batches = [] - stub = get_triton_inference_stub() - for image_batch in chunked(images, CLIP_MAX_BATCH_SIZE): request = generate_clip_embedding_request(image_batch) - response = stub.ModelInfer(request) + response = triton_stub.ModelInfer(request) embedding_batch = np.frombuffer( response.raw_output_contents[0], dtype=np.float32, diff --git a/robotoff/workers/tasks/import_image.py b/robotoff/workers/tasks/import_image.py index b3771c476b..bb1bcb2867 100644 --- a/robotoff/workers/tasks/import_image.py +++ b/robotoff/workers/tasks/import_image.py @@ -41,7 +41,11 @@ from robotoff.products import get_product_store from robotoff.slack import NotifierFactory from robotoff.taxonomy import get_taxonomy -from robotoff.triton import generate_clip_embedding +from robotoff.triton import ( + GRPCInferenceServiceStub, + generate_clip_embedding, + get_triton_inference_stub, +) from robotoff.types import ( JSONType, ObjectDetectionModel, @@ -200,7 +204,9 @@ def import_insights_from_image( logger.info(import_result) -def save_image_job(batch: list[tuple[ProductIdentifier, str]], server_type: ServerType): +def save_image_job( + batch: list[tuple[ProductIdentifier, str]], server_type: ServerType +) -> None: """Save a batch of images in DB. :param batch: a batch of (product_id, source_image) tuples @@ -225,7 +231,16 @@ def save_image_job(batch: list[tuple[ProductIdentifier, str]], server_type: Serv ) -def run_nutrition_table_object_detection(product_id: ProductIdentifier, image_url: str): +def run_nutrition_table_object_detection( + product_id: ProductIdentifier, image_url: str, triton_uri: str | None = None +) -> None: + """Detect the nutrition table in an image and generate a prediction. + + :param product_id: identifier of the product + :param image_url: URL of the image to use + :param triton_uri: URI of the Triton Inference Server, defaults to None. If + not provided, the default value from settings is used. + """ logger.info( "Running nutrition table object detection for %s, image %s", product_id, @@ -247,7 +262,10 @@ def run_nutrition_table_object_detection(product_id: ProductIdentifier, image_ur source_image=source_image, server_type=product_id.server_type.name ): run_object_detection_model( - ObjectDetectionModel.nutrition_table, image, image_model + ObjectDetectionModel.nutrition_table, + image, + image_model, + triton_uri=triton_uri, ) else: logger.info("Missing image in DB for image %s", source_image) @@ -342,7 +360,16 @@ def run_upc_detection(product_id: ProductIdentifier, image_url: str) -> None: logger.info(import_result) -def run_nutriscore_object_detection(product_id: ProductIdentifier, image_url: str): +def run_nutriscore_object_detection( + product_id: ProductIdentifier, image_url: str, triton_uri: str | None = None +) -> None: + """Detect the nutriscore in an image and generate a prediction. + + :param product_id: identifier of the product + :param image_url: URL of the image to use + :param triton_uri: URI of the Triton Inference Server, defaults to None. If + not provided, the default value from settings is used. + """ logger.info( "Running nutriscore object detection for %s, image %s", product_id, image_url ) @@ -367,7 +394,7 @@ def run_nutriscore_object_detection(product_id: ProductIdentifier, image_url: st return image_prediction = run_object_detection_model( - ObjectDetectionModel.nutriscore, image, image_model + ObjectDetectionModel.nutriscore, image, image_model, triton_uri=triton_uri ) if image_prediction is None: @@ -407,14 +434,16 @@ def run_nutriscore_object_detection(product_id: ProductIdentifier, image_url: st def run_logo_object_detection( - product_id: ProductIdentifier, image_url: str, ocr_url: str -): + product_id: ProductIdentifier, image_url: str, ocr_url: str, triton_uri: str | None +) -> None: """Detect logos using the universal logo detector model and generate logo-related predictions. :param product_id: identifier of the product :param image_url: URL of the image to use :param ocr_url: URL of the OCR JSON file, used to extract text of each logo + :param triton_uri: URI of the Triton Inference Server, defaults to None. If + not provided, the default value from settings is used. """ logger.info("Running logo object detection for %s, image %s", product_id, image_url) @@ -443,6 +472,7 @@ def run_logo_object_detection( image, image_model, return_null_if_exist=False, + triton_uri=triton_uri, ) existing_logos = list(image_prediction.logos) @@ -481,8 +511,9 @@ def run_logo_object_detection( ] if logos: + triton_stub = get_triton_inference_stub(triton_uri) with db.connection_context(): - save_logo_embeddings(logos, image) + save_logo_embeddings(logos, image, triton_stub) enqueue_job( process_created_logos, get_high_queue(product_id), @@ -517,7 +548,11 @@ def get_text_from_bounding_box( return None -def save_logo_embeddings(logos: list[LogoAnnotation], image: Image.Image): +def save_logo_embeddings( + logos: list[LogoAnnotation], + image: Image.Image, + triton_stub: GRPCInferenceServiceStub, +): """Generate logo embeddings using CLIP model and save them in logo_embedding table.""" resized_cropped_images = [] @@ -531,7 +566,7 @@ def save_logo_embeddings(logos: list[LogoAnnotation], image: Image.Image): ) cropped_image = image.crop((left, top, right, bottom)) resized_cropped_images.append(cropped_image.resize((224, 224))) - embeddings = generate_clip_embedding(resized_cropped_images) + embeddings = generate_clip_embedding(resized_cropped_images, triton_stub) with db.atomic(): for i in range(len(logos)): @@ -593,13 +628,17 @@ def add_image_fingerprint_job(image_model_id: int): @with_db -def extract_ingredients_job(product_id: ProductIdentifier, ocr_url: str): +def extract_ingredients_job( + product_id: ProductIdentifier, ocr_url: str, triton_uri: str | None = None +): """Extracts ingredients using ingredient extraction model from an image OCR. :param product_id: The identifier of the product to extract ingredients - for. + for. :param ocr_url: The URL of the image to extract ingredients from. + :param triton_uri: URI of the Triton Inference Server, defaults to None. If + not provided, the default value from settings is used. """ source_image = get_source_from_url(ocr_url) @@ -620,7 +659,7 @@ def extract_ingredients_job(product_id: ProductIdentifier, ocr_url: str): ) is not None: return - output = ingredient_list.predict_from_ocr(ocr_url) + output = ingredient_list.predict_from_ocr(ocr_url, triton_uri=triton_uri) entities: list[ ingredient_list.IngredientPredictionAggregatedEntity ] = output.entities # type: ignore @@ -634,9 +673,9 @@ def extract_ingredients_job(product_id: ProductIdentifier, ocr_url: str): model_version=ingredient_list.MODEL_VERSION, data=ingredient_prediction_data, timestamp=datetime.datetime.utcnow(), - max_confidence=max(entity.score for entity in entities) - if entities - else None, + max_confidence=( + max(entity.score for entity in entities) if entities else None + ), ) logger.info("create image prediction (ingredient detection) from %s", ocr_url) diff --git a/robotoff/workers/tasks/product_updated.py b/robotoff/workers/tasks/product_updated.py index 2b85751979..6309c2ac76 100644 --- a/robotoff/workers/tasks/product_updated.py +++ b/robotoff/workers/tasks/product_updated.py @@ -65,11 +65,34 @@ def update_insights_job(product_id: ProductIdentifier, diffs: JSONType) -> None: ) -def add_category_insight(product_id: ProductIdentifier, product: JSONType) -> None: +@with_db +def add_category_insight_job( + product_id: ProductIdentifier, triton_uri: str | None = None +) -> None: + """Job to add category insight for a product. + + :param product_id: identifier of the product + :param triton_uri: URI of the Triton Inference Server, defaults to + None. If not provided, the default value from settings is used. + """ + product_dict = get_product(product_id) + + if product_dict is None: + logger.info("Product does not exist: %s", product_id) + return + + add_category_insight(product_id, product_dict, triton_uri=triton_uri) + + +def add_category_insight( + product_id: ProductIdentifier, product: JSONType, triton_uri: str | None = None +) -> None: """Predict categories for product and import predicted category insight. :param product_id: identifier of the product :param product: product as retrieved from MongoDB + :param triton_uri: URI of the Triton Inference Server, defaults to + None. If not provided, the default value from settings is used. """ if not product_id.server_type.is_food(): # Category prediction is only available for Food products @@ -82,7 +105,7 @@ def add_category_insight(product_id: ProductIdentifier, product: JSONType) -> No try: neural_predictions, _ = CategoryClassifier( get_taxonomy(TaxonomyType.category.name) - ).predict(product, product_id) + ).predict(product, product_id, triton_uri=triton_uri) product_predictions = neural_predictions except requests.exceptions.HTTPError as e: resp = e.response @@ -103,15 +126,17 @@ def add_category_insight(product_id: ProductIdentifier, product: JSONType) -> No def updated_product_predict_insights( - product_id: ProductIdentifier, product: JSONType + product_id: ProductIdentifier, product: JSONType, triton_uri: str | None = None ) -> None: """Predict and import category insights and insights-derived from product name. :param product_id: identifier of the product :param product: product as retrieved from MongoDB + :param triton_uri: URI of the Triton Inference Server, defaults to + None. If not provided, the default value from settings is used. """ - add_category_insight(product_id, product) + add_category_insight(product_id, product, triton_uri=triton_uri) product_name = product.get("product_name") if not product_name: diff --git a/tests/integration/insights/test_extraction.py b/tests/integration/insights/test_extraction.py index d660172b30..b315469989 100644 --- a/tests/integration/insights/test_extraction.py +++ b/tests/integration/insights/test_extraction.py @@ -26,7 +26,10 @@ def __init__(self, raw_result: ObjectDetectionRawResult): self.raw_result = raw_result def detect_from_image( - self, image: Image.Image, output_image: bool = False + self, + image: Image.Image, + output_image: bool = False, + triton_uri: str | None = None, ) -> ObjectDetectionRawResult: return self.raw_result diff --git a/tests/integration/test_import_image.py b/tests/integration/test_import_image.py index 42505d38d4..5f4a5369dc 100644 --- a/tests/integration/test_import_image.py +++ b/tests/integration/test_import_image.py @@ -36,6 +36,7 @@ def test_save_logo_embeddings(peewee_db, mocker): "robotoff.workers.tasks.import_image.generate_clip_embedding", return_value=expected_embeddings, ) + triton_stub = mocker.MagicMock() image_array = np.random.rand(800, 800, 3) * 255 image = Image.fromarray(image_array.astype("uint8")).convert("RGB") @@ -45,7 +46,7 @@ def test_save_logo_embeddings(peewee_db, mocker): LogoAnnotationFactory(image_prediction=image_prediction, index=i) for i in range(5) ] - save_logo_embeddings(logos, image) + save_logo_embeddings(logos, image, triton_stub) logo_embedding_instances = LogoEmbedding.select().where( LogoEmbedding.logo_id.in_([logo.id for logo in logos]) ) diff --git a/tests/integration/workers/tasks/test_import_image.py b/tests/integration/workers/tasks/test_import_image.py index 45e896254b..a878cea7b9 100644 --- a/tests/integration/workers/tasks/test_import_image.py +++ b/tests/integration/workers/tasks/test_import_image.py @@ -90,7 +90,9 @@ def test_extract_ingredients_job(mocker, peewee_db): extract_ingredients_job( ProductIdentifier(barcode, ServerType.off), ocr_url=ocr_url ) - ingredient_list_mocker.predict_from_ocr.assert_called_once_with(ocr_url) + ingredient_list_mocker.predict_from_ocr.assert_called_once_with( + ocr_url, triton_uri=None + ) parse_ingredients_mocker.assert_called_once_with("water, salt, sugar.", "en") image_prediction = ImagePrediction.get_or_none( ImagePrediction.model_name == "ingredient-detection", diff --git a/tests/unit/elasticsearch/test_export.py b/tests/unit/elasticsearch/test_export.py index fec4fde9fd..d41b5be742 100644 --- a/tests/unit/elasticsearch/test_export.py +++ b/tests/unit/elasticsearch/test_export.py @@ -3,8 +3,8 @@ def test_load_index_already_exists(mocker): - mocker.patch("elasticsearch.client.IndicesClient.exists", return_value=True) - create_call = mocker.patch("elasticsearch.client.IndicesClient.create") + mocker.patch("elasticsearch._sync.client.IndicesClient.exists", return_value=True) + create_call = mocker.patch("elasticsearch._sync.client.IndicesClient.create") exporter = ElasticsearchExporter(get_es_client()) exporter.load_index(ElasticSearchIndex.logo) @@ -12,8 +12,8 @@ def test_load_index_already_exists(mocker): def test_load_index(mocker): - mocker.patch("elasticsearch.client.IndicesClient.exists", return_value=False) - create_call = mocker.patch("elasticsearch.client.IndicesClient.create") + mocker.patch("elasticsearch._sync.client.IndicesClient.exists", return_value=False) + create_call = mocker.patch("elasticsearch._sync.client.IndicesClient.create") exporter = ElasticsearchExporter(get_es_client()) exporter.load_index(ElasticSearchIndex.logo)