diff --git a/robotoff/cli/main.py b/robotoff/cli/main.py index 98d9c51847..a7d2e49f62 100644 --- a/robotoff/cli/main.py +++ b/robotoff/cli/main.py @@ -1021,7 +1021,12 @@ def launch_spellcheck_batch_job( @app.command() -def launch_normalize_barcode_job(): +def launch_normalize_barcode_job( + batch_size: int = 100_000, + launch_prediction: bool = True, + launch_insight: bool = True, + launch_image: bool = True, +) -> None: from openfoodfacts.images import generate_image_path from peewee import fn @@ -1033,110 +1038,121 @@ def launch_normalize_barcode_job(): logger.info("Starting barcode normalization job") with db.connection_context(): - updated = 0 - min_id = 0 - max_id = Prediction.select(fn.MAX(Prediction.id)).scalar() - with db.atomic() as tsx: - while min_id < max_id: - prediction = None - for prediction in ( - Prediction.select() - .where(Prediction.id >= min_id) - .order_by(Prediction.id.asc()) - .limit(10_000) - ): - barcode = normalize_barcode(prediction.barcode) - source_image = ( - generate_image_path( - prediction.barcode, Path(prediction.source_image).stem + if launch_prediction: + updated = 0 + min_id = 0 + max_id = Prediction.select(fn.MAX(Prediction.id)).scalar() + with db.atomic() as tsx: + while min_id < max_id: + prediction = None + for prediction in ( + Prediction.select() + .where(Prediction.id >= min_id) + .order_by(Prediction.id.asc()) + .limit(batch_size) + ): + barcode = normalize_barcode(prediction.barcode) + source_image = ( + generate_image_path( + prediction.barcode, Path(prediction.source_image).stem + ) + if prediction.source_image + else None ) - if prediction.source_image - else None - ) - is_updated = (barcode != prediction.barcode) or ( - source_image != prediction.source_image - ) - if is_updated: - prediction.barcode = barcode - prediction.source_image = source_image - prediction.save() - updated += 1 - - tsx.commit() - logger.info("Current ID: %s, Updated %d predictions", min_id, updated) - if prediction is not None: - min_id = prediction.id - else: - break - - logger.info("Updated %d predictions", updated) - - updated = 0 - min_id = ProductInsight.select(fn.MIN(ProductInsight.timestamp)).scalar() - max_id = ProductInsight.select(fn.MAX(ProductInsight.timestamp)).scalar() - with db.atomic() as tsx: - while min_id < max_id: - insight = None - for insight in ( - ProductInsight.select() - .where(ProductInsight.timestamp >= min_id) - .order_by(ProductInsight.timestamp.asc()) - .limit(10_000) - ): - barcode = normalize_barcode(insight.barcode) - source_image = generate_image_path( - insight.barcode, Path(insight.source_image).stem - ) - is_updated = (barcode != insight.barcode) or ( - source_image != insight.source_image - ) - if is_updated: - insight.barcode = barcode - insight.source_image = source_image - insight.save() - updated += 1 - - tsx.commit() - logger.info("Current ID: %s, Updated %d insights", min_id, updated) - if insight is not None: - min_id = insight.timestamp - else: - break - - logger.info("Updated %d insights", updated) - - updated = 0 - min_id = ImageModel.select(fn.MIN(ImageModel.id)).scalar() - max_id = ImageModel.select(fn.MAX(ImageModel.id)).scalar() - with db.atomic() as tsx: - while min_id < max_id: - image = None - for image in ( - ImageModel.select() - .where(ImageModel.id >= min_id) - .order_by(ImageModel.id.asc()) - .limit(10_000) - ): - barcode = normalize_barcode(image.barcode) - source_image = generate_image_path( - image.barcode, Path(image.source_image).stem - ) - is_updated = (barcode != image.barcode) or ( - source_image != image.source_image - ) - if is_updated: - image.barcode = barcode - image.source_image = source_image - image.save() - updated += 1 - - tsx.commit() - logger.info("Current ID: %s, Updated %d images", min_id, updated) - if image is not None: - min_id = image.id - else: - break - logger.info("Updated %d images", updated) + is_updated = (barcode != prediction.barcode) or ( + source_image != prediction.source_image + ) + if is_updated: + prediction.barcode = barcode + prediction.source_image = source_image + prediction.save() + updated += 1 + + tsx.commit() + logger.info( + "Current ID: %s, Updated %d predictions", min_id, updated + ) + if prediction is not None: + min_id = prediction.id + else: + break + logger.info("Updated %d predictions", updated) + + if launch_insight: + updated = 0 + min_id = ProductInsight.select(fn.MIN(ProductInsight.timestamp)).scalar() + max_id = ProductInsight.select(fn.MAX(ProductInsight.timestamp)).scalar() + with db.atomic() as tsx: + while min_id < max_id: + insight = None + for insight in ( + ProductInsight.select() + .where(ProductInsight.timestamp >= min_id) + .order_by(ProductInsight.timestamp.asc()) + .limit(batch_size) + ): + barcode = normalize_barcode(insight.barcode) + source_image = ( + generate_image_path( + insight.barcode, Path(insight.source_image).stem + ) + if insight.source_image + else None + ) + is_updated = (barcode != insight.barcode) or ( + source_image != insight.source_image + ) + if is_updated: + insight.barcode = barcode + insight.source_image = source_image + insight.save() + updated += 1 + + tsx.commit() + logger.info("Current ID: %s, Updated %d insights", min_id, updated) + if insight is not None: + min_id = insight.timestamp + else: + break + logger.info("Updated %d insights", updated) + + if launch_image: + updated = 0 + min_id = ImageModel.select(fn.MIN(ImageModel.id)).scalar() + max_id = ImageModel.select(fn.MAX(ImageModel.id)).scalar() + with db.atomic() as tsx: + while min_id < max_id: + image = None + for image in ( + ImageModel.select() + .where(ImageModel.id >= min_id) + .order_by(ImageModel.id.asc()) + .limit(batch_size) + ): + barcode = normalize_barcode(image.barcode) + source_image = ( + generate_image_path( + image.barcode, Path(image.source_image).stem + ) + if image.source_image + else None + ) + is_updated = (barcode != image.barcode) or ( + source_image != image.source_image + ) + if is_updated: + image.barcode = barcode + image.source_image = source_image + image.save() + updated += 1 + + tsx.commit() + logger.info("Current ID: %s, Updated %d images", min_id, updated) + if image is not None: + min_id = image.id + else: + break + logger.info("Updated %d images", updated) def main() -> None: