Skip to content

Commit

Permalink
feat: Pizza oven database optimizations (#39)
Browse files Browse the repository at this point in the history
This optimizes the database calls within the pizza oven. Instead
of iterating and inserting each author and commit, one after another,
this uses methods to bulk insert both commit authors and commits.

Signed-off-by: John McBride <john@opensauced.pizza>
  • Loading branch information
jpmcb authored Sep 1, 2023
1 parent d86099f commit 80d7f88
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 44 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.20

require (
github.com/go-git/go-git/v5 v5.6.1
github.com/google/uuid v1.3.1
github.com/joho/godotenv v1.5.1
github.com/lib/pq v1.10.9
go.uber.org/zap v1.24.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ github.com/go-git/go-git/v5 v5.6.1 h1:q4ZRqQl4pR/ZJHc1L5CFjGA1a10u76aV1iC+nh+bHs
github.com/go-git/go-git/v5 v5.6.1/go.mod h1:mvyoL6Unz0PiTQrGQfSfiLFhBH1c1e84ylC2MDs4ee8=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
Expand Down
129 changes: 115 additions & 14 deletions pkg/database/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

// the injected postgres interface implementations for Go SQL
_ "github.com/lib/pq"
"github.com/lib/pq"

"github.com/open-sauced/pizza/oven/pkg/insights"
)
Expand Down Expand Up @@ -65,23 +65,124 @@ func (p PizzaOvenDbHandler) GetAuthorID(insight insights.CommitInsight) (int, er
return id, err
}

// InsertAuthor inserts an author by their email
func (p PizzaOvenDbHandler) InsertAuthor(insight insights.CommitInsight) (int, error) {
var id int
err := p.db.QueryRow("INSERT INTO public.commit_authors(commit_author_email) VALUES($1) RETURNING id", insight.AuthorEmail).Scan(&id)
return id, err
// GetAuthorIDs queries the id of an author by their email
func (p PizzaOvenDbHandler) GetAuthorIDs(emails []string) (map[string]int, error) {
emailIDMap := make(map[string]int)

rows, err := p.db.Query("SELECT id, commit_author_email FROM commit_authors WHERE commit_author_email = ANY($1);", pq.Array(emails))
if err != nil {
log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var id int
var email string
if err := rows.Scan(&id, &email); err != nil {
log.Fatal(err)
}
emailIDMap[email] = id
}

return emailIDMap, nil
}

// GetCommitID queries the id of a given commit based on its hash
func (p PizzaOvenDbHandler) GetCommitID(repoID int, insight insights.CommitInsight) (int, error) {
var id int
err := p.db.QueryRow("SELECT id FROM public.commits WHERE baked_repo_id=$1 AND commit_hash=$2", repoID, insight.Hash).Scan(&id)
return id, err
// PrepareBulkAuthorInsert creates a temporary table that mirrors the commit_authors
// and is used to perform a bulk insert "pivot" which accounts for conflicts
func (p PizzaOvenDbHandler) PrepareBulkAuthorInsert(tmpTableName string) (*sql.Tx, *sql.Stmt, error) {
_, err := p.db.Exec(fmt.Sprintf("CREATE TEMPORARY TABLE %s AS SELECT * FROM commit_authors WHERE 1=0", tmpTableName))
if err != nil {
return nil, nil, err
}

txn, err := p.db.Begin()
if err != nil {
return nil, nil, err
}

stmt, err := txn.Prepare(pq.CopyIn(tmpTableName, "commit_author_email"))
if err != nil {
newErr := txn.Rollback()
if newErr != nil {
return nil, nil, fmt.Errorf("could not abort the sql transaction: %s - original error: %s", newErr, err)
}

return nil, nil, err
}

return txn, stmt, nil
}

// PivotTmpTableToAuthorsTable performs the pivot from the temporary commit authors
// table to the real one handling any conflicts
func (p PizzaOvenDbHandler) PivotTmpTableToAuthorsTable(tmpTableName string) error {
_, err := p.db.Exec(fmt.Sprintf(`
INSERT INTO public.commit_authors(commit_author_email)
SELECT commit_author_email FROM %s
ON CONFLICT (commit_author_email)
DO NOTHING
`, tmpTableName))
if err != nil {
return err
}

_, err = p.db.Exec(fmt.Sprintf("DROP TABLE %s", tmpTableName))
if err != nil {
return err
}

return nil
}

// InsertAuthor inserts an author by their email into the sql transaction
func (p PizzaOvenDbHandler) InsertAuthor(stmt *sql.Stmt, insight insights.CommitInsight) error {
_, err := stmt.Exec(insight.AuthorEmail)
return err
}

// PrepareBulkCommitInsert gets a sql bulk transaction ready to insert all commits
// from processing in one round trip
func (p PizzaOvenDbHandler) PrepareBulkCommitInsert() (*sql.Tx, *sql.Stmt, error) {
txn, err := p.db.Begin()
if err != nil {
return nil, nil, err
}

stmt, err := txn.Prepare(pq.CopyIn("commits", "commit_hash", "commit_author_id", "baked_repo_id", "commit_date"))
if err != nil {
newErr := txn.Rollback()
if newErr != nil {
return nil, nil, fmt.Errorf("could not abort commits bulk sql transaction: %s - original error: %s", newErr, err)
}

return nil, nil, err
}

return txn, stmt, nil
}

// ResolveTransaction resolves a given transaction and sql statement
func (p PizzaOvenDbHandler) ResolveTransaction(txn *sql.Tx, stmt *sql.Stmt) error {
_, err := stmt.Exec()
if err != nil {
return err
}

err = stmt.Close()
if err != nil {
return err
}

err = txn.Commit()
if err != nil {
return err
}

return nil
}

// InsertCommit inserts a commit based on its commit hash
func (p PizzaOvenDbHandler) InsertCommit(insight insights.CommitInsight, authorID int, repoID int) error {
_, err := p.db.Exec("INSERT INTO public.commits(commit_hash, commit_author_id, baked_repo_id, commit_date) VALUES($1, $2, $3, $4)", insight.Hash, authorID, repoID, insight.Date)
// InsertCommit adds a commit to the given sql.Stmt to be executed in bulk
func (p PizzaOvenDbHandler) InsertCommit(stmt *sql.Stmt, insight insights.CommitInsight, authorID int, repoID int) error {
_, err := stmt.Exec(insight.Hash, authorID, repoID, insight.Date)
return err
}

Expand Down
132 changes: 102 additions & 30 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (
"fmt"
"log"
"net/http"
"strings"
"sync/atomic"
"time"

"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/object"
"github.com/go-git/go-git/v5/plumbing/transport"
"github.com/google/uuid"
"go.uber.org/zap"

"github.com/open-sauced/pizza/oven/pkg/common"
Expand All @@ -21,6 +24,10 @@ import (
"github.com/open-sauced/pizza/oven/pkg/providers"
)

// counter is a atomic counter that is used to create canonical, short lived
// temporary table names for bulk inserts of commit authors
var counter int64

// Config provides the configuration set on server startup
// - Never Evict Repos: Repos that are preserved in cache regardless of LRU policy
type Config struct {
Expand Down Expand Up @@ -178,6 +185,10 @@ func (p PizzaOvenServer) processRepository(repoURL string) error {
return err
}

// Add 1 nanosecond since "git log --since" is inclusive of date/times.
// Although date/times are not unique to commits, it is incredibly unlikely that
// two commits will have the exact same timestamp and be excluded using this method
latestCommitDate = latestCommitDate.Add(time.Nanosecond)
p.Logger.Debugf("Querying commits since: %s", latestCommitDate.String())

// Git shortlog options to display summary and email starting at HEAD
Expand All @@ -187,54 +198,115 @@ func (p PizzaOvenServer) processRepository(repoURL string) error {
}

p.Logger.Debugf("Getting commit iterator with git log options: %v", gitLogOptions)
commitIter, err := gitRepo.Log(&gitLogOptions)
authorIter, err := gitRepo.Log(&gitLogOptions)
if err != nil {
return err
}

p.Logger.Debugf("Iterating commits in repository: %s", insight.RepoURLSource)
err = commitIter.ForEach(func(c *object.Commit) error {
// Build a unique, atomically safe temporary table name to pivot commit
// author data from
rawUUID := uuid.New().String()
uuid := strings.ReplaceAll(rawUUID, "-", "")
tmpTableName := fmt.Sprintf("temp_table_%s_%d", uuid, atomic.AddInt64(&counter, 1))

p.Logger.Debugf("Using temporary db table for commit authors: %s", tmpTableName)
authorTxn, authorStmt, err := p.PizzaOven.PrepareBulkAuthorInsert(tmpTableName)
if err != nil {
return err
}

// To reduce unnecessary duplicate statement executions, track the unique
// author emails using a simple set (represented as a string map to structs)
uniqueAuthorEmails := []string{}
authorEmailSet := make(map[string]struct{})

p.Logger.Debugf("Iterating commit authors in repository: %s with temporary tablename: %s", insight.RepoURLSource, tmpTableName)
err = authorIter.ForEach(func(c *object.Commit) error {
// TODO - if the committer and author are not the same, handle both
// those users. This is the case where there is a separate committer for
// a patch that was not authored by that specific person making the commit.

// TODO - if there is a co-author, should handle adding that person on
// the commit as well.
insight.AuthorEmail = c.Author.Email
insight.Hash = c.Hash.String()
insight.Date = c.Committer.When.UTC()

p.Logger.Debugf("Inspecting commit: %s %s %s", insight.AuthorEmail, insight.Hash, insight.Date)
authorID, err := p.PizzaOven.GetAuthorID(insight)
if err != nil {
if err == sql.ErrNoRows {
p.Logger.Debugf("Author not found. Inserting: %s %s %s", insight.AuthorEmail, insight.Hash, insight.Date)
authorID, err = p.PizzaOven.InsertAuthor(insight)
if err != nil {
return err
}
} else {
return err
}
// Check if the author email is in the unique set of author emails
if _, ok := authorEmailSet[c.Author.Email]; ok {
return nil
}

p.Logger.Debugf("Checking if commit already in database: %s", insight.Hash)
_, err = p.PizzaOven.GetCommitID(repoID, insight)
// Commit author is not in set so add this author's email as unique
authorEmailSet[c.Author.Email] = struct{}{}
uniqueAuthorEmails = append(uniqueAuthorEmails, c.Author.Email)

p.Logger.Debugf("Inspecting commit author: %s", c.Author.Email)
return p.PizzaOven.InsertAuthor(authorStmt, insights.CommitInsight{
RepoURLSource: repoURL,
AuthorEmail: c.Author.Email,
Hash: "",
Date: time.Time{},
})
})
if err != nil {
return err
}

// Resolve, execute, and pivot the bulk author transaction
err = p.PizzaOven.ResolveTransaction(authorTxn, authorStmt)
if err != nil {
return err
}

err = p.PizzaOven.PivotTmpTableToAuthorsTable(tmpTableName)
if err != nil {
return err
}

// Re-query the database for author email ids based on the unique list of
// author emails that have just been committed
authorEmailIDMap, err := p.PizzaOven.GetAuthorIDs(uniqueAuthorEmails)
if err != nil {
return err
}

// Rebuild the iterator from the start using the same options
commitIter, err := gitRepo.Log(&gitLogOptions)
if err != nil {
return err
}

// Get ready for the commit bulk action
commitTxn, commitStmt, err := p.PizzaOven.PrepareBulkCommitInsert()
if err != nil {
return err
}

p.Logger.Debugf("Iterating commits in repository: %s", insight.RepoURLSource)
err = commitIter.ForEach(func(c *object.Commit) error {
i := insights.CommitInsight{
RepoURLSource: repoURL,
AuthorEmail: c.Author.Email,
Hash: c.Hash.String(),
Date: c.Committer.When.UTC(),
}

p.Logger.Debugf("Inspecting commit: %s %s %s", i.AuthorEmail, i.Hash, i.Date)
err = p.PizzaOven.InsertCommit(commitStmt, i, authorEmailIDMap[i.AuthorEmail], repoID)
if err != nil {
if err == sql.ErrNoRows {
p.Logger.Debugf("Commit not found. Inserting into database: %s", insight.Hash)
err = p.PizzaOven.InsertCommit(insight, authorID, repoID)
if err != nil {
return err
}
} else {
return err
}
return err
}

return nil
})
if err != nil {
return err
}

// Execute and resolve the bulk commit insert
err = p.PizzaOven.ResolveTransaction(commitTxn, commitStmt)
if err != nil {
return err
}

p.Logger.Debugf("Finished processing: %s", insight.RepoURLSource)
return err
return nil
}

0 comments on commit 80d7f88

Please sign in to comment.