Skip to content

Commit

Permalink
Merge pull request #1276 from openmeterio/feat/handle-entitlements-ev…
Browse files Browse the repository at this point in the history
…ents

feat: support entitlement, ingest events
  • Loading branch information
turip authored Aug 1, 2024
2 parents 51e6274 + 3a5bc60 commit a80830e
Show file tree
Hide file tree
Showing 4 changed files with 348 additions and 23 deletions.
4 changes: 2 additions & 2 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ func main() {
// Initialize worker
workerOptions := balanceworker.WorkerOptions{
SystemEventsTopic: conf.Events.SystemEvents.Topic,
// TODO: IngestEventsTopic
Subscriber: wmSubscriber,
IngestEventsTopic: conf.Events.IngestEvents.Topic,
Subscriber: wmSubscriber,

TargetTopic: conf.Events.SystemEvents.Topic,
Publisher: publishers.watermillPublisher,
Expand Down
159 changes: 159 additions & 0 deletions internal/entitlement/balanceworker/entitlementhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package balanceworker

import (
"context"
"fmt"
"time"

"github.com/ThreeDotsLabs/watermill/message"

"github.com/openmeterio/openmeter/internal/entitlement"
"github.com/openmeterio/openmeter/internal/entitlement/httpdriver"
"github.com/openmeterio/openmeter/internal/entitlement/snapshot"
"github.com/openmeterio/openmeter/internal/event/models"
"github.com/openmeterio/openmeter/internal/event/spec"
"github.com/openmeterio/openmeter/internal/productcatalog"
"github.com/openmeterio/openmeter/pkg/convert"
)

func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent entitlement.EntitlementDeletedEvent) ([]*message.Message, error) {
namespace := delEvent.Namespace.ID

feature, err := w.connectors.Feature.GetFeature(ctx, namespace, delEvent.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
if err != nil {
return nil, fmt.Errorf("failed to get feature: %w", err)
}

subjectID := ""
if w.opts.SubjectIDResolver != nil {
subjectID, err = w.opts.SubjectIDResolver.GetSubjectIDByKey(ctx, namespace, delEvent.SubjectKey)
if err != nil {
return nil, fmt.Errorf("failed to get subject ID: %w", err)
}
}

calculationTime := time.Now()

event, err := spec.NewCloudEvent(
spec.EventSpec{
Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, delEvent.ID),
Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, delEvent.SubjectKey),
},
snapshot.SnapshotEvent{
Entitlement: delEvent.Entitlement,
Namespace: models.NamespaceID{
ID: namespace,
},
Subject: models.SubjectKeyAndID{
Key: delEvent.SubjectKey,
ID: subjectID,
},
Feature: *feature,
Operation: snapshot.BalanceOperationDelete,

CalculatedAt: convert.ToPointer(calculationTime),

CurrentUsagePeriod: delEvent.CurrentUsagePeriod,
},
)
if err != nil {
return nil, fmt.Errorf("failed to create cloud event: %w", err)
}

wmMessage, err := w.opts.Marshaler.MarshalEvent(event)
if err != nil {
return nil, fmt.Errorf("failed to marshal cloud event: %w", err)
}

_ = w.highWatermarkCache.Add(delEvent.ID, highWatermarkCacheEntry{
HighWatermark: calculationTime.Add(-defaultClockDrift),
IsDeleted: true,
})

return []*message.Message{wmMessage}, nil
}

func (w *Worker) handleEntitlementUpdateEvent(ctx context.Context, entitlementID NamespacedID, source string) ([]*message.Message, error) {
calculatedAt := time.Now()

if entry, ok := w.highWatermarkCache.Get(entitlementID.ID); ok {
if entry.HighWatermark.After(calculatedAt) || entry.IsDeleted {
return nil, nil
}
}

wmMessage, err := w.createSnapshotEvent(ctx, entitlementID, source, calculatedAt)
if err != nil {
return nil, fmt.Errorf("failed to create entitlement update snapshot event: %w", err)
}

_ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{
HighWatermark: calculatedAt.Add(-defaultClockDrift),
})

return []*message.Message{wmMessage}, nil
}

func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID NamespacedID, source string, calculatedAt time.Time) (*message.Message, error) {
entitlement, err := w.connectors.Entitlement.GetEntitlement(ctx, entitlementID.Namespace, entitlementID.ID)
if err != nil {
return nil, fmt.Errorf("failed to get entitlement: %w", err)
}

feature, err := w.connectors.Feature.GetFeature(ctx, entitlementID.Namespace, entitlement.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
if err != nil {
return nil, fmt.Errorf("failed to get feature: %w", err)
}

value, err := w.connectors.Entitlement.GetEntitlementValue(ctx, entitlementID.Namespace, entitlement.SubjectKey, entitlement.ID, calculatedAt)
if err != nil {
return nil, fmt.Errorf("failed to get entitlement value: %w", err)
}

mappedValues, err := httpdriver.MapEntitlementValueToAPI(value)
if err != nil {
return nil, fmt.Errorf("failed to map entitlement value: %w", err)
}

subjectID := ""
if w.opts.SubjectIDResolver != nil {
subjectID, err = w.opts.SubjectIDResolver.GetSubjectIDByKey(ctx, entitlementID.Namespace, entitlementID.ID)
if err != nil {
return nil, fmt.Errorf("failed to get subject ID: %w", err)
}
}

event, err := spec.NewCloudEvent(
spec.EventSpec{
Source: source,
Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, entitlement.SubjectKey),
},
snapshot.SnapshotEvent{
Entitlement: *entitlement,
Namespace: models.NamespaceID{
ID: entitlementID.Namespace,
},
Subject: models.SubjectKeyAndID{
Key: entitlement.SubjectKey,
ID: subjectID,
},
Feature: *feature,
Operation: snapshot.BalanceOperationUpdate,

CalculatedAt: &calculatedAt,

Balance: convert.ToPointer((snapshot.EntitlementValue)(mappedValues)),
CurrentUsagePeriod: entitlement.CurrentUsagePeriod,
},
)
if err != nil {
return nil, fmt.Errorf("failed to create cloud event: %w", err)
}

wmMessage, err := w.opts.Marshaler.MarshalEvent(event)
if err != nil {
return nil, fmt.Errorf("failed to marshal cloud event: %w", err)
}

return wmMessage, nil
}
69 changes: 69 additions & 0 deletions internal/entitlement/balanceworker/ingesthandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package balanceworker

import (
"context"

"github.com/ThreeDotsLabs/watermill/message"

"github.com/openmeterio/openmeter/internal/entitlement"
"github.com/openmeterio/openmeter/internal/event/spec"
"github.com/openmeterio/openmeter/internal/productcatalog"
"github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification"
)

func (w *Worker) handleIngestEvent(ctx context.Context, event ingestnotification.EventIngested) ([]*message.Message, error) {
affectedEntitlements, err := w.GetEntitlementsAffectedByMeterSubject(ctx, event.Namespace.ID, event.MeterSlugs, event.SubjectKey)
if err != nil {
return nil, err
}

result := make([]*message.Message, 0, len(affectedEntitlements))
for _, entitlement := range affectedEntitlements {
messages, err := w.handleEntitlementUpdateEvent(
ctx,
entitlement,
spec.ComposeResourcePath(entitlement.Namespace, spec.EntityEvent),
)
if err != nil {
return nil, err
}

result = append(result, messages...)
}

return result, nil
}

func (w *Worker) GetEntitlementsAffectedByMeterSubject(ctx context.Context, namespace string, meterSlugs []string, subject string) ([]NamespacedID, error) {
featuresByMeter, err := w.connectors.Feature.ListFeatures(ctx, productcatalog.ListFeaturesParams{
Namespace: namespace,
MeterSlugs: meterSlugs,
})
if err != nil {
return nil, err
}

featureIDs := make([]string, 0, len(featuresByMeter.Items))
for _, feature := range featuresByMeter.Items {
featureIDs = append(featureIDs, feature.ID)
}

entitlements, err := w.connectors.Entitlement.ListEntitlements(ctx, entitlement.ListEntitlementsParams{
Namespaces: []string{namespace},
SubjectKeys: []string{subject},
FeatureIDs: featureIDs,
})
if err != nil {
return nil, err
}

entitlementIDs := make([]NamespacedID, 0, len(entitlements.Items))
for _, entitlement := range entitlements.Items {
entitlementIDs = append(entitlementIDs, NamespacedID{
ID: entitlement.ID,
Namespace: entitlement.Namespace,
})
}

return entitlementIDs, nil
}
Loading

0 comments on commit a80830e

Please sign in to comment.