From 486472ededd189bd271ef2539b50e7315ab60b4e Mon Sep 17 00:00:00 2001
From: Scott Blaha
Date: Tue, 17 Dec 2024 13:01:24 -0800
Subject: [PATCH] Support multiple runs

Summary:
Now that we have better type checking, we can safely change the pipeline to
support multiple runs. In a later diff, we will create a run for each diff in
a batch.

Reviewed By: alexblanck

Differential Revision: D66771270

fbshipit-source-id: 76994db41ab72eb3514798effd9942b889b78455
---
 sapp/models.py                   |  4 ++--
 sapp/pipeline/__init__.py        |  4 ++--
 sapp/pipeline/database_saver.py  | 28 +++++++++++++++-------------
 sapp/pipeline/model_generator.py | 13 +++++++------
 4 files changed, 26 insertions(+), 23 deletions(-)

diff --git a/sapp/models.py b/sapp/models.py
index a3b2b09..26a7ca1 100644
--- a/sapp/models.py
+++ b/sapp/models.py
@@ -983,8 +983,8 @@ class Run(Base):
         nullable=True,
     )
 
-    def get_summary(self, **kwargs) -> RunSummary:
-        session = Session.object_session(self)
+    def get_summary(self, session: Optional[Session] = None, **kwargs) -> RunSummary:
+        session = session or Session.object_session(self)
 
         return RunSummary(
             commit_hash=self.commit_hash,
diff --git a/sapp/pipeline/__init__.py b/sapp/pipeline/__init__.py
index 49dfc0d..d1f371c 100644
--- a/sapp/pipeline/__init__.py
+++ b/sapp/pipeline/__init__.py
@@ -308,8 +308,8 @@ class Summary(TypedDict, total=False):
     project: Optional[str]
     repo_dir: str
     repository: Optional[str]
-    run: Run
-    run_attributes: object  # List[RunAttribute]
+    runs: List[Run]
+    runs_attributes: List[object]  # List[List[RunAttribute]]
     run_kind: Optional[str]
     store_unused_models: bool
     trace_entries: Dict[TraceKind, Dict[DictKey, List[ParseConditionTuple]]]
diff --git a/sapp/pipeline/database_saver.py b/sapp/pipeline/database_saver.py
index c121921..a1bfe3c 100644
--- a/sapp/pipeline/database_saver.py
+++ b/sapp/pipeline/database_saver.py
@@ -16,7 +16,7 @@
 
 from ..bulk_saver import BulkSaver
 from ..db import DB
-from ..db_support import DBID
+from ..db_support import DBID, dbid_resolution_context
 from ..decorators import log_time
 from ..models import (
     ClassTypeInterval,
@@ -69,12 +69,16 @@ def __init__(
     @log_time
     # pyre-ignore[56]: Pyre can't support this yet.
     def run(
         self, input: List[TraceGraph], summary: Summary
-    ) -> Tuple[RunSummary, Summary]:
+    ) -> Tuple[List[RunSummary], Summary]:
         self.graphs = input
         self.summary = summary
         self._prep_save()
-        return self._save(), self.summary
+        run_summaries = []
+        for run in self.summary["runs"]:
+            with dbid_resolution_context():
+                run_summaries.append(self._save(run))
+        return run_summaries, self.summary
 
     def _prep_save(self) -> None:
         """Prepares the bulk saver to load the trace graph info into the
@@ -92,9 +96,8 @@ def _prep_save(self) -> None:
                 len(self.summary["missing_traces"][trace_kind]),
             )
 
-    def _save(self) -> RunSummary:
+    def _save(self, run: Run) -> RunSummary:
         """Saves bulk saver's info into the databases in bulk."""
-        assert self.summary["run"] is not None, "Must have called process before"
 
         trace_frames = self.bulk_saver.get_items_to_add(TraceFrame)
         log.info(
@@ -123,20 +126,20 @@ def _save(self) -> RunSummary:
         if not self.dry_run:
             with self.database.make_session() as session:
                 pk_gen = self.primary_key_generator.reserve(session, [Run])
-                self.summary["run"].id.resolve(id=pk_gen.get(Run), is_new=True)
-                session.add(self.summary["run"])
+                run.id.resolve(id=pk_gen.get(Run), is_new=True)
+                session.add(run)
 
                 meta_run_identifier = self.summary.get("meta_run_identifier")
                 if meta_run_identifier is not None:
                     session.add(
                         MetaRunToRunAssoc(
                             meta_run_id=cast(DBID, meta_run_identifier),
-                            run_id=self.summary["run"].id,
+                            run_id=run.id,
                             run_label=self.summary.get("meta_run_child_label", None),
                         )
                     )
                 session.commit()
-                run_id = self.summary["run"].id.resolved()
+                run_id = run.id.resolved()
                 log.info("Created run: %d", run_id)
 
         # Reserves IDs and removes items that have already been saved
@@ -148,7 +151,7 @@ def _save(self) -> RunSummary:
         # Additionally, this allow us to sync information from existing
         # central issues into yet-to-be created local issues here.
         self._save_central_issues_and_sync_local_issues(
-            cast(TRun, self.summary["run"]), self.bulk_saver.get_items_to_add(Issue)
+            cast(TRun, run), self.bulk_saver.get_items_to_add(Issue)
         )
 
         self.bulk_saver.save_all(self.database)
@@ -164,7 +167,7 @@ def _save(self) -> RunSummary:
                 session.commit()
                 run_summary = run.get_summary()
         else:
-            run_summary = self._get_dry_run_summary()
+            run_summary = self._get_dry_run_summary(run)
 
         # pyre-fixme[16]: `RunSummary` has no attribute `num_invisible_issues`.
         run_summary.num_invisible_issues = 0
@@ -211,8 +214,7 @@ def message_id(graph: TraceGraph, id: DBID) -> int:
                 json.dump(instance, f)
                 f.write("\n")
 
-    def _get_dry_run_summary(self) -> RunSummary:
-        run = self.summary["run"]
+    def _get_dry_run_summary(self, run: Run) -> RunSummary:
         return RunSummary(
             commit_hash=run.commit_hash,
             differential_id=run.differential_id,
diff --git a/sapp/pipeline/model_generator.py b/sapp/pipeline/model_generator.py
index d1192ab..897e07f 100644
--- a/sapp/pipeline/model_generator.py
+++ b/sapp/pipeline/model_generator.py
@@ -115,8 +115,9 @@ def run(self, input: DictEntries, summary: Summary) -> Tuple[TraceGraph, Summary
         self.summary["big_tito"] = set()  # Set[Tuple[str, str, int]]
 
         self.graph = TraceGraph()
-        self.summary["run"] = self._create_empty_run(status=RunStatus.INCOMPLETE)
-        self.summary["run"].id = DBID()
+        self.summary["runs"] = [self._create_empty_run(status=RunStatus.INCOMPLETE)]
+        for run in self.summary["runs"]:
+            run.id = DBID()
 
         self.summary["trace_entries"][TraceKind.precondition] = input["preconditions"]
         self.summary["trace_entries"][TraceKind.postcondition] = input["postconditions"]
@@ -124,15 +125,15 @@ def run(self, input: DictEntries, summary: Summary) -> Tuple[TraceGraph, Summary
 
         log.info("Generating issues and traces")
         for entry in input["issues"]:
-            self._generate_issue(self.summary["run"], entry, callables)
+            for run in self.summary["runs"]:
+                self._generate_issue(run, entry, callables)
 
         if self.summary.get("store_unused_models"):
             for trace_kind, traces in self.summary["trace_entries"].items():
                 for entries in traces.values():
                     for entry in entries:
-                        self._generate_trace_frame(
-                            trace_kind, self.summary["run"], entry
-                        )
+                        for run in self.summary["runs"]:
+                            self._generate_trace_frame(trace_kind, run, entry)
 
         return self.graph, self.summary
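
The models.py change lets callers hand Run.get_summary an explicit session
instead of relying on Session.object_session(self), which only works while the
Run instance is still attached to a live session. A minimal sketch of the new
call pattern, reusing the make_session() idiom already visible in
database_saver.py; the `database` object and the query for the latest run are
illustrative assumptions, not part of this patch:

    # Assumes `database` is the same kind of DB object DatabaseSaver holds,
    # and that Run is imported from sapp.models.
    with database.make_session() as session:
        # Illustrative query: pick the most recently created run.
        run = session.query(Run).order_by(Run.id.desc()).first()
        if run is not None:
            # Passing the session explicitly avoids the Session.object_session
            # lookup on a possibly detached instance.
            run_summary = run.get_summary(session=session)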
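
DatabaseSaver.run now returns Tuple[List[RunSummary], Summary], one RunSummary
per Run in summary["runs"], and each run is saved inside its own
dbid_resolution_context(), presumably so DBID resolution for one run does not
leak into the next. A rough sketch of how downstream code might consume the
new shape; `saver`, `graphs`, and `summary` are assumed to come from earlier
pipeline steps and are not defined in this patch:

    import logging

    log = logging.getLogger(__name__)

    # run() now yields one RunSummary per Run listed in summary["runs"].
    run_summaries, summary = saver.run(graphs, summary)
    for run_summary in run_summaries:
        # Each summary describes a run committed under its own primary key.
        log.info("Saved run for commit %s", run_summary.commit_hash)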
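
The commit summary says a later diff will create a run for each diff in a
batch. A speculative sketch of what that could look like inside
ModelGenerator.run, extending the single-element list this patch introduces;
`batch_diffs` is a hypothetical input that does not exist in this patch:

    # Hypothetical: build one empty Run per diff in the batch rather than a
    # single run; `batch_diffs` is an assumed iterable of batch entries.
    self.summary["runs"] = [
        self._create_empty_run(status=RunStatus.INCOMPLETE) for _ in batch_diffs
    ]
    for run in self.summary["runs"]:
        # Each run gets its own unresolved DBID; DatabaseSaver later resolves
        # each one inside a separate dbid_resolution_context().
        run.id = DBID()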