diff --git a/client/pkg/clickhouse/db_client.go b/client/pkg/clickhouse/db_client.go index 95a99971..600ba320 100644 --- a/client/pkg/clickhouse/db_client.go +++ b/client/pkg/clickhouse/db_client.go @@ -37,7 +37,7 @@ type DBInterface interface { RetrieveKubvizEvent() ([]model.DbEvent, error) InsertContainerEventDockerHub(model.DockerHubBuild) InsertContainerEventGithub(string) - InsertGitCommon(metrics model.GitCommonAttribute, statement dbstatement.DBStatement) + InsertGitCommon(metrics model.GitCommonAttribute, statement dbstatement.DBStatement) error Close() } @@ -419,11 +419,15 @@ func (c *DBClient) InsertContainerEventGithub(event string) { } } -func (c *DBClient) InsertGitCommon(metrics model.GitCommonAttribute, statement dbstatement.DBStatement) { - var ( - tx, _ = c.conn.Begin() - stmt, _ = tx.Prepare(string(statement)) - ) +func (c *DBClient) InsertGitCommon(metrics model.GitCommonAttribute, statement dbstatement.DBStatement) error { + tx, err := c.conn.Begin() + if err != nil { + return err + } + stmt, err := tx.Prepare(string(statement)) + if err != nil { + return err + } defer stmt.Close() if _, err := stmt.Exec( metrics.Author, @@ -435,9 +439,10 @@ func (c *DBClient) InsertGitCommon(metrics model.GitCommonAttribute, statement d metrics.TimeStamp, metrics.Event, ); err != nil { - log.Fatal(err) + return err } if err := tx.Commit(); err != nil { - log.Fatal(err) + return err } + return nil } diff --git a/client/pkg/clients/bridge_client.go b/client/pkg/clients/bridge_client.go index 924f1330..9fb3d650 100644 --- a/client/pkg/clients/bridge_client.go +++ b/client/pkg/clients/bridge_client.go @@ -35,24 +35,16 @@ const ( // SubscribeGitBridgeNats subscribes to nats jetstream and calls // the respective funcs to insert data into clickhouse DB func (n *NATSContext) SubscribeGitBridgeNats(conn clickhouse.DBInterface) { - var sub *nats.Subscription - sub, _ = n.stream.Subscribe(string(bridgeSubject), func(msg *nats.Msg) { - // Recover from a panic - defer func() { - if r := recover(); r != nil { - log.Println("Recovered from panic:", r) - - // Acknowledge all messages - for { - msg, err := sub.NextMsg(0) - if err != nil { - break - } - msg.Ack() - } - } - }() + n.stream.Subscribe(string(bridgeSubject), func(msg *nats.Msg) { + // Recover from a panic + defer func() { + if r := recover(); r != nil { + log.Println("Recovered from panic:", r) + // Acknowledge the message + msg.Ack() + } + }() msg.Ack() gitprovider := msg.Header.Get("GitProvider") repo := model.GitProvider(gitprovider) @@ -104,7 +96,10 @@ func (n *NATSContext) SubscribeGitBridgeNats(conn clickhouse.DBInterface) { gca.RepoName = pl.Resource.Repository.Name gca.TimeStamp = time.Now().Format(time.DateTime) gca.Event = string(msg.Data) - conn.InsertGitCommon(gca, dbstatement.InsertAzureDevops) + if err := conn.InsertGitCommon(gca, dbstatement.InsertAzureDevops); err != nil { + log.Println("error occurred while inserting data into database:", err.Error()) + return + } log.Println("Inserted AzureDevops metrics:", string(msg.Data)) log.Println() case string(azuremodel.GitPullRequestMergedEventType):