Skip to content

Commit

Permalink
Merge pull request #115 from vijeyash1/crashfix
Browse files Browse the repository at this point in the history
crashfix
  • Loading branch information
jebinjeb authored Jul 27, 2023
2 parents 5257e6f + e1616f4 commit c994634
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 26 deletions.
21 changes: 13 additions & 8 deletions client/pkg/clickhouse/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type DBInterface interface {
RetrieveKubvizEvent() ([]model.DbEvent, error)
InsertContainerEventDockerHub(model.DockerHubBuild)
InsertContainerEventGithub(string)
InsertGitCommon(metrics model.GitCommonAttribute, statement dbstatement.DBStatement)
InsertGitCommon(metrics model.GitCommonAttribute, statement dbstatement.DBStatement) error
Close()
}

Expand Down Expand Up @@ -419,11 +419,15 @@ func (c *DBClient) InsertContainerEventGithub(event string) {
}
}

func (c *DBClient) InsertGitCommon(metrics model.GitCommonAttribute, statement dbstatement.DBStatement) {
var (
tx, _ = c.conn.Begin()
stmt, _ = tx.Prepare(string(statement))
)
func (c *DBClient) InsertGitCommon(metrics model.GitCommonAttribute, statement dbstatement.DBStatement) error {
tx, err := c.conn.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(string(statement))
if err != nil {
return err
}
defer stmt.Close()
if _, err := stmt.Exec(
metrics.Author,
Expand All @@ -435,9 +439,10 @@ func (c *DBClient) InsertGitCommon(metrics model.GitCommonAttribute, statement d
metrics.TimeStamp,
metrics.Event,
); err != nil {
log.Fatal(err)
return err
}
if err := tx.Commit(); err != nil {
log.Fatal(err)
return err
}
return nil
}
31 changes: 13 additions & 18 deletions client/pkg/clients/bridge_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,16 @@ const (
// SubscribeGitBridgeNats subscribes to nats jetstream and calls
// the respective funcs to insert data into clickhouse DB
func (n *NATSContext) SubscribeGitBridgeNats(conn clickhouse.DBInterface) {
var sub *nats.Subscription
sub, _ = n.stream.Subscribe(string(bridgeSubject), func(msg *nats.Msg) {
// Recover from a panic
defer func() {
if r := recover(); r != nil {
log.Println("Recovered from panic:", r)

// Acknowledge all messages
for {
msg, err := sub.NextMsg(0)
if err != nil {
break
}
msg.Ack()
}
}
}()
n.stream.Subscribe(string(bridgeSubject), func(msg *nats.Msg) {
// Recover from a panic
defer func() {
if r := recover(); r != nil {
log.Println("Recovered from panic:", r)

// Acknowledge the message
msg.Ack()
}
}()
msg.Ack()
gitprovider := msg.Header.Get("GitProvider")
repo := model.GitProvider(gitprovider)
Expand Down Expand Up @@ -104,7 +96,10 @@ func (n *NATSContext) SubscribeGitBridgeNats(conn clickhouse.DBInterface) {
gca.RepoName = pl.Resource.Repository.Name
gca.TimeStamp = time.Now().Format(time.DateTime)
gca.Event = string(msg.Data)
conn.InsertGitCommon(gca, dbstatement.InsertAzureDevops)
if err := conn.InsertGitCommon(gca, dbstatement.InsertAzureDevops); err != nil {
log.Println("error occurred while inserting data into database:", err.Error())
return
}
log.Println("Inserted AzureDevops metrics:", string(msg.Data))
log.Println()
case string(azuremodel.GitPullRequestMergedEventType):
Expand Down

0 comments on commit c994634

Please sign in to comment.