Skip to content

Commit

Permalink
Improve event deliveries filtering (#1824)
Browse files Browse the repository at this point in the history
- add sort url query for LoadEventDeliveriesPaged & LoadEventsPaged
- add event type url query for outgoing projects

* feat: test event type filter

* fix: fix LoadEventDeliveriesPaged call

* fix: fix build issues

* fix: fix LoadEventDeliveriesPaged setup

* feat: update event filters (#1840)

* update date picker component

* update dropdown component

* update endpoint filter component

* add sort events toggle

* update event filters

* update event delivery filter component

* update: update filter component

* update filter button

* fix sort toggle

* remove comments, fix sort toggle

* update pagaination

* update event filters

* update events filter

* update events filter buttons

* fix filter toggle

* fix: fix asc & desc sorting
- remove print stmts
- change case "DESC" to default in SetCursors
- remove CTE from baseEventsPagedBackward
* fix: fix getFwdEventPageQuery
- fix issues where any prev page request jumps to the first page
- fix ASC issue where there's always a previous page even on the first page

---------

Co-authored-by: oluwadaminiola <pelumioni25@gmail.com>
  • Loading branch information
danvixent and Oluwadaminiola authored Nov 17, 2023
1 parent 1cac88b commit 4e6b718
Show file tree
Hide file tree
Showing 48 changed files with 2,072 additions and 732 deletions.
3 changes: 2 additions & 1 deletion api/dashboard/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func (a *DashboardHandler) GetEventDeliveriesPaged(w http.ResponseWriter, r *htt
}

f := data.Filter
ed, paginationData, err := postgres.NewEventDeliveryRepo(a.A.DB, a.A.Cache).LoadEventDeliveriesPaged(r.Context(), project.UID, f.EndpointIDs, f.EventID, f.SubscriptionID, f.Status, f.SearchParams, f.Pageable, f.IdempotencyKey)

ed, paginationData, err := postgres.NewEventDeliveryRepo(a.A.DB, a.A.Cache).LoadEventDeliveriesPaged(r.Context(), project.UID, f.EndpointIDs, f.EventID, f.SubscriptionID, f.Status, f.SearchParams, f.Pageable, f.IdempotencyKey, f.EventType)
if err != nil {
log.FromContext(r.Context()).WithError(err).Error("failed to fetch event deliveries")
_ = render.Render(w, r, util.NewErrorResponse("an error occurred while fetching event deliveries", http.StatusInternalServerError))
Expand Down
1 change: 1 addition & 0 deletions api/models/event_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (ql *QueryListEventDelivery) Transform(r *http.Request) (*QueryListEventDel
SubscriptionID: r.URL.Query().Get("subscriptionId"),
IdempotencyKey: r.URL.Query().Get("idempotencyKey"),
EventID: r.URL.Query().Get("eventId"),
EventType: r.URL.Query().Get("eventType"),
Status: getEventDeliveryStatus(r),
Pageable: m.GetPageableFromContext(r.Context()),
SearchParams: searchParams,
Expand Down
3 changes: 2 additions & 1 deletion api/portal-api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ func (a *PortalLinkHandler) GetEventDeliveriesPaged(w http.ResponseWriter, r *ht

data.Filter.EndpointIDs = endpointIDs
f := data.Filter
ed, paginationData, err := postgres.NewEventDeliveryRepo(a.A.DB, a.A.Cache).LoadEventDeliveriesPaged(r.Context(), project.UID, f.EndpointIDs, f.EventID, f.SubscriptionID, f.Status, f.SearchParams, f.Pageable, f.IdempotencyKey)

ed, paginationData, err := postgres.NewEventDeliveryRepo(a.A.DB, a.A.Cache).LoadEventDeliveriesPaged(r.Context(), project.UID, f.EndpointIDs, f.EventID, f.SubscriptionID, f.Status, f.SearchParams, f.Pageable, f.IdempotencyKey, f.EventType)
if err != nil {
log.FromContext(r.Context()).WithError(err).Error("failed to fetch event deliveries")
_ = render.Render(w, r, util.NewErrorResponse("an error occurred while fetching event deliveries", http.StatusInternalServerError))
Expand Down
6 changes: 4 additions & 2 deletions api/public/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package public
import (
"encoding/json"
"fmt"
"net/http"

"github.com/frain-dev/convoy"
"github.com/frain-dev/convoy/pkg/msgpack"
"github.com/frain-dev/convoy/queue"
"github.com/frain-dev/convoy/worker/task"
"github.com/oklog/ulid/v2"
"net/http"

"github.com/frain-dev/convoy/pkg/log"

Expand Down Expand Up @@ -528,7 +529,8 @@ func (a *PublicHandler) GetEventDeliveriesPaged(w http.ResponseWriter, r *http.R
}

f := data.Filter
ed, paginationData, err := postgres.NewEventDeliveryRepo(a.A.DB, a.A.Cache).LoadEventDeliveriesPaged(r.Context(), project.UID, f.EndpointIDs, f.EventID, f.SubscriptionID, f.Status, f.SearchParams, f.Pageable, f.IdempotencyKey)

ed, paginationData, err := postgres.NewEventDeliveryRepo(a.A.DB, a.A.Cache).LoadEventDeliveriesPaged(r.Context(), project.UID, f.EndpointIDs, f.EventID, f.SubscriptionID, f.Status, f.SearchParams, f.Pageable, f.IdempotencyKey, f.EventType)
if err != nil {
log.FromContext(r.Context()).WithError(err).Error("failed to fetch event deliveries")
_ = render.Render(w, r, util.NewErrorResponse("an error occurred while fetching event deliveries", http.StatusInternalServerError))
Expand Down
67 changes: 53 additions & 14 deletions database/postgres/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"database/sql"
"errors"
"fmt"
"strings"
"time"

"github.com/frain-dev/convoy/cache"
"github.com/frain-dev/convoy/config"
"time"

"github.com/frain-dev/convoy/database"
"github.com/frain-dev/convoy/datastore"
Expand Down Expand Up @@ -112,21 +114,26 @@ const (
LEFT JOIN convoy.sources s ON s.id = ev.source_id
WHERE ev.deleted_at IS NULL`

baseEventsPagedForward = `%s %s AND ev.id <= :cursor
GROUP BY ev.id, s.id
ORDER BY ev.id DESC
LIMIT :limit
baseEventsPagedForward = `
WITH events AS (
%s %s AND ev.id <= :cursor
GROUP BY ev.id, s.id
ORDER BY ev.id %s
LIMIT :limit
)
SELECT * FROM events ORDER BY id %s
`

baseEventsPagedBackward = `
WITH events AS (
%s %s AND ev.id >= :cursor
%s %s AND ev.id >= :cursor
GROUP BY ev.id, s.id
ORDER BY ev.id ASC
ORDER BY ev.id %s
LIMIT :limit
)
SELECT * FROM events ORDER BY id DESC
SELECT * FROM events ORDER BY id %s
`

baseEventFilter = ` AND ev.project_id = :project_id
Expand All @@ -153,7 +160,7 @@ const (
LEFT JOIN convoy.events_endpoints ee ON ev.id = ee.event_id
WHERE ev.deleted_at IS NULL
`
countPrevEvents = ` AND ev.id > :cursor GROUP BY ev.id ORDER BY ev.id DESC LIMIT 1`
countPrevEvents = ` AND ev.id > :cursor GROUP BY ev.id ORDER BY ev.id %s LIMIT 1`

softDeleteProjectEvents = `
UPDATE convoy.events SET deleted_at = NOW()
Expand Down Expand Up @@ -364,12 +371,12 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte
"event_id": filter.Query,
}

var base = baseEventsPaged
base := baseEventsPaged
var baseQueryPagination string
if filter.Pageable.Direction == datastore.Next {
baseQueryPagination = baseEventsPagedForward
baseQueryPagination = getFwdEventPageQuery(filter.Pageable.SortOrder())
} else {
baseQueryPagination = baseEventsPagedBackward
baseQueryPagination = getBackwardEventPageQuery(filter.Pageable.SortOrder())
}

filterQuery = baseEventFilter
Expand All @@ -382,7 +389,12 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte
base = baseEventsSearch
}

query = fmt.Sprintf(baseQueryPagination, base, filterQuery)
preOrder := filter.Pageable.SortOrder()
if filter.Pageable.Direction == datastore.Prev {
preOrder = reverseOrder(preOrder)
}

query = fmt.Sprintf(baseQueryPagination, base, filterQuery, preOrder, filter.Pageable.SortOrder())
query, args, err = sqlx.Named(query, arg)
if err != nil {
return nil, datastore.PaginationData{}, err
Expand Down Expand Up @@ -423,7 +435,10 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte
baseCountEvents = baseCountPrevEventSearch
}

cq := baseCountEvents + filterQuery + countPrevEvents
tmp := getCountDeliveriesPrevRowQuery(filter.Pageable.SortOrder())
tmp = fmt.Sprintf(tmp, filter.Pageable.SortOrder())

cq := baseCountEvents + filterQuery + tmp
countQuery, qargs, err = sqlx.Named(cq, qarg)
if err != nil {
return nil, datastore.PaginationData{}, err
Expand Down Expand Up @@ -524,3 +539,27 @@ type EventEndpoint struct {
EventID string `db:"event_id"`
EndpointID string `db:"endpoint_id"`
}

func getFwdEventPageQuery(sortOrder string) string {
if sortOrder == "ASC" {
return strings.Replace(baseEventsPagedForward, "<=", ">=", 1)
}

return baseEventsPagedForward
}

func getBackwardEventPageQuery(sortOrder string) string {
if sortOrder == "ASC" {
return strings.Replace(baseEventsPagedBackward, ">=", "<=", 1)
}

return baseEventsPagedBackward
}

func getCountDeliveriesPrevRowQuery(sortOrder string) string {
if sortOrder == "ASC" {
return strings.Replace(countPrevEvents, ">", "<", 1)
}

return countPrevEvents
}
78 changes: 64 additions & 14 deletions database/postgres/event_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/frain-dev/convoy/cache"
Expand Down Expand Up @@ -74,12 +75,18 @@ const (
`

baseEventDeliveryPagedForward = `
%s
%s
AND ed.id <= :cursor
GROUP BY ed.id, ep.id, ev.id, d.id, s.id
ORDER BY ed.id DESC
LIMIT :limit
WITH event_deliveries AS (
%s
%s
AND ed.id <= :cursor
GROUP BY ed.id, ep.id, ev.id, d.id, s.id
ORDER BY ed.id %s
LIMIT :limit
)
SELECT * FROM event_deliveries
WHERE ("event_metadata.event_type" = :event_type OR :event_type = '')
ORDER BY id %s
`

baseEventDeliveryPagedBackward = `
Expand All @@ -88,11 +95,13 @@ const (
%s
AND ed.id >= :cursor
GROUP BY ed.id, ep.id, ev.id, d.id, s.id
ORDER BY ed.id ASC
ORDER BY ed.id %s
LIMIT :limit
)
SELECT * FROM event_deliveries ORDER BY id DESC
SELECT * FROM event_deliveries
WHERE ("event_metadata.event_type" = :event_type OR :event_type = '')
ORDER BY id %s
`

fetchEventDeliveryByID = baseFetchEventDelivery + ` AND ed.id = $1 AND ed.project_id = $2`
Expand All @@ -108,7 +117,7 @@ const (
FROM convoy.event_deliveries ed
WHERE ed.deleted_at IS NULL
%s
AND ed.id > :cursor GROUP BY ed.id ORDER BY ed.id DESC LIMIT 1`
AND ed.id > :cursor GROUP BY ed.id ORDER BY ed.id %s LIMIT 1`

loadEventDeliveriesIntervals = `
SELECT
Expand Down Expand Up @@ -466,7 +475,7 @@ func (e *eventDeliveryRepo) DeleteProjectEventDeliveries(ctx context.Context, pr
return nil
}

func (e *eventDeliveryRepo) LoadEventDeliveriesPaged(ctx context.Context, projectID string, endpointIDs []string, eventID, subscriptionID string, status []datastore.EventDeliveryStatus, params datastore.SearchParams, pageable datastore.Pageable, idempotencyKey string) ([]datastore.EventDelivery, datastore.PaginationData, error) {
func (e *eventDeliveryRepo) LoadEventDeliveriesPaged(ctx context.Context, projectID string, endpointIDs []string, eventID, subscriptionID string, status []datastore.EventDeliveryStatus, params datastore.SearchParams, pageable datastore.Pageable, idempotencyKey, eventType string) ([]datastore.EventDelivery, datastore.PaginationData, error) {
eventDeliveriesP := make([]EventDeliveryPaginated, 0)

start := time.Unix(params.CreatedAtStart, 0)
Expand All @@ -479,6 +488,7 @@ func (e *eventDeliveryRepo) LoadEventDeliveriesPaged(ctx context.Context, projec
"subscription_id": subscriptionID,
"start_date": start,
"event_id": eventID,
"event_type": eventType,
"end_date": end,
"status": status,
"cursor": pageable.Cursor(),
Expand All @@ -487,9 +497,9 @@ func (e *eventDeliveryRepo) LoadEventDeliveriesPaged(ctx context.Context, projec

var query, filterQuery string
if pageable.Direction == datastore.Next {
query = baseEventDeliveryPagedForward
query = getFwdDeliveryPageQuery(pageable.SortOrder())
} else {
query = baseEventDeliveryPagedBackward
query = getBackwardDeliveryPageQuery(pageable.SortOrder())
}

filterQuery = baseEventDeliveryFilter
Expand All @@ -505,7 +515,12 @@ func (e *eventDeliveryRepo) LoadEventDeliveriesPaged(ctx context.Context, projec
filterQuery += ` AND ed.subscription_id = :subscription_id`
}

query = fmt.Sprintf(query, baseFetchEventDelivery, filterQuery)
preOrder := pageable.SortOrder()
if pageable.Direction == datastore.Prev {
preOrder = reverseOrder(preOrder)
}

query = fmt.Sprintf(query, baseFetchEventDelivery, filterQuery, preOrder, pageable.SortOrder())

query, args, err := sqlx.Named(query, arg)
if err != nil {
Expand Down Expand Up @@ -594,7 +609,9 @@ func (e *eventDeliveryRepo) LoadEventDeliveriesPaged(ctx context.Context, projec
qarg := arg
qarg["cursor"] = first.UID

cq := fmt.Sprintf(countPrevEventDeliveries, filterQuery)
tmp := getCountEventPrevRowQuery(pageable.SortOrder())

cq := fmt.Sprintf(tmp, filterQuery, pageable.SortOrder())
countQuery, qargs, err = sqlx.Named(cq, qarg)
if err != nil {
return nil, datastore.PaginationData{}, err
Expand Down Expand Up @@ -835,3 +852,36 @@ func (m *CLIMetadata) Scan(value interface{}) error {

return nil
}

func getFwdDeliveryPageQuery(sortOrder string) string {
if sortOrder == "ASC" {
return strings.Replace(baseEventDeliveryPagedForward, "<=", ">=", 1)
}

return baseEventDeliveryPagedForward
}

func getBackwardDeliveryPageQuery(sortOrder string) string {
if sortOrder == "ASC" {
return strings.Replace(baseEventDeliveryPagedBackward, ">=", "<=", 1)
}

return baseEventDeliveryPagedBackward
}

func getCountEventPrevRowQuery(sortOrder string) string {
if sortOrder == "ASC" {
return strings.Replace(countPrevEventDeliveries, ">", "<", 1)
}

return countPrevEventDeliveries
}

func reverseOrder(sortOrder string) string {
switch sortOrder {
case "ASC":
return "DESC"
default:
return "ASC"
}
}
27 changes: 26 additions & 1 deletion database/postgres/event_delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func Test_eventDeliveryRepo_LoadEventDeliveriesPaged(t *testing.T) {
datastore.Pageable{
PerPage: 10,
},
"",
"", "",
)

require.NoError(t, err)
Expand Down Expand Up @@ -491,4 +491,29 @@ func Test_eventDeliveryRepo_LoadEventDeliveriesPaged(t *testing.T) {

require.Equal(t, ed, dbEventDelivery)
}

evType := "file"
event = seedEventWithEventType(t, db, project, evType)

ed := generateEventDelivery(project, endpoint, event, device, sub)

err = edRepo.CreateEventDelivery(context.Background(), ed)
require.NoError(t, err)

filteredDeliveries, _, err := edRepo.LoadEventDeliveriesPaged(
context.Background(), project.UID, []string{endpoint.UID}, event.UID, sub.UID,
[]datastore.EventDeliveryStatus{datastore.SuccessEventStatus},
datastore.SearchParams{
CreatedAtStart: time.Now().Add(-time.Hour).Unix(),
CreatedAtEnd: time.Now().Add(time.Hour).Unix(),
},
datastore.Pageable{
PerPage: 10,
},
"", evType,
)

require.NoError(t, err)
require.Equal(t, 1, len(filteredDeliveries))
require.Equal(t, ed.UID, filteredDeliveries[0].UID)
}
9 changes: 9 additions & 0 deletions database/postgres/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,12 @@ func seedEvent(t *testing.T, db database.Database, project *datastore.Project) *
require.NoError(t, NewEventRepo(db, nil).CreateEvent(context.Background(), ev))
return ev
}

func seedEventWithEventType(t *testing.T, db database.Database, project *datastore.Project, eventType string) *datastore.Event {
ev := generateEvent(t, db)
ev.EventType = datastore.EventType(eventType)
ev.ProjectID = project.UID

require.NoError(t, NewEventRepo(db, nil).CreateEvent(context.Background(), ev))
return ev
}
1 change: 1 addition & 0 deletions datastore/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Filter struct {
EndpointIDs []string
SubscriptionID string
EventID string
EventType string
SourceID string
Pageable Pageable
IdempotencyKey string
Expand Down
Loading

0 comments on commit 4e6b718

Please sign in to comment.