From 80d7f88e7553194eb58204119a3e2a9ec61a113f Mon Sep 17 00:00:00 2001 From: John McBride Date: Fri, 1 Sep 2023 09:25:26 -0600 Subject: [PATCH] feat: Pizza oven database optimizations (#39) This optimizes the database calls within the pizza oven. Instead of iterating and inserting each author and commit, one after another, this uses methods to bulk insert both commit authors and commits. Signed-off-by: John McBride --- go.mod | 1 + go.sum | 2 + pkg/database/handler.go | 129 ++++++++++++++++++++++++++++++++++----- pkg/server/server.go | 132 +++++++++++++++++++++++++++++++--------- 4 files changed, 220 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index 2ac5293..f7a7d99 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/go-git/go-git/v5 v5.6.1 + github.com/google/uuid v1.3.1 github.com/joho/godotenv v1.5.1 github.com/lib/pq v1.10.9 go.uber.org/zap v1.24.0 diff --git a/go.sum b/go.sum index c6820d7..cdb5eef 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,8 @@ github.com/go-git/go-git/v5 v5.6.1 h1:q4ZRqQl4pR/ZJHc1L5CFjGA1a10u76aV1iC+nh+bHs github.com/go-git/go-git/v5 v5.6.1/go.mod h1:mvyoL6Unz0PiTQrGQfSfiLFhBH1c1e84ylC2MDs4ee8= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= diff --git a/pkg/database/handler.go b/pkg/database/handler.go index 75d7b91..86f56e4 100644 --- a/pkg/database/handler.go +++ b/pkg/database/handler.go @@ -10,7 +10,7 @@ import ( "time" // the injected postgres interface implementations for Go SQL - _ "github.com/lib/pq" + "github.com/lib/pq" "github.com/open-sauced/pizza/oven/pkg/insights" ) @@ -65,23 +65,124 @@ func (p PizzaOvenDbHandler) GetAuthorID(insight insights.CommitInsight) (int, er return id, err } -// InsertAuthor inserts an author by their email -func (p PizzaOvenDbHandler) InsertAuthor(insight insights.CommitInsight) (int, error) { - var id int - err := p.db.QueryRow("INSERT INTO public.commit_authors(commit_author_email) VALUES($1) RETURNING id", insight.AuthorEmail).Scan(&id) - return id, err +// GetAuthorIDs queries the id of an author by their email +func (p PizzaOvenDbHandler) GetAuthorIDs(emails []string) (map[string]int, error) { + emailIDMap := make(map[string]int) + + rows, err := p.db.Query("SELECT id, commit_author_email FROM commit_authors WHERE commit_author_email = ANY($1);", pq.Array(emails)) + if err != nil { + log.Fatal(err) + } + defer rows.Close() + for rows.Next() { + var id int + var email string + if err := rows.Scan(&id, &email); err != nil { + log.Fatal(err) + } + emailIDMap[email] = id + } + + return emailIDMap, nil } -// GetCommitID queries the id of a given commit based on its hash -func (p PizzaOvenDbHandler) GetCommitID(repoID int, insight insights.CommitInsight) (int, error) { - var id int - err := p.db.QueryRow("SELECT id FROM public.commits WHERE baked_repo_id=$1 AND commit_hash=$2", repoID, insight.Hash).Scan(&id) - return id, err +// PrepareBulkAuthorInsert creates a temporary table that mirrors the commit_authors +// and is used to perform a bulk insert "pivot" which accounts for conflicts +func (p PizzaOvenDbHandler) PrepareBulkAuthorInsert(tmpTableName string) (*sql.Tx, *sql.Stmt, error) { + _, err := p.db.Exec(fmt.Sprintf("CREATE TEMPORARY TABLE %s AS SELECT * FROM commit_authors WHERE 1=0", tmpTableName)) + if err != nil { + return nil, nil, err + } + + txn, err := p.db.Begin() + if err != nil { + return nil, nil, err + } + + stmt, err := txn.Prepare(pq.CopyIn(tmpTableName, "commit_author_email")) + if err != nil { + newErr := txn.Rollback() + if newErr != nil { + return nil, nil, fmt.Errorf("could not abort the sql transaction: %s - original error: %s", newErr, err) + } + + return nil, nil, err + } + + return txn, stmt, nil +} + +// PivotTmpTableToAuthorsTable performs the pivot from the temporary commit authors +// table to the real one handling any conflicts +func (p PizzaOvenDbHandler) PivotTmpTableToAuthorsTable(tmpTableName string) error { + _, err := p.db.Exec(fmt.Sprintf(` + INSERT INTO public.commit_authors(commit_author_email) + SELECT commit_author_email FROM %s + ON CONFLICT (commit_author_email) + DO NOTHING + `, tmpTableName)) + if err != nil { + return err + } + + _, err = p.db.Exec(fmt.Sprintf("DROP TABLE %s", tmpTableName)) + if err != nil { + return err + } + + return nil +} + +// InsertAuthor inserts an author by their email into the sql transaction +func (p PizzaOvenDbHandler) InsertAuthor(stmt *sql.Stmt, insight insights.CommitInsight) error { + _, err := stmt.Exec(insight.AuthorEmail) + return err +} + +// PrepareBulkCommitInsert gets a sql bulk transaction ready to insert all commits +// from processing in one round trip +func (p PizzaOvenDbHandler) PrepareBulkCommitInsert() (*sql.Tx, *sql.Stmt, error) { + txn, err := p.db.Begin() + if err != nil { + return nil, nil, err + } + + stmt, err := txn.Prepare(pq.CopyIn("commits", "commit_hash", "commit_author_id", "baked_repo_id", "commit_date")) + if err != nil { + newErr := txn.Rollback() + if newErr != nil { + return nil, nil, fmt.Errorf("could not abort commits bulk sql transaction: %s - original error: %s", newErr, err) + } + + return nil, nil, err + } + + return txn, stmt, nil +} + +// ResolveTransaction resolves a given transaction and sql statement +func (p PizzaOvenDbHandler) ResolveTransaction(txn *sql.Tx, stmt *sql.Stmt) error { + _, err := stmt.Exec() + if err != nil { + return err + } + + err = stmt.Close() + if err != nil { + return err + } + + err = txn.Commit() + if err != nil { + return err + } + + return nil } -// InsertCommit inserts a commit based on its commit hash -func (p PizzaOvenDbHandler) InsertCommit(insight insights.CommitInsight, authorID int, repoID int) error { - _, err := p.db.Exec("INSERT INTO public.commits(commit_hash, commit_author_id, baked_repo_id, commit_date) VALUES($1, $2, $3, $4)", insight.Hash, authorID, repoID, insight.Date) +// InsertCommit adds a commit to the given sql.Stmt to be executed in bulk +func (p PizzaOvenDbHandler) InsertCommit(stmt *sql.Stmt, insight insights.CommitInsight, authorID int, repoID int) error { + _, err := stmt.Exec(insight.Hash, authorID, repoID, insight.Date) return err } diff --git a/pkg/server/server.go b/pkg/server/server.go index d5ff703..551ec52 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -8,11 +8,14 @@ import ( "fmt" "log" "net/http" + "strings" + "sync/atomic" "time" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing/object" "github.com/go-git/go-git/v5/plumbing/transport" + "github.com/google/uuid" "go.uber.org/zap" "github.com/open-sauced/pizza/oven/pkg/common" @@ -21,6 +24,10 @@ import ( "github.com/open-sauced/pizza/oven/pkg/providers" ) +// counter is a atomic counter that is used to create canonical, short lived +// temporary table names for bulk inserts of commit authors +var counter int64 + // Config provides the configuration set on server startup // - Never Evict Repos: Repos that are preserved in cache regardless of LRU policy type Config struct { @@ -178,6 +185,10 @@ func (p PizzaOvenServer) processRepository(repoURL string) error { return err } + // Add 1 nanosecond since "git log --since" is inclusive of date/times. + // Although date/times are not unique to commits, it is incredibly unlikely that + // two commits will have the exact same timestamp and be excluded using this method + latestCommitDate = latestCommitDate.Add(time.Nanosecond) p.Logger.Debugf("Querying commits since: %s", latestCommitDate.String()) // Git shortlog options to display summary and email starting at HEAD @@ -187,54 +198,115 @@ func (p PizzaOvenServer) processRepository(repoURL string) error { } p.Logger.Debugf("Getting commit iterator with git log options: %v", gitLogOptions) - commitIter, err := gitRepo.Log(&gitLogOptions) + authorIter, err := gitRepo.Log(&gitLogOptions) if err != nil { return err } - p.Logger.Debugf("Iterating commits in repository: %s", insight.RepoURLSource) - err = commitIter.ForEach(func(c *object.Commit) error { + // Build a unique, atomically safe temporary table name to pivot commit + // author data from + rawUUID := uuid.New().String() + uuid := strings.ReplaceAll(rawUUID, "-", "") + tmpTableName := fmt.Sprintf("temp_table_%s_%d", uuid, atomic.AddInt64(&counter, 1)) + + p.Logger.Debugf("Using temporary db table for commit authors: %s", tmpTableName) + authorTxn, authorStmt, err := p.PizzaOven.PrepareBulkAuthorInsert(tmpTableName) + if err != nil { + return err + } + + // To reduce unnecessary duplicate statement executions, track the unique + // author emails using a simple set (represented as a string map to structs) + uniqueAuthorEmails := []string{} + authorEmailSet := make(map[string]struct{}) + + p.Logger.Debugf("Iterating commit authors in repository: %s with temporary tablename: %s", insight.RepoURLSource, tmpTableName) + err = authorIter.ForEach(func(c *object.Commit) error { // TODO - if the committer and author are not the same, handle both // those users. This is the case where there is a separate committer for // a patch that was not authored by that specific person making the commit. // TODO - if there is a co-author, should handle adding that person on // the commit as well. - insight.AuthorEmail = c.Author.Email - insight.Hash = c.Hash.String() - insight.Date = c.Committer.When.UTC() - p.Logger.Debugf("Inspecting commit: %s %s %s", insight.AuthorEmail, insight.Hash, insight.Date) - authorID, err := p.PizzaOven.GetAuthorID(insight) - if err != nil { - if err == sql.ErrNoRows { - p.Logger.Debugf("Author not found. Inserting: %s %s %s", insight.AuthorEmail, insight.Hash, insight.Date) - authorID, err = p.PizzaOven.InsertAuthor(insight) - if err != nil { - return err - } - } else { - return err - } + // Check if the author email is in the unique set of author emails + if _, ok := authorEmailSet[c.Author.Email]; ok { + return nil } - p.Logger.Debugf("Checking if commit already in database: %s", insight.Hash) - _, err = p.PizzaOven.GetCommitID(repoID, insight) + // Commit author is not in set so add this author's email as unique + authorEmailSet[c.Author.Email] = struct{}{} + uniqueAuthorEmails = append(uniqueAuthorEmails, c.Author.Email) + + p.Logger.Debugf("Inspecting commit author: %s", c.Author.Email) + return p.PizzaOven.InsertAuthor(authorStmt, insights.CommitInsight{ + RepoURLSource: repoURL, + AuthorEmail: c.Author.Email, + Hash: "", + Date: time.Time{}, + }) + }) + if err != nil { + return err + } + + // Resolve, execute, and pivot the bulk author transaction + err = p.PizzaOven.ResolveTransaction(authorTxn, authorStmt) + if err != nil { + return err + } + + err = p.PizzaOven.PivotTmpTableToAuthorsTable(tmpTableName) + if err != nil { + return err + } + + // Re-query the database for author email ids based on the unique list of + // author emails that have just been committed + authorEmailIDMap, err := p.PizzaOven.GetAuthorIDs(uniqueAuthorEmails) + if err != nil { + return err + } + + // Rebuild the iterator from the start using the same options + commitIter, err := gitRepo.Log(&gitLogOptions) + if err != nil { + return err + } + + // Get ready for the commit bulk action + commitTxn, commitStmt, err := p.PizzaOven.PrepareBulkCommitInsert() + if err != nil { + return err + } + + p.Logger.Debugf("Iterating commits in repository: %s", insight.RepoURLSource) + err = commitIter.ForEach(func(c *object.Commit) error { + i := insights.CommitInsight{ + RepoURLSource: repoURL, + AuthorEmail: c.Author.Email, + Hash: c.Hash.String(), + Date: c.Committer.When.UTC(), + } + + p.Logger.Debugf("Inspecting commit: %s %s %s", i.AuthorEmail, i.Hash, i.Date) + err = p.PizzaOven.InsertCommit(commitStmt, i, authorEmailIDMap[i.AuthorEmail], repoID) if err != nil { - if err == sql.ErrNoRows { - p.Logger.Debugf("Commit not found. Inserting into database: %s", insight.Hash) - err = p.PizzaOven.InsertCommit(insight, authorID, repoID) - if err != nil { - return err - } - } else { - return err - } + return err } return nil }) + if err != nil { + return err + } + + // Execute and resolve the bulk commit insert + err = p.PizzaOven.ResolveTransaction(commitTxn, commitStmt) + if err != nil { + return err + } p.Logger.Debugf("Finished processing: %s", insight.RepoURLSource) - return err + return nil }