From db3a02ec4c8e0de5bc536f1d85a32dacd970d7db Mon Sep 17 00:00:00 2001 From: Jodi Jang Date: Thu, 19 Sep 2024 11:38:22 -0700 Subject: [PATCH 1/3] chore(similarity): Add try catch around insert grouping record --- src/seer/grouping/grouping.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/src/seer/grouping/grouping.py b/src/seer/grouping/grouping.py index 472262e3..65627d97 100644 --- a/src/seer/grouping/grouping.py +++ b/src/seer/grouping/grouping.py @@ -11,6 +11,7 @@ from pydantic import BaseModel, ValidationInfo, field_validator from sentence_transformers import SentenceTransformer from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.exc import IntegrityError from torch.cuda import OutOfMemoryError from seer.db import DbGroupingRecord, Session @@ -477,7 +478,6 @@ def insert_new_grouping_record( ) -> None: """ Inserts a new GroupingRecord into the database if the group_hash does not already exist. - If new grouping record was created, return the id. :param session: The database session. :param issue: The issue to insert as a new GroupingRecord. 
@@ -489,6 +489,12 @@ def insert_new_grouping_record( .first() ) + extra = { + "project_id": issue.project_id, + "stacktrace_length": len(issue.stacktrace), + "input_hash": issue.hash, + } + if existing_record is None: new_record = GroupingRecord( project_id=issue.project_id, @@ -498,15 +504,26 @@ def insert_new_grouping_record( error_type=issue.exception_type, ).to_db_model() session.add(new_record) + + try: + session.flush() + except IntegrityError: + session.expunge(new_record) + existing_record = ( + session.query(DbGroupingRecord) + .filter_by(hash=issue.hash, project_id=issue.project_id) + .first() + ) + extra["existing_hash"] = existing_record.hash + logger.info( + "group_already_exists_in_seer_db", + extra=extra, + ) else: + extra["existing_hash"] = existing_record.hash logger.info( "group_already_exists_in_seer_db", - extra={ - "existing_hash": existing_record.hash, - "project_id": issue.project_id, - "stacktrace_length": len(issue.stacktrace), - "input_hash": issue.hash, - }, + extra=extra, ) @sentry_sdk.tracing.trace From 331fc5d2181ec2afa2d63c4dbb8911ce8d3c5533 Mon Sep 17 00:00:00 2001 From: Jodi Jang Date: Thu, 19 Sep 2024 14:57:52 -0700 Subject: [PATCH 2/3] ref: Create new session and use commit and rollback --- src/seer/grouping/grouping.py | 80 ++++++++++++++-------------- tests/seer/grouping/test_grouping.py | 17 +++--- 2 files changed, 45 insertions(+), 52 deletions(-) diff --git a/src/seer/grouping/grouping.py b/src/seer/grouping/grouping.py index 65627d97..aeb5fa57 100644 --- a/src/seer/grouping/grouping.py +++ b/src/seer/grouping/grouping.py @@ -363,8 +363,7 @@ def get_nearest_neighbors(self, issue: GroupingRequest) -> SimilarityResponse: "stacktrace_length": len(issue.stacktrace), }, ) - self.insert_new_grouping_record(session, issue, embedding) - session.commit() + self.insert_new_grouping_record(issue, embedding) similarity_response = SimilarityResponse(responses=[]) for record, distance in results: @@ -473,9 +472,7 @@ def
insert_batch_grouping_records( return groups_with_neighbor @sentry_sdk.tracing.trace - def insert_new_grouping_record( - self, session, issue: GroupingRequest, embedding: np.ndarray - ) -> None: + def insert_new_grouping_record(self, issue: GroupingRequest, embedding: np.ndarray) -> None: """ Inserts a new GroupingRecord into the database if the group_hash does not already exist. @@ -483,48 +480,49 @@ def insert_new_grouping_record( :param issue: The issue to insert as a new GroupingRecord. :param embedding: The embedding of the stacktrace. """ - existing_record = ( - session.query(DbGroupingRecord) - .filter_by(hash=issue.hash, project_id=issue.project_id) - .first() - ) - - extra = { - "project_id": issue.project_id, - "stacktrace_length": len(issue.stacktrace), - "input_hash": issue.hash, - } + with Session() as session: + existing_record = ( + session.query(DbGroupingRecord) + .filter_by(hash=issue.hash, project_id=issue.project_id) + .first() + ) - if existing_record is None: - new_record = GroupingRecord( - project_id=issue.project_id, - message=issue.message, - stacktrace_embedding=embedding, - hash=issue.hash, - error_type=issue.exception_type, - ).to_db_model() - session.add(new_record) - - try: - session.flush() - except IntegrityError: - session.expunge(new_record) - existing_record = ( - session.query(DbGroupingRecord) - .filter_by(hash=issue.hash, project_id=issue.project_id) - .first() - ) + extra = { + "project_id": issue.project_id, + "stacktrace_length": len(issue.stacktrace), + "input_hash": issue.hash, + } + + if existing_record is None: + new_record = GroupingRecord( + project_id=issue.project_id, + message=issue.message, + stacktrace_embedding=embedding, + hash=issue.hash, + error_type=issue.exception_type, + ).to_db_model() + session.add(new_record) + + try: + session.commit() + except IntegrityError: + session.rollback() + existing_record = ( + session.query(DbGroupingRecord) + .filter_by(hash=issue.hash, project_id=issue.project_id) + 
.first() + ) + extra["existing_hash"] = existing_record.hash + logger.info( + "group_already_exists_in_seer_db", + extra=extra, + ) + else: extra["existing_hash"] = existing_record.hash logger.info( "group_already_exists_in_seer_db", extra=extra, ) - else: - extra["existing_hash"] = existing_record.hash - logger.info( - "group_already_exists_in_seer_db", - extra=extra, - ) @sentry_sdk.tracing.trace def delete_grouping_records_for_project(self, project_id: int) -> bool: diff --git a/tests/seer/grouping/test_grouping.py b/tests/seer/grouping/test_grouping.py index d9fd2e3f..22d6f5f8 100644 --- a/tests/seer/grouping/test_grouping.py +++ b/tests/seer/grouping/test_grouping.py @@ -34,7 +34,7 @@ def test_get_nearest_neighbors_has_neighbor(self): message="message", hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD", ) - grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding) + grouping_lookup().insert_new_grouping_record(grouping_request, embedding) session.commit() grouping_request = GroupingRequest( @@ -122,11 +122,9 @@ def test_insert_new_grouping_record_group_record_exists(self): hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD", ) # Insert the grouping record - grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding) - session.commit() + grouping_lookup().insert_new_grouping_record(grouping_request, embedding) # Re-insert the grouping record - grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding) - session.commit() + grouping_lookup().insert_new_grouping_record(grouping_request, embedding) matching_record = ( session.query(DbGroupingRecord) .filter_by(hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD") @@ -150,10 +148,8 @@ def test_insert_new_grouping_record_group_record_cross_project(self): hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD", ) # Insert the grouping record - grouping_lookup().insert_new_grouping_record(session, grouping_request1, embedding) - session.commit() - 
grouping_lookup().insert_new_grouping_record(session, grouping_request2, embedding) - session.commit() + grouping_lookup().insert_new_grouping_record(grouping_request1, embedding) + grouping_lookup().insert_new_grouping_record(grouping_request2, embedding) matching_record = ( session.query(DbGroupingRecord) .filter_by(hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD") @@ -293,8 +289,7 @@ def test_bulk_create_and_insert_grouping_records_has_neighbor_in_existing_record message="message", hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD", ) - grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding) - session.commit() + grouping_lookup().insert_new_grouping_record(grouping_request, embedding) # Create record data to attempt to be inserted, create 5 with the stacktrace "stacktrace" hashes = [str(i) * 32 for i in range(10)] From 66d436be698bc16570f016f9eb3db1907aab7535 Mon Sep 17 00:00:00 2001 From: Jodi Jang Date: Fri, 20 Sep 2024 10:59:55 -0700 Subject: [PATCH 3/3] ref: Remove log and use on_conflict_do_nothing --- src/seer/grouping/grouping.py | 51 ++++++++--------------------------- 1 file changed, 11 insertions(+), 40 deletions(-) diff --git a/src/seer/grouping/grouping.py b/src/seer/grouping/grouping.py index aeb5fa57..22591b59 100644 --- a/src/seer/grouping/grouping.py +++ b/src/seer/grouping/grouping.py @@ -11,7 +11,6 @@ from pydantic import BaseModel, ValidationInfo, field_validator from sentence_transformers import SentenceTransformer from sqlalchemy.dialects.postgresql import insert -from sqlalchemy.exc import IntegrityError from torch.cuda import OutOfMemoryError from seer.db import DbGroupingRecord, Session @@ -481,48 +480,20 @@ def insert_new_grouping_record(self, issue: GroupingRequest, embedding: np.ndarr :param embedding: The embedding of the stacktrace. 
""" with Session() as session: - existing_record = ( - session.query(DbGroupingRecord) - .filter_by(hash=issue.hash, project_id=issue.project_id) - .first() + insert_stmt = insert(DbGroupingRecord).values( + project_id=issue.project_id, + message=issue.message, + stacktrace_embedding=embedding, + hash=issue.hash, + error_type=issue.exception_type, ) - extra = { - "project_id": issue.project_id, - "stacktrace_length": len(issue.stacktrace), - "input_hash": issue.hash, - } - - if existing_record is None: - new_record = GroupingRecord( - project_id=issue.project_id, - message=issue.message, - stacktrace_embedding=embedding, - hash=issue.hash, - error_type=issue.exception_type, - ).to_db_model() - session.add(new_record) - - try: - session.commit() - except IntegrityError: - session.rollback() - existing_record = ( - session.query(DbGroupingRecord) - .filter_by(hash=issue.hash, project_id=issue.project_id) - .first() - ) - extra["existing_hash"] = existing_record.hash - logger.info( - "group_already_exists_in_seer_db", - extra=extra, - ) - else: - extra["existing_hash"] = existing_record.hash - logger.info( - "group_already_exists_in_seer_db", - extra=extra, + session.execute( + insert_stmt.on_conflict_do_nothing( + index_elements=(DbGroupingRecord.project_id, DbGroupingRecord.hash) ) + ) + session.commit() @sentry_sdk.tracing.trace def delete_grouping_records_for_project(self, project_id: int) -> bool: