Skip to content

Commit

Permalink
fix: adjust for new event names
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Aug 1, 2024
1 parent 00a71b3 commit 3a5bc60
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 20 deletions.
4 changes: 2 additions & 2 deletions internal/entitlement/balanceworker/entitlementhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent enti
Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, delEvent.ID),
Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, delEvent.SubjectKey),
},
snapshot.EntitlementBalanceSnapshotEvent{
snapshot.SnapshotEvent{
Entitlement: delEvent.Entitlement,
Namespace: models.NamespaceID{
ID: namespace,
Expand Down Expand Up @@ -128,7 +128,7 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
Source: source,
Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, entitlement.SubjectKey),
},
snapshot.EntitlementBalanceSnapshotEvent{
snapshot.SnapshotEvent{
Entitlement: *entitlement,
Namespace: models.NamespaceID{
ID: entitlementID.Namespace,
Expand Down
2 changes: 1 addition & 1 deletion internal/entitlement/balanceworker/ingesthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification"
)

func (w *Worker) handleIngestEvent(ctx context.Context, event ingestnotification.IngestEvent) ([]*message.Message, error) {
func (w *Worker) handleIngestEvent(ctx context.Context, event ingestnotification.EventIngested) ([]*message.Message, error) {
affectedEntitlements, err := w.GetEntitlementsAffectedByMeterSubject(ctx, event.Namespace.ID, event.MeterSlugs, event.SubjectKey)
if err != nil {
return nil, err
Expand Down
21 changes: 4 additions & 17 deletions internal/entitlement/balanceworker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,8 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) {
)

// Metered entitlement events
case meteredentitlement.ResetEntitlementEvent{}.Spec().Type():
event, err := spec.ParseCloudEventFromBytes[meteredentitlement.ResetEntitlementEvent](msg.Payload)
if err != nil {
return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err)
}

return w.handleEntitlementUpdateEvent(
msg.Context(),
NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID},
spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID),
)

// Metered entitlement events
case meteredentitlement.ResetEntitlementEvent{}.Spec().Type():
event, err := spec.ParseCloudEventFromBytes[meteredentitlement.ResetEntitlementEvent](msg.Payload)
case meteredentitlement.EntitlementResetEvent{}.Spec().Type():
event, err := spec.ParseCloudEventFromBytes[meteredentitlement.EntitlementResetEvent](msg.Payload)
if err != nil {
return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err)
}
Expand All @@ -245,8 +232,8 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) {
)

// Ingest event
case ingestnotification.IngestEvent{}.Spec().Type():
event, err := spec.ParseCloudEventFromBytes[ingestnotification.IngestEvent](msg.Payload)
case ingestnotification.EventIngested{}.Spec().Type():
event, err := spec.ParseCloudEventFromBytes[ingestnotification.EventIngested](msg.Payload)
if err != nil {
return nil, fmt.Errorf("failed to parse ingest event: %w", err)
}
Expand Down

0 comments on commit 3a5bc60

Please sign in to comment.