Skip to content

Commit

Permalink
Record event delivery latency (#1830)
Browse files Browse the repository at this point in the history
* fix: record event delivery latency

* fix: add latecny field to EventDeliveryPaginated

* fix: add if exists clause
  • Loading branch information
danvixent authored Nov 1, 2023
1 parent 27ef2d3 commit 49aac3d
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 4 deletions.
10 changes: 7 additions & 3 deletions database/postgres/event_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/frain-dev/convoy/cache"
"time"

"github.com/frain-dev/convoy/cache"

"github.com/lib/pq"

"github.com/frain-dev/convoy/database"
Expand Down Expand Up @@ -55,6 +56,7 @@ const (
COALESCE(ep.target_url, '') AS "endpoint_metadata.target_url",
ev.id AS "event_metadata.id",
ev.event_type AS "event_metadata.event_type",
COALESCE(ed.latency,'') AS latency,
COALESCE(d.id,'') AS "device_metadata.id",
COALESCE(d.status,'') AS "device_metadata.status",
Expand Down Expand Up @@ -166,7 +168,7 @@ const (
`

updateEventDeliveryAttempts = `
UPDATE convoy.event_deliveries SET attempts = $1, status = $2, metadata = $3, updated_at = NOW() WHERE id = $4 AND project_id = $5 AND deleted_at IS NULL;
UPDATE convoy.event_deliveries SET attempts = $1, status = $2, metadata = $3, latency = $4, updated_at = NOW() WHERE id = $5 AND project_id = $6 AND deleted_at IS NULL;
`

softDeleteProjectEventDeliveries = `
Expand Down Expand Up @@ -373,7 +375,7 @@ func (e *eventDeliveryRepo) FindDiscardedEventDeliveries(ctx context.Context, pr
func (e *eventDeliveryRepo) UpdateEventDeliveryWithAttempt(ctx context.Context, projectID string, delivery datastore.EventDelivery, attempt datastore.DeliveryAttempt) error {
delivery.DeliveryAttempts = append(delivery.DeliveryAttempts, attempt)

result, err := e.db.ExecContext(ctx, updateEventDeliveryAttempts, delivery.DeliveryAttempts, delivery.Status, delivery.Metadata, delivery.UID, projectID)
result, err := e.db.ExecContext(ctx, updateEventDeliveryAttempts, delivery.DeliveryAttempts, delivery.Status, delivery.Metadata, delivery.Latency, delivery.UID, projectID)
if err != nil {
return err
}
Expand Down Expand Up @@ -551,6 +553,7 @@ func (e *eventDeliveryRepo) LoadEventDeliveriesPaged(ctx context.Context, projec
IdempotencyKey: ev.IdempotencyKey,
Headers: ev.Headers,
URLQueryParams: ev.URLQueryParams,
Latency: ev.Latency,
Endpoint: &datastore.Endpoint{
UID: ev.Endpoint.UID.ValueOrZero(),
ProjectID: ev.Endpoint.ProjectID.ValueOrZero(),
Expand Down Expand Up @@ -794,6 +797,7 @@ type EventDeliveryPaginated struct {
Headers httpheader.HTTPHeader `json:"headers" db:"headers"`
URLQueryParams string `json:"url_query_params" db:"url_query_params"`
IdempotencyKey string `json:"idempotency_key" db:"idempotency_key"`
Latency string `json:"latency" db:"latency"`

Endpoint *EndpointMetadata `json:"endpoint_metadata,omitempty" db:"endpoint_metadata"`
Event *EventMetadata `json:"event_metadata,omitempty" db:"event_metadata"`
Expand Down
7 changes: 7 additions & 0 deletions database/postgres/event_delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func Test_eventDeliveryRepo_CreateEventDelivery(t *testing.T) {
dbEventDelivery.CreatedAt, dbEventDelivery.UpdatedAt = time.Time{}, time.Time{}
dbEventDelivery.Event, dbEventDelivery.Endpoint, dbEventDelivery.Source, dbEventDelivery.Device = nil, nil, nil, nil

require.Equal(t, "", dbEventDelivery.Latency)

require.Equal(t, ed.Metadata.NextSendTime.UTC(), dbEventDelivery.Metadata.NextSendTime.UTC())
ed.Metadata.NextSendTime = time.Time{}
dbEventDelivery.Metadata.NextSendTime = time.Time{}
Expand Down Expand Up @@ -337,6 +339,10 @@ func Test_eventDeliveryRepo_UpdateEventDeliveryWithAttempt(t *testing.T) {
UID: ulid.Make().String(),
}

latency := "1h2m"

ed.Latency = latency

err = edRepo.UpdateEventDeliveryWithAttempt(context.Background(), project.UID, *ed, newAttempt)
require.NoError(t, err)

Expand All @@ -345,6 +351,7 @@ func Test_eventDeliveryRepo_UpdateEventDeliveryWithAttempt(t *testing.T) {

require.Equal(t, ed.DeliveryAttempts[0], dbEventDelivery.DeliveryAttempts[0])
require.Equal(t, newAttempt, dbEventDelivery.DeliveryAttempts[1])
require.Equal(t, latency, dbEventDelivery.Latency)
}

func Test_eventDeliveryRepo_CountEventDeliveries(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions datastore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ type EventDelivery struct {
Headers httpheader.HTTPHeader `json:"headers" db:"headers"`
URLQueryParams string `json:"url_query_params" db:"url_query_params"`
IdempotencyKey string `json:"idempotency_key" db:"idempotency_key"`
Latency string `json:"latency" db:"latency"`

Endpoint *Endpoint `json:"endpoint_metadata,omitempty" db:"endpoint_metadata"`
Event *Event `json:"event_metadata,omitempty" db:"event_metadata"`
Expand Down
6 changes: 6 additions & 0 deletions sql/1698683940.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- +migrate Up
ALTER TABLE convoy.event_deliveries ADD COLUMN IF NOT EXISTS latency TEXT;

-- +migrate Down
ALTER TABLE convoy.event_deliveries DROP COLUMN IF EXISTS latency;

4 changes: 3 additions & 1 deletion worker/task/process_event_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ type EventDelivery struct {
}

func ProcessEventDelivery(endpointRepo datastore.EndpointRepository, eventDeliveryRepo datastore.EventDeliveryRepository,
projectRepo datastore.ProjectRepository, subRepo datastore.SubscriptionRepository, notificationQueue queue.Queuer, rateLimiter limiter.RateLimiter) func(context.Context, *asynq.Task) error {
projectRepo datastore.ProjectRepository, subRepo datastore.SubscriptionRepository, notificationQueue queue.Queuer, rateLimiter limiter.RateLimiter,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var data EventDelivery

Expand Down Expand Up @@ -196,6 +197,7 @@ func ProcessEventDelivery(endpointRepo datastore.EndpointRepository, eventDelive

eventDelivery.Status = datastore.SuccessEventStatus
eventDelivery.Description = ""
eventDelivery.Latency = time.Since(eventDelivery.CreatedAt).String()
} else {
requestLogger.Errorf("%s", eventDelivery.UID)
done = false
Expand Down

0 comments on commit 49aac3d

Please sign in to comment.