diff --git a/go.mod b/go.mod index 2ac5293..f7a7d99 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/go-git/go-git/v5 v5.6.1 + github.com/google/uuid v1.3.1 github.com/joho/godotenv v1.5.1 github.com/lib/pq v1.10.9 go.uber.org/zap v1.24.0 diff --git a/go.sum b/go.sum index c6820d7..cdb5eef 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,8 @@ github.com/go-git/go-git/v5 v5.6.1 h1:q4ZRqQl4pR/ZJHc1L5CFjGA1a10u76aV1iC+nh+bHs github.com/go-git/go-git/v5 v5.6.1/go.mod h1:mvyoL6Unz0PiTQrGQfSfiLFhBH1c1e84ylC2MDs4ee8= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= diff --git a/pkg/database/handler.go b/pkg/database/handler.go index 75d7b91..86f56e4 100644 --- a/pkg/database/handler.go +++ b/pkg/database/handler.go @@ -10,7 +10,7 @@ import ( "time" // the injected postgres interface implementations for Go SQL - _ "github.com/lib/pq" + "github.com/lib/pq" "github.com/open-sauced/pizza/oven/pkg/insights" ) @@ -65,23 +65,124 @@ func (p PizzaOvenDbHandler) GetAuthorID(insight insights.CommitInsight) (int, er return id, err } -// InsertAuthor inserts an author by their email -func (p PizzaOvenDbHandler) InsertAuthor(insight insights.CommitInsight) (int, error) { - var id int - err := p.db.QueryRow("INSERT INTO public.commit_authors(commit_author_email) VALUES($1) RETURNING id", insight.AuthorEmail).Scan(&id) - return id, err +// GetAuthorIDs queries the id of an author by their email +func (p PizzaOvenDbHandler) GetAuthorIDs(emails []string) (map[string]int, error) { + emailIDMap := make(map[string]int) + + rows, err := p.db.Query("SELECT id, commit_author_email FROM commit_authors WHERE commit_author_email = ANY($1);", pq.Array(emails)) + if err != nil { + log.Fatal(err) + } + defer rows.Close() + for rows.Next() { + var id int + var email string + if err := rows.Scan(&id, &email); err != nil { + log.Fatal(err) + } + emailIDMap[email] = id + } + + return emailIDMap, nil } -// GetCommitID queries the id of a given commit based on its hash -func (p PizzaOvenDbHandler) GetCommitID(repoID int, insight insights.CommitInsight) (int, error) { - var id int - err := p.db.QueryRow("SELECT id FROM public.commits WHERE baked_repo_id=$1 AND commit_hash=$2", repoID, insight.Hash).Scan(&id) - return id, err +// PrepareBulkAuthorInsert creates a temporary table that mirrors the commit_authors +// and is used to perform a bulk insert "pivot" which accounts for conflicts +func (p PizzaOvenDbHandler) PrepareBulkAuthorInsert(tmpTableName string) (*sql.Tx, *sql.Stmt, error) { + _, err := p.db.Exec(fmt.Sprintf("CREATE TEMPORARY TABLE %s AS SELECT * FROM commit_authors WHERE 1=0", tmpTableName)) + if err != nil { + return nil, nil, err + } + + txn, err := p.db.Begin() + if err != nil { + return nil, nil, err + } + + stmt, err := txn.Prepare(pq.CopyIn(tmpTableName, "commit_author_email")) + if err != nil { + newErr := txn.Rollback() + if newErr != nil { + return nil, nil, fmt.Errorf("could not abort the sql transaction: %s - original error: %s", newErr, err) + } + + return nil, nil, err + } + + return txn, stmt, nil +} + +// PivotTmpTableToAuthorsTable performs the pivot from the temporary commit authors +// table to the real one handling any conflicts +func (p PizzaOvenDbHandler) PivotTmpTableToAuthorsTable(tmpTableName string) error { + _, err := p.db.Exec(fmt.Sprintf(` + INSERT INTO public.commit_authors(commit_author_email) + SELECT commit_author_email FROM %s + ON CONFLICT (commit_author_email) + DO NOTHING + `, tmpTableName)) + if err != nil { + return err + } + + _, err = p.db.Exec(fmt.Sprintf("DROP TABLE %s", tmpTableName)) + if err != nil { + return err + } + + return nil +} + +// InsertAuthor inserts an author by their email into the sql transaction +func (p PizzaOvenDbHandler) InsertAuthor(stmt *sql.Stmt, insight insights.CommitInsight) error { + _, err := stmt.Exec(insight.AuthorEmail) + return err +} + +// PrepareBulkCommitInsert gets a sql bulk transaction ready to insert all commits +// from processing in one round trip +func (p PizzaOvenDbHandler) PrepareBulkCommitInsert() (*sql.Tx, *sql.Stmt, error) { + txn, err := p.db.Begin() + if err != nil { + return nil, nil, err + } + + stmt, err := txn.Prepare(pq.CopyIn("commits", "commit_hash", "commit_author_id", "baked_repo_id", "commit_date")) + if err != nil { + newErr := txn.Rollback() + if newErr != nil { + return nil, nil, fmt.Errorf("could not abort commits bulk sql transaction: %s - original error: %s", newErr, err) + } + + return nil, nil, err + } + + return txn, stmt, nil +} + +// ResolveTransaction resolves a given transaction and sql statement +func (p PizzaOvenDbHandler) ResolveTransaction(txn *sql.Tx, stmt *sql.Stmt) error { + _, err := stmt.Exec() + if err != nil { + return err + } + + err = stmt.Close() + if err != nil { + return err + } + + err = txn.Commit() + if err != nil { + return err + } + + return nil } -// InsertCommit inserts a commit based on its commit hash -func (p PizzaOvenDbHandler) InsertCommit(insight insights.CommitInsight, authorID int, repoID int) error { - _, err := p.db.Exec("INSERT INTO public.commits(commit_hash, commit_author_id, baked_repo_id, commit_date) VALUES($1, $2, $3, $4)", insight.Hash, authorID, repoID, insight.Date) +// InsertCommit adds a commit to the given sql.Stmt to be executed in bulk +func (p PizzaOvenDbHandler) InsertCommit(stmt *sql.Stmt, insight insights.CommitInsight, authorID int, repoID int) error { + _, err := stmt.Exec(insight.Hash, authorID, repoID, insight.Date) return err } diff --git a/pkg/server/server.go b/pkg/server/server.go index d5ff703..551ec52 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -8,11 +8,14 @@ import ( "fmt" "log" "net/http" + "strings" + "sync/atomic" "time" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing/object" "github.com/go-git/go-git/v5/plumbing/transport" + "github.com/google/uuid" "go.uber.org/zap" "github.com/open-sauced/pizza/oven/pkg/common" @@ -21,6 +24,10 @@ import ( "github.com/open-sauced/pizza/oven/pkg/providers" ) +// counter is a atomic counter that is used to create canonical, short lived +// temporary table names for bulk inserts of commit authors +var counter int64 + // Config provides the configuration set on server startup // - Never Evict Repos: Repos that are preserved in cache regardless of LRU policy type Config struct { @@ -178,6 +185,10 @@ func (p PizzaOvenServer) processRepository(repoURL string) error { return err } + // Add 1 nanosecond since "git log --since" is inclusive of date/times. + // Although date/times are not unique to commits, it is incredibly unlikely that + // two commits will have the exact same timestamp and be excluded using this method + latestCommitDate = latestCommitDate.Add(time.Nanosecond) p.Logger.Debugf("Querying commits since: %s", latestCommitDate.String()) // Git shortlog options to display summary and email starting at HEAD @@ -187,54 +198,115 @@ func (p PizzaOvenServer) processRepository(repoURL string) error { } p.Logger.Debugf("Getting commit iterator with git log options: %v", gitLogOptions) - commitIter, err := gitRepo.Log(&gitLogOptions) + authorIter, err := gitRepo.Log(&gitLogOptions) if err != nil { return err } - p.Logger.Debugf("Iterating commits in repository: %s", insight.RepoURLSource) - err = commitIter.ForEach(func(c *object.Commit) error { + // Build a unique, atomically safe temporary table name to pivot commit + // author data from + rawUUID := uuid.New().String() + uuid := strings.ReplaceAll(rawUUID, "-", "") + tmpTableName := fmt.Sprintf("temp_table_%s_%d", uuid, atomic.AddInt64(&counter, 1)) + + p.Logger.Debugf("Using temporary db table for commit authors: %s", tmpTableName) + authorTxn, authorStmt, err := p.PizzaOven.PrepareBulkAuthorInsert(tmpTableName) + if err != nil { + return err + } + + // To reduce unnecessary duplicate statement executions, track the unique + // author emails using a simple set (represented as a string map to structs) + uniqueAuthorEmails := []string{} + authorEmailSet := make(map[string]struct{}) + + p.Logger.Debugf("Iterating commit authors in repository: %s with temporary tablename: %s", insight.RepoURLSource, tmpTableName) + err = authorIter.ForEach(func(c *object.Commit) error { // TODO - if the committer and author are not the same, handle both // those users. This is the case where there is a separate committer for // a patch that was not authored by that specific person making the commit. // TODO - if there is a co-author, should handle adding that person on // the commit as well. - insight.AuthorEmail = c.Author.Email - insight.Hash = c.Hash.String() - insight.Date = c.Committer.When.UTC() - p.Logger.Debugf("Inspecting commit: %s %s %s", insight.AuthorEmail, insight.Hash, insight.Date) - authorID, err := p.PizzaOven.GetAuthorID(insight) - if err != nil { - if err == sql.ErrNoRows { - p.Logger.Debugf("Author not found. Inserting: %s %s %s", insight.AuthorEmail, insight.Hash, insight.Date) - authorID, err = p.PizzaOven.InsertAuthor(insight) - if err != nil { - return err - } - } else { - return err - } + // Check if the author email is in the unique set of author emails + if _, ok := authorEmailSet[c.Author.Email]; ok { + return nil } - p.Logger.Debugf("Checking if commit already in database: %s", insight.Hash) - _, err = p.PizzaOven.GetCommitID(repoID, insight) + // Commit author is not in set so add this author's email as unique + authorEmailSet[c.Author.Email] = struct{}{} + uniqueAuthorEmails = append(uniqueAuthorEmails, c.Author.Email) + + p.Logger.Debugf("Inspecting commit author: %s", c.Author.Email) + return p.PizzaOven.InsertAuthor(authorStmt, insights.CommitInsight{ + RepoURLSource: repoURL, + AuthorEmail: c.Author.Email, + Hash: "", + Date: time.Time{}, + }) + }) + if err != nil { + return err + } + + // Resolve, execute, and pivot the bulk author transaction + err = p.PizzaOven.ResolveTransaction(authorTxn, authorStmt) + if err != nil { + return err + } + + err = p.PizzaOven.PivotTmpTableToAuthorsTable(tmpTableName) + if err != nil { + return err + } + + // Re-query the database for author email ids based on the unique list of + // author emails that have just been committed + authorEmailIDMap, err := p.PizzaOven.GetAuthorIDs(uniqueAuthorEmails) + if err != nil { + return err + } + + // Rebuild the iterator from the start using the same options + commitIter, err := gitRepo.Log(&gitLogOptions) + if err != nil { + return err + } + + // Get ready for the commit bulk action + commitTxn, commitStmt, err := p.PizzaOven.PrepareBulkCommitInsert() + if err != nil { + return err + } + + p.Logger.Debugf("Iterating commits in repository: %s", insight.RepoURLSource) + err = commitIter.ForEach(func(c *object.Commit) error { + i := insights.CommitInsight{ + RepoURLSource: repoURL, + AuthorEmail: c.Author.Email, + Hash: c.Hash.String(), + Date: c.Committer.When.UTC(), + } + + p.Logger.Debugf("Inspecting commit: %s %s %s", i.AuthorEmail, i.Hash, i.Date) + err = p.PizzaOven.InsertCommit(commitStmt, i, authorEmailIDMap[i.AuthorEmail], repoID) if err != nil { - if err == sql.ErrNoRows { - p.Logger.Debugf("Commit not found. Inserting into database: %s", insight.Hash) - err = p.PizzaOven.InsertCommit(insight, authorID, repoID) - if err != nil { - return err - } - } else { - return err - } + return err } return nil }) + if err != nil { + return err + } + + // Execute and resolve the bulk commit insert + err = p.PizzaOven.ResolveTransaction(commitTxn, commitStmt) + if err != nil { + return err + } p.Logger.Debugf("Finished processing: %s", insight.RepoURLSource) - return err + return nil }