Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: upstream test #497

Merged
merged 10 commits into from
Aug 21, 2023
9 changes: 6 additions & 3 deletions events/event_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func (e EventConsumer) Validate() error {
return fmt.Errorf("BatchSize:%d <= 0", e.BatchSize)
}
if e.Consumers <= 0 {
return fmt.Errorf("Consumers:%d <= 0", e.BatchSize)
return fmt.Errorf("consumers:%d <= 0", e.BatchSize)
}
if len(e.WatchEvents) == 0 {
return fmt.Errorf("No events registered to watch:%d <= 0", len(e.WatchEvents))
return fmt.Errorf("no events registered to watch:%d <= 0", len(e.WatchEvents))
}
return nil
}
Expand All @@ -51,7 +51,8 @@ func (t *EventConsumer) consumeEvents() error {
SELECT id FROM event_queue
WHERE
attempts <= @maxAttempts AND
name IN @events
name IN @events AND
(last_attempt IS NULL OR last_attempt <= NOW() - INTERVAL '1 SECOND' * @baseDelay * POWER(attempts, @exponential))
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT @batchSize
Expand All @@ -64,6 +65,8 @@ func (t *EventConsumer) consumeEvents() error {
"maxAttempts": eventMaxAttempts,
"events": t.WatchEvents,
"batchSize": t.BatchSize,
"baseDelay": 60, // in seconds
"exponential": 5, // along with baseDelay = 60, the retries are 1, 6, 31, 156 (in minutes)
}
err := tx.Raw(selectEventsQuery, vals).Scan(&events).Error
if err != nil {
Expand Down
62 changes: 49 additions & 13 deletions events/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
"github.com/flanksource/duty/fixtures/dummy"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/testutils"
"github.com/flanksource/incident-commander/api"
"github.com/google/uuid"

"github.com/flanksource/incident-commander/upstream"
"github.com/jackc/pgx/v5/pgxpool"
Expand All @@ -27,6 +29,30 @@ func TestPushMode(t *testing.T) {
ginkgo.RunSpecs(t, "Push Mode Suite")
}

type agentWrapper struct {
id uuid.UUID // agent's id in the upstream db
name string
db *gorm.DB
pool *pgxpool.Pool
dataset dummy.DummyData
}

func (t *agentWrapper) setup(connection string) {
var err error

if t.db, t.pool, err = duty.SetupDB(connection, nil); err != nil {
ginkgo.Fail(err.Error())
}

if err := t.dataset.Populate(t.db); err != nil {
ginkgo.Fail(err.Error())
}
}

func (t *agentWrapper) stop() {
t.pool.Close()
}

var (
// postgres server shared by both agent and upstream
postgresServer *embeddedPG.EmbeddedPostgres
Expand All @@ -35,39 +61,46 @@ var (
upstreamEchoServerport = 11005
upstreamEchoServer *echo.Echo

agentDB *gorm.DB
agentDBPGPool *pgxpool.Pool
agentDBName = "agent"
agentBob = agentWrapper{name: "bob", id: uuid.New(), dataset: dummy.GenerateDynamicDummyData()}
agentJames = agentWrapper{name: "james", id: uuid.New(), dataset: dummy.GenerateDynamicDummyData()}
agentRoss = agentWrapper{name: "ross", id: uuid.New(), dataset: dummy.GenerateDynamicDummyData()}

upstreamDB *gorm.DB
upstreamDBPGPool *pgxpool.Pool
upstreamDBName = "upstream"
)

var _ = ginkgo.BeforeSuite(func() {
config, connection := testutils.GetEmbeddedPGConfig(agentDBName, pgServerPort)
config, connection := testutils.GetEmbeddedPGConfig(agentBob.name, pgServerPort)
postgresServer = embeddedPG.NewDatabase(config)
if err := postgresServer.Start(); err != nil {
ginkgo.Fail(err.Error())
}
logger.Infof("Started postgres on port: %d", pgServerPort)

var err error
if agentDB, agentDBPGPool, err = duty.SetupDB(connection, nil); err != nil {
ginkgo.Fail(err.Error())
}
if err := dummy.PopulateDBWithDummyModels(agentDB); err != nil {
ginkgo.Fail(err.Error())
}
agentBob.setup(connection)

_, err = agentDBPGPool.Exec(context.TODO(), fmt.Sprintf("CREATE DATABASE %s", upstreamDBName))
// Setup another agent
_, err := agentBob.pool.Exec(context.TODO(), fmt.Sprintf("CREATE DATABASE %s", agentJames.name))
Expect(err).NotTo(HaveOccurred())
agentJames.setup(strings.ReplaceAll(connection, agentBob.name, agentJames.name))

upstreamDBConnection := strings.ReplaceAll(connection, agentDBName, upstreamDBName)
_, err = agentBob.pool.Exec(context.TODO(), fmt.Sprintf("CREATE DATABASE %s", agentRoss.name))
Expect(err).NotTo(HaveOccurred())
agentRoss.setup(strings.ReplaceAll(connection, agentBob.name, agentRoss.name))

// Setup upstream db
_, err = agentBob.pool.Exec(context.TODO(), fmt.Sprintf("CREATE DATABASE %s", upstreamDBName))
Expect(err).NotTo(HaveOccurred())
upstreamDBConnection := strings.ReplaceAll(connection, agentBob.name, upstreamDBName)
if upstreamDB, upstreamDBPGPool, err = duty.SetupDB(upstreamDBConnection, nil); err != nil {
ginkgo.Fail(err.Error())
}

Expect(upstreamDB.Create(&models.Agent{ID: agentBob.id, Name: agentBob.name}).Error).To(BeNil())
Expect(upstreamDB.Create(&models.Agent{ID: agentJames.id, Name: agentJames.name}).Error).To(BeNil())
Expect(upstreamDB.Create(&models.Agent{ID: agentRoss.id, Name: agentRoss.name}).Error).To(BeNil())

upstreamEchoServer = echo.New()
upstreamEchoServer.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
Expand All @@ -91,6 +124,9 @@ var _ = ginkgo.BeforeSuite(func() {
})

var _ = ginkgo.AfterSuite(func() {
agentBob.stop()
agentJames.stop()

logger.Infof("Stopping upstream echo server")
if err := upstreamEchoServer.Shutdown(context.Background()); err != nil {
ginkgo.Fail(err.Error())
Expand Down
2 changes: 1 addition & 1 deletion events/upstream_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func handleUpstreamPushEvents(ctx *api.Context, events []api.Event) []api.Event
var eventsToProcess []api.Event
for _, e := range events {
if e.Name != EventPushQueueCreate {
e.Error = fmt.Errorf("Unrecognized event name: %s", e.Name).Error()
e.Error = fmt.Errorf("unrecognized event name: %s", e.Name).Error()
failedEvents = append(failedEvents, e)
} else {
eventsToProcess = append(eventsToProcess, e)
Expand Down
Loading
Loading