From 13ae5e04ea66778418bb7793485a88ac89d8322b Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Tue, 30 Jul 2024 12:59:35 +0200 Subject: [PATCH 1/7] feat: initialize entitlements in balance worker Given that this would copy paste most of the code from server's main this patch moves some client initializaiton code into internal for better reusability. --- cmd/server/main.go | 5 +- internal/cmd/README.md | 4 + internal/cmd/clickhouse/client.go | 40 ++++++++ internal/cmd/postgres/client.go | 39 ++++++++ internal/cmd/streaming/connector.go | 27 ++++++ internal/common/entitlement/connectors.go | 96 ++++++++++++++++++++ internal/entitlement/balanceworker/worker.go | 3 +- 7 files changed, 209 insertions(+), 5 deletions(-) create mode 100644 internal/cmd/README.md create mode 100644 internal/cmd/clickhouse/client.go create mode 100644 internal/cmd/postgres/client.go create mode 100644 internal/cmd/streaming/connector.go create mode 100644 internal/common/entitlement/connectors.go diff --git a/cmd/server/main.go b/cmd/server/main.go index b955974b7..51387543a 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -12,7 +12,7 @@ import ( "syscall" "time" - entDialectSQL "entgo.io/ent/dialect/sql" + "entgo.io/ent/dialect/sql" health "github.com/AppsFlyer/go-sundheit" healthhttp "github.com/AppsFlyer/go-sundheit/http" "github.com/ClickHouse/clickhouse-go/v2" @@ -298,7 +298,6 @@ func main() { debugConnector := debug.NewDebugConnector(streamingConnector) entitlementConnRegistry := ®istry.Entitlement{} - // Initialize Postgres if conf.Entitlements.Enabled { pgClients, err := initPGClients(conf.Postgres) if err != nil { @@ -537,7 +536,7 @@ func initNamespace(config config.Configuration, namespaces ...namespace.Handler) } type pgClients struct { - driver *entDialectSQL.Driver + driver *sql.Driver client *db.Client } diff --git a/internal/cmd/README.md b/internal/cmd/README.md new file mode 100644 index 000000000..a18d22135 --- /dev/null +++ b/internal/cmd/README.md @@ -0,0 +1,4 @@ +# Command initialization helpers + +These packages can be used to quickly create clients from configuration objects, so +that we can prevent code duplication in the mains. diff --git a/internal/cmd/clickhouse/client.go b/internal/cmd/clickhouse/client.go new file mode 100644 index 000000000..b4f3612dd --- /dev/null +++ b/internal/cmd/clickhouse/client.go @@ -0,0 +1,40 @@ +package streaming + +import ( + "crypto/tls" + "fmt" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/openmeterio/openmeter/config" +) + +func GetClient(config config.Configuration) (clickhouse.Conn, error) { + options := &clickhouse.Options{ + Addr: []string{config.Aggregation.ClickHouse.Address}, + Auth: clickhouse.Auth{ + Database: config.Aggregation.ClickHouse.Database, + Username: config.Aggregation.ClickHouse.Username, + Password: config.Aggregation.ClickHouse.Password, + }, + DialTimeout: time.Duration(10) * time.Second, + MaxOpenConns: 5, + MaxIdleConns: 5, + ConnMaxLifetime: time.Duration(10) * time.Minute, + ConnOpenStrategy: clickhouse.ConnOpenInOrder, + BlockBufferSize: 10, + } + // This minimal TLS.Config is normally sufficient to connect to the secure native port (normally 9440) on a ClickHouse server. + // See: https://clickhouse.com/docs/en/integrations/go#using-tls + if config.Aggregation.ClickHouse.TLS { + options.TLS = &tls.Config{} + } + + // Initialize ClickHouse + clickHouseClient, err := clickhouse.Open(options) + if err != nil { + return nil, fmt.Errorf("init clickhouse client: %w", err) + } + + return clickHouseClient, nil +} diff --git a/internal/cmd/postgres/client.go b/internal/cmd/postgres/client.go new file mode 100644 index 000000000..4c72e547a --- /dev/null +++ b/internal/cmd/postgres/client.go @@ -0,0 +1,39 @@ +package postgres + +import ( + "context" + "fmt" + + "entgo.io/ent/dialect/sql" + "github.com/openmeterio/openmeter/config" + "github.com/openmeterio/openmeter/internal/ent/db" + "github.com/openmeterio/openmeter/pkg/framework/entutils" +) + +type Clients struct { + Driver *sql.Driver + Client *db.Client +} + +func GetClients(config config.PostgresConfig) (*Clients, error) { + if err := config.Validate(); err != nil { + return nil, fmt.Errorf("invalid postgres config: %w", err) + } + driver, err := entutils.GetPGDriver(config.URL) + if err != nil { + return nil, fmt.Errorf("failed to init postgres driver: %w", err) + } + + // initialize client & run migrations + dbClient := db.NewClient(db.Driver(driver)) + + // TODO: use versioned migrations: https://entgo.io/docs/versioned-migrations + if err := dbClient.Schema.Create(context.Background()); err != nil { + return nil, fmt.Errorf("failed to migrate credit db: %w", err) + } + + return &Clients{ + Driver: driver, + Client: dbClient, + }, nil +} diff --git a/internal/cmd/streaming/connector.go b/internal/cmd/streaming/connector.go new file mode 100644 index 000000000..bc274042d --- /dev/null +++ b/internal/cmd/streaming/connector.go @@ -0,0 +1,27 @@ +package streaming + +import ( + "fmt" + "log/slog" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/openmeterio/openmeter/config" + "github.com/openmeterio/openmeter/internal/meter" + "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" +) + +func GetStreaming(config config.Configuration, clickHouseClient clickhouse.Conn, meterRepository meter.Repository, logger *slog.Logger) (*clickhouse_connector.ClickhouseConnector, error) { + streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{ + Logger: logger, + ClickHouse: clickHouseClient, + Database: config.Aggregation.ClickHouse.Database, + Meters: meterRepository, + CreateOrReplaceMeter: config.Aggregation.CreateOrReplaceMeter, + PopulateMeter: config.Aggregation.PopulateMeter, + }) + if err != nil { + return nil, fmt.Errorf("init clickhouse streaming: %w", err) + } + + return streamingConnector, nil +} diff --git a/internal/common/entitlement/connectors.go b/internal/common/entitlement/connectors.go new file mode 100644 index 000000000..19c9a65db --- /dev/null +++ b/internal/common/entitlement/connectors.go @@ -0,0 +1,96 @@ +package entitlement + +import ( + "log/slog" + "time" + + "github.com/openmeterio/openmeter/internal/ent/db" + "github.com/openmeterio/openmeter/internal/meter" + "github.com/openmeterio/openmeter/openmeter/credit" + creditpgadapter "github.com/openmeterio/openmeter/openmeter/credit/postgresadapter" + "github.com/openmeterio/openmeter/openmeter/entitlement" + booleanentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/boolean" + meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" + entitlementpgadapter "github.com/openmeterio/openmeter/openmeter/entitlement/postgresadapter" + staticentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/static" + "github.com/openmeterio/openmeter/openmeter/event/publisher" + "github.com/openmeterio/openmeter/openmeter/productcatalog" + productcatalogpgadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/postgresadapter" + "github.com/openmeterio/openmeter/openmeter/streaming" +) + +type Connectors struct { + Feature productcatalog.FeatureConnector + EntitlementOwner credit.OwnerConnector + CreditBalance credit.BalanceConnector + Grant credit.GrantConnector + MeteredEntitlement meteredentitlement.Connector + Entitlement entitlement.EntitlementConnector +} + +type ConnectorsOptions struct { + DatabaseClient *db.Client + StreamingConnector streaming.Connector + Logger *slog.Logger + MeterRepository meter.Repository + Publisher publisher.TopicPublisher +} + +func GetConnectors(opts ConnectorsOptions) *Connectors { + // Initialize database adapters + featureDBAdapter := productcatalogpgadapter.NewPostgresFeatureDBAdapter(opts.DatabaseClient, opts.Logger) + entitlementDBAdapter := entitlementpgadapter.NewPostgresEntitlementDBAdapter(opts.DatabaseClient) + usageResetDBAdapter := entitlementpgadapter.NewPostgresUsageResetDBAdapter(opts.DatabaseClient) + grantDBAdapter := creditpgadapter.NewPostgresGrantDBAdapter(opts.DatabaseClient) + balanceSnashotDBAdapter := creditpgadapter.NewPostgresBalanceSnapshotDBAdapter(opts.DatabaseClient) + + // Initialize connectors + featureConnector := productcatalog.NewFeatureConnector(featureDBAdapter, opts.MeterRepository) + entitlementOwnerConnector := meteredentitlement.NewEntitlementGrantOwnerAdapter( + featureDBAdapter, + entitlementDBAdapter, + usageResetDBAdapter, + opts.MeterRepository, + opts.Logger, + ) + creditBalanceConnector := credit.NewBalanceConnector( + grantDBAdapter, + balanceSnashotDBAdapter, + entitlementOwnerConnector, + opts.StreamingConnector, + opts.Logger, + ) + grantConnector := credit.NewGrantConnector( + entitlementOwnerConnector, + grantDBAdapter, + balanceSnashotDBAdapter, + time.Minute, + opts.Publisher, + ) + meteredEntitlementConnector := meteredentitlement.NewMeteredEntitlementConnector( + opts.StreamingConnector, + entitlementOwnerConnector, + creditBalanceConnector, + grantConnector, + entitlementDBAdapter, + opts.Publisher, + ) + entitlementConnector := entitlement.NewEntitlementConnector( + entitlementDBAdapter, + featureConnector, + opts.MeterRepository, + meteredEntitlementConnector, + staticentitlement.NewStaticEntitlementConnector(), + booleanentitlement.NewBooleanEntitlementConnector(), + opts.Publisher, + ) + + return &Connectors{ + Feature: featureConnector, + EntitlementOwner: entitlementOwnerConnector, + CreditBalance: creditBalanceConnector, + Grant: grantConnector, + MeteredEntitlement: meteredEntitlementConnector, + Entitlement: entitlementConnector, + } +} diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index b300a4c55..abd93f318 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -25,8 +25,7 @@ type WorkerOptions struct { TargetTopic string PoisonQueue *WorkerPoisonQueueOptions Publisher message.Publisher - - Marshaler publisher.CloudEventMarshaler + Marshaler publisher.CloudEventMarshaler Entitlement *registry.Entitlement From 1427549c597e91cec5b82694e5e1f567db4bbf6a Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Tue, 30 Jul 2024 13:17:30 +0200 Subject: [PATCH 2/7] fix: lint errors --- internal/cmd/clickhouse/client.go | 1 + internal/cmd/postgres/client.go | 1 + internal/cmd/streaming/connector.go | 1 + 3 files changed, 3 insertions(+) diff --git a/internal/cmd/clickhouse/client.go b/internal/cmd/clickhouse/client.go index b4f3612dd..bb1b95793 100644 --- a/internal/cmd/clickhouse/client.go +++ b/internal/cmd/clickhouse/client.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/openmeterio/openmeter/config" ) diff --git a/internal/cmd/postgres/client.go b/internal/cmd/postgres/client.go index 4c72e547a..bd336a9f7 100644 --- a/internal/cmd/postgres/client.go +++ b/internal/cmd/postgres/client.go @@ -5,6 +5,7 @@ import ( "fmt" "entgo.io/ent/dialect/sql" + "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/ent/db" "github.com/openmeterio/openmeter/pkg/framework/entutils" diff --git a/internal/cmd/streaming/connector.go b/internal/cmd/streaming/connector.go index bc274042d..bb6247510 100644 --- a/internal/cmd/streaming/connector.go +++ b/internal/cmd/streaming/connector.go @@ -5,6 +5,7 @@ import ( "log/slog" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" From 18f66f2d41eacc7bb349e7c7bf2a9343a09c739e Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Tue, 30 Jul 2024 12:59:35 +0200 Subject: [PATCH 3/7] feat: add support for entitlements events We add a basic highwatermark cache to prevent recalculations multiple times. This will be more important when the ingest part is ready. --- cmd/server/main.go | 5 +- internal/cmd/README.md | 4 - internal/cmd/clickhouse/client.go | 41 --- internal/cmd/postgres/client.go | 40 --- internal/cmd/streaming/connector.go | 28 -- internal/common/entitlement/connectors.go | 96 ------- internal/entitlement/balanceworker/worker.go | 260 +++++++++++++++++-- 7 files changed, 246 insertions(+), 228 deletions(-) delete mode 100644 internal/cmd/README.md delete mode 100644 internal/cmd/clickhouse/client.go delete mode 100644 internal/cmd/postgres/client.go delete mode 100644 internal/cmd/streaming/connector.go delete mode 100644 internal/common/entitlement/connectors.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 51387543a..b955974b7 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -12,7 +12,7 @@ import ( "syscall" "time" - "entgo.io/ent/dialect/sql" + entDialectSQL "entgo.io/ent/dialect/sql" health "github.com/AppsFlyer/go-sundheit" healthhttp "github.com/AppsFlyer/go-sundheit/http" "github.com/ClickHouse/clickhouse-go/v2" @@ -298,6 +298,7 @@ func main() { debugConnector := debug.NewDebugConnector(streamingConnector) entitlementConnRegistry := ®istry.Entitlement{} + // Initialize Postgres if conf.Entitlements.Enabled { pgClients, err := initPGClients(conf.Postgres) if err != nil { @@ -536,7 +537,7 @@ func initNamespace(config config.Configuration, namespaces ...namespace.Handler) } type pgClients struct { - driver *sql.Driver + driver *entDialectSQL.Driver client *db.Client } diff --git a/internal/cmd/README.md b/internal/cmd/README.md deleted file mode 100644 index a18d22135..000000000 --- a/internal/cmd/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# Command initialization helpers - -These packages can be used to quickly create clients from configuration objects, so -that we can prevent code duplication in the mains. diff --git a/internal/cmd/clickhouse/client.go b/internal/cmd/clickhouse/client.go deleted file mode 100644 index bb1b95793..000000000 --- a/internal/cmd/clickhouse/client.go +++ /dev/null @@ -1,41 +0,0 @@ -package streaming - -import ( - "crypto/tls" - "fmt" - "time" - - "github.com/ClickHouse/clickhouse-go/v2" - - "github.com/openmeterio/openmeter/config" -) - -func GetClient(config config.Configuration) (clickhouse.Conn, error) { - options := &clickhouse.Options{ - Addr: []string{config.Aggregation.ClickHouse.Address}, - Auth: clickhouse.Auth{ - Database: config.Aggregation.ClickHouse.Database, - Username: config.Aggregation.ClickHouse.Username, - Password: config.Aggregation.ClickHouse.Password, - }, - DialTimeout: time.Duration(10) * time.Second, - MaxOpenConns: 5, - MaxIdleConns: 5, - ConnMaxLifetime: time.Duration(10) * time.Minute, - ConnOpenStrategy: clickhouse.ConnOpenInOrder, - BlockBufferSize: 10, - } - // This minimal TLS.Config is normally sufficient to connect to the secure native port (normally 9440) on a ClickHouse server. - // See: https://clickhouse.com/docs/en/integrations/go#using-tls - if config.Aggregation.ClickHouse.TLS { - options.TLS = &tls.Config{} - } - - // Initialize ClickHouse - clickHouseClient, err := clickhouse.Open(options) - if err != nil { - return nil, fmt.Errorf("init clickhouse client: %w", err) - } - - return clickHouseClient, nil -} diff --git a/internal/cmd/postgres/client.go b/internal/cmd/postgres/client.go deleted file mode 100644 index bd336a9f7..000000000 --- a/internal/cmd/postgres/client.go +++ /dev/null @@ -1,40 +0,0 @@ -package postgres - -import ( - "context" - "fmt" - - "entgo.io/ent/dialect/sql" - - "github.com/openmeterio/openmeter/config" - "github.com/openmeterio/openmeter/internal/ent/db" - "github.com/openmeterio/openmeter/pkg/framework/entutils" -) - -type Clients struct { - Driver *sql.Driver - Client *db.Client -} - -func GetClients(config config.PostgresConfig) (*Clients, error) { - if err := config.Validate(); err != nil { - return nil, fmt.Errorf("invalid postgres config: %w", err) - } - driver, err := entutils.GetPGDriver(config.URL) - if err != nil { - return nil, fmt.Errorf("failed to init postgres driver: %w", err) - } - - // initialize client & run migrations - dbClient := db.NewClient(db.Driver(driver)) - - // TODO: use versioned migrations: https://entgo.io/docs/versioned-migrations - if err := dbClient.Schema.Create(context.Background()); err != nil { - return nil, fmt.Errorf("failed to migrate credit db: %w", err) - } - - return &Clients{ - Driver: driver, - Client: dbClient, - }, nil -} diff --git a/internal/cmd/streaming/connector.go b/internal/cmd/streaming/connector.go deleted file mode 100644 index bb6247510..000000000 --- a/internal/cmd/streaming/connector.go +++ /dev/null @@ -1,28 +0,0 @@ -package streaming - -import ( - "fmt" - "log/slog" - - "github.com/ClickHouse/clickhouse-go/v2" - - "github.com/openmeterio/openmeter/config" - "github.com/openmeterio/openmeter/internal/meter" - "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" -) - -func GetStreaming(config config.Configuration, clickHouseClient clickhouse.Conn, meterRepository meter.Repository, logger *slog.Logger) (*clickhouse_connector.ClickhouseConnector, error) { - streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{ - Logger: logger, - ClickHouse: clickHouseClient, - Database: config.Aggregation.ClickHouse.Database, - Meters: meterRepository, - CreateOrReplaceMeter: config.Aggregation.CreateOrReplaceMeter, - PopulateMeter: config.Aggregation.PopulateMeter, - }) - if err != nil { - return nil, fmt.Errorf("init clickhouse streaming: %w", err) - } - - return streamingConnector, nil -} diff --git a/internal/common/entitlement/connectors.go b/internal/common/entitlement/connectors.go deleted file mode 100644 index 19c9a65db..000000000 --- a/internal/common/entitlement/connectors.go +++ /dev/null @@ -1,96 +0,0 @@ -package entitlement - -import ( - "log/slog" - "time" - - "github.com/openmeterio/openmeter/internal/ent/db" - "github.com/openmeterio/openmeter/internal/meter" - "github.com/openmeterio/openmeter/openmeter/credit" - creditpgadapter "github.com/openmeterio/openmeter/openmeter/credit/postgresadapter" - "github.com/openmeterio/openmeter/openmeter/entitlement" - booleanentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/boolean" - meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" - entitlementpgadapter "github.com/openmeterio/openmeter/openmeter/entitlement/postgresadapter" - staticentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/static" - "github.com/openmeterio/openmeter/openmeter/event/publisher" - "github.com/openmeterio/openmeter/openmeter/productcatalog" - productcatalogpgadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/postgresadapter" - "github.com/openmeterio/openmeter/openmeter/streaming" -) - -type Connectors struct { - Feature productcatalog.FeatureConnector - EntitlementOwner credit.OwnerConnector - CreditBalance credit.BalanceConnector - Grant credit.GrantConnector - MeteredEntitlement meteredentitlement.Connector - Entitlement entitlement.EntitlementConnector -} - -type ConnectorsOptions struct { - DatabaseClient *db.Client - StreamingConnector streaming.Connector - Logger *slog.Logger - MeterRepository meter.Repository - Publisher publisher.TopicPublisher -} - -func GetConnectors(opts ConnectorsOptions) *Connectors { - // Initialize database adapters - featureDBAdapter := productcatalogpgadapter.NewPostgresFeatureDBAdapter(opts.DatabaseClient, opts.Logger) - entitlementDBAdapter := entitlementpgadapter.NewPostgresEntitlementDBAdapter(opts.DatabaseClient) - usageResetDBAdapter := entitlementpgadapter.NewPostgresUsageResetDBAdapter(opts.DatabaseClient) - grantDBAdapter := creditpgadapter.NewPostgresGrantDBAdapter(opts.DatabaseClient) - balanceSnashotDBAdapter := creditpgadapter.NewPostgresBalanceSnapshotDBAdapter(opts.DatabaseClient) - - // Initialize connectors - featureConnector := productcatalog.NewFeatureConnector(featureDBAdapter, opts.MeterRepository) - entitlementOwnerConnector := meteredentitlement.NewEntitlementGrantOwnerAdapter( - featureDBAdapter, - entitlementDBAdapter, - usageResetDBAdapter, - opts.MeterRepository, - opts.Logger, - ) - creditBalanceConnector := credit.NewBalanceConnector( - grantDBAdapter, - balanceSnashotDBAdapter, - entitlementOwnerConnector, - opts.StreamingConnector, - opts.Logger, - ) - grantConnector := credit.NewGrantConnector( - entitlementOwnerConnector, - grantDBAdapter, - balanceSnashotDBAdapter, - time.Minute, - opts.Publisher, - ) - meteredEntitlementConnector := meteredentitlement.NewMeteredEntitlementConnector( - opts.StreamingConnector, - entitlementOwnerConnector, - creditBalanceConnector, - grantConnector, - entitlementDBAdapter, - opts.Publisher, - ) - entitlementConnector := entitlement.NewEntitlementConnector( - entitlementDBAdapter, - featureConnector, - opts.MeterRepository, - meteredEntitlementConnector, - staticentitlement.NewStaticEntitlementConnector(), - booleanentitlement.NewBooleanEntitlementConnector(), - opts.Publisher, - ) - - return &Connectors{ - Feature: featureConnector, - EntitlementOwner: entitlementOwnerConnector, - CreditBalance: creditBalanceConnector, - Grant: grantConnector, - MeteredEntitlement: meteredEntitlementConnector, - Entitlement: entitlementConnector, - } -} diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index abd93f318..86b05d64c 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -3,20 +3,43 @@ package balanceworker import ( "context" "encoding/json" + "fmt" "log/slog" "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" - "github.com/cloudevents/sdk-go/v2/event" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/openmeterio/openmeter/internal/credit" "github.com/openmeterio/openmeter/internal/entitlement" + "github.com/openmeterio/openmeter/internal/entitlement/httpdriver" + meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" + "github.com/openmeterio/openmeter/internal/entitlement/snapshot" + "github.com/openmeterio/openmeter/internal/event/models" "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/internal/productcatalog" "github.com/openmeterio/openmeter/internal/registry" + "github.com/openmeterio/openmeter/pkg/convert" + pkgmodels "github.com/openmeterio/openmeter/pkg/models" ) +const ( + defaultHighWatermarkCacheSize = 100_000 + + // defaultClockDrift specifies how much clock drift is allowed when calculating the current time between the worker nodes. + // with AWS, Google Cloud 1ms is guaranteed, this should work well for any NTP based setup. + defaultClockDrift = time.Millisecond +) + +type NamespacedID = pkgmodels.NamespacedID + +type SubjectIDResolver interface { + GetSubjectIDByKey(ctx context.Context, namespace, key string) (string, error) +} + type WorkerOptions struct { SystemEventsTopic string IngestEventsTopic string @@ -28,6 +51,8 @@ type WorkerOptions struct { Marshaler publisher.CloudEventMarshaler Entitlement *registry.Entitlement + // External connectors + SubjectIDResolver SubjectIDResolver Logger *slog.Logger } @@ -39,9 +64,17 @@ type WorkerPoisonQueueOptions struct { ThrottleCount int64 } +type highWatermarkCacheEntry struct { + HighWatermark time.Time + IsDeleted bool +} + type Worker struct { - opts WorkerOptions - router *message.Router + opts WorkerOptions + connectors *registry.Entitlement + router *message.Router + + highWatermarkCache *lru.Cache[string, highWatermarkCacheEntry] } func New(opts WorkerOptions) (*Worker, error) { @@ -50,8 +83,16 @@ func New(opts WorkerOptions) (*Worker, error) { return nil, err } + highWatermarkCache, err := lru.New[string, highWatermarkCacheEntry](defaultHighWatermarkCacheSize) + if err != nil { + return nil, fmt.Errorf("failed to create high watermark cache: %w", err) + } + worker := &Worker{ - opts: opts, + opts: opts, + router: router, + connectors: opts.Entitlement, + highWatermarkCache: highWatermarkCache, } router.AddHandler( @@ -102,10 +143,7 @@ func New(opts WorkerOptions) (*Worker, error) { ) } - return &Worker{ - opts: opts, - router: router, - }, nil + return worker, nil } func (w *Worker) Run(ctx context.Context) error { @@ -113,7 +151,11 @@ func (w *Worker) Run(ctx context.Context) error { } func (w *Worker) Close() error { - return w.router.Close() + if err := w.router.Close(); err != nil { + return err + } + + return nil } func (w *Worker) handleSystemEvent(msg *message.Message) ([]*message.Message, error) { @@ -126,24 +168,204 @@ func (w *Worker) handleSystemEvent(msg *message.Message) ([]*message.Message, er } switch ceType { + // Entitlement events case entitlement.EntitlementCreatedEvent{}.Spec().Type(): event, err := spec.ParseCloudEventFromBytes[entitlement.EntitlementCreatedEvent](msg.Payload) if err != nil { - w.opts.Logger.Error("failed to parse entitlement created event", w.messageToLogFields(msg)...) - return nil, err + return nil, fmt.Errorf("failed to parse entitlement created event: %w", err) } - return w.handleEntitlementCreatedEvent(event.Event, event.Payload) - } - // TODO[final-implementation]: use w.opts.Marshaler to create a new message + return w.handleUpdateEvent( + msg.Context(), + NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.ID}, + spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.ID), + ) + case entitlement.EntitlementDeletedEvent{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[entitlement.EntitlementDeletedEvent](msg.Payload) + if err != nil { + return nil, fmt.Errorf("failed to parse entitlement deleted event: %w", err) + } + return w.handleEntitlementDeleteEvent(msg.Context(), event.Payload) + // Grant events + case credit.GrantCreatedEvent{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[credit.GrantCreatedEvent](msg.Payload) + if err != nil { + return nil, fmt.Errorf("failed to parse grant created event: %w", err) + } + + return w.handleUpdateEvent( + msg.Context(), + NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, + spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), + ) + case credit.GrantVoidedEvent{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[credit.GrantVoidedEvent](msg.Payload) + if err != nil { + return nil, fmt.Errorf("failed to parse grant voided event: %w", err) + } + + return w.handleUpdateEvent( + msg.Context(), + NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, + spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), + ) + // Metered entitlement events + case meteredentitlement.ResetEntitlementEvent{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[meteredentitlement.ResetEntitlementEvent](msg.Payload) + if err != nil { + return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err) + } + + return w.handleUpdateEvent( + msg.Context(), + NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID}, + spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID), + ) + } return nil, nil } -func (w *Worker) handleEntitlementCreatedEvent(_ event.Event, payload entitlement.EntitlementCreatedEvent) ([]*message.Message, error) { - w.opts.Logger.Info("handling entitlement created event", slog.String("entitlement_id", payload.Entitlement.ID)) +func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent entitlement.EntitlementDeletedEvent) ([]*message.Message, error) { + namespace := delEvent.Namespace.ID - return nil, nil + feature, err := w.connectors.Feature.GetFeature(ctx, namespace, delEvent.FeatureID, productcatalog.IncludeArchivedFeatureTrue) + if err != nil { + return nil, fmt.Errorf("failed to get feature: %w", err) + } + + subjectID := "" + if w.opts.SubjectIDResolver != nil { + subjectID, err = w.opts.SubjectIDResolver.GetSubjectIDByKey(ctx, namespace, delEvent.SubjectKey) + if err != nil { + return nil, fmt.Errorf("failed to get subject ID: %w", err) + } + } + + calculationTime := w.getCalculationTime() + + event, err := spec.NewCloudEvent( + spec.EventSpec{ + Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, delEvent.ID), + Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, delEvent.SubjectKey), + }, + snapshot.EntitlementBalanceSnapshotEvent{ + Entitlement: delEvent.Entitlement, + Namespace: models.NamespaceID{ + ID: namespace, + }, + Subject: models.SubjectKeyAndID{ + Key: delEvent.SubjectKey, + ID: subjectID, + }, + Feature: *feature, + Operation: snapshot.BalanceOperationDelete, + + CalculatedAt: convert.ToPointer(calculationTime), + + CurrentUsagePeriod: delEvent.CurrentUsagePeriod, + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to create cloud event: %w", err) + } + + wmMessage, err := w.opts.Marshaler.MarshalEvent(event) + if err != nil { + return nil, fmt.Errorf("failed to marshal cloud event: %w", err) + } + + _ = w.highWatermarkCache.Add(delEvent.ID, highWatermarkCacheEntry{ + HighWatermark: calculationTime, + IsDeleted: true, + }) + + return []*message.Message{wmMessage}, nil +} + +func (w *Worker) handleUpdateEvent(ctx context.Context, entitlementID NamespacedID, source string) ([]*message.Message, error) { + calculatedAt := w.getCalculationTime() + + if entry, ok := w.highWatermarkCache.Get(entitlementID.ID); ok { + if entry.HighWatermark.After(calculatedAt) || entry.IsDeleted { + return nil, nil + } + } + + wmMessage, err := w.createEntitlementUpdateSnapshotEvent(ctx, entitlementID, source, calculatedAt) + if err != nil { + return nil, fmt.Errorf("failed to create entitlement update snapshot event: %w", err) + } + + _ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{ + HighWatermark: calculatedAt, + }) + + return []*message.Message{wmMessage}, nil +} + +func (w *Worker) createEntitlementUpdateSnapshotEvent(ctx context.Context, entitlementID NamespacedID, source string, calculatedAt time.Time) (*message.Message, error) { + entitlement, err := w.connectors.Entitlement.GetEntitlement(ctx, entitlementID.Namespace, entitlementID.ID) + if err != nil { + return nil, fmt.Errorf("failed to get entitlement: %w", err) + } + + feature, err := w.connectors.Feature.GetFeature(ctx, entitlementID.Namespace, entitlement.FeatureID, productcatalog.IncludeArchivedFeatureTrue) + if err != nil { + return nil, fmt.Errorf("failed to get feature: %w", err) + } + + value, err := w.connectors.Entitlement.GetEntitlementValue(ctx, entitlementID.Namespace, entitlement.SubjectKey, entitlement.ID, calculatedAt) + if err != nil { + return nil, fmt.Errorf("failed to get entitlement value: %w", err) + } + + mappedValues, err := httpdriver.MapEntitlementValueToAPI(value) + if err != nil { + return nil, fmt.Errorf("failed to map entitlement value: %w", err) + } + + subjectID := "" + if w.opts.SubjectIDResolver != nil { + subjectID, err = w.opts.SubjectIDResolver.GetSubjectIDByKey(ctx, entitlementID.Namespace, entitlementID.ID) + if err != nil { + return nil, fmt.Errorf("failed to get subject ID: %w", err) + } + } + + event, err := spec.NewCloudEvent( + spec.EventSpec{ + Source: source, + Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, entitlement.SubjectKey), + }, + snapshot.EntitlementBalanceSnapshotEvent{ + Entitlement: *entitlement, + Namespace: models.NamespaceID{ + ID: entitlementID.Namespace, + }, + Subject: models.SubjectKeyAndID{ + Key: entitlement.SubjectKey, + ID: subjectID, + }, + Feature: *feature, + Operation: snapshot.BalanceOperationUpdate, + + CalculatedAt: &calculatedAt, + + Balance: convert.ToPointer((snapshot.EntitlementValue)(mappedValues)), + CurrentUsagePeriod: entitlement.CurrentUsagePeriod, + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to create cloud event: %w", err) + } + + wmMessage, err := w.opts.Marshaler.MarshalEvent(event) + if err != nil { + return nil, fmt.Errorf("failed to marshal cloud event: %w", err) + } + + return wmMessage, nil } func (w *Worker) messageToLogFields(msg *message.Message) []any { @@ -159,3 +381,7 @@ func (w *Worker) messageToLogFields(msg *message.Message) []any { out = append(out, slog.String("message_metadata", string(meta))) return out } + +func (*Worker) getCalculationTime() time.Time { + return time.Now().Add(-defaultClockDrift).UTC() +} From 609aaf109cd17f34e5c6ffad035d523c6e9a0da7 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Tue, 30 Jul 2024 12:59:35 +0200 Subject: [PATCH 4/7] feat: add support for snapshot calculations Supported events are for entitlements and ingestion events. This is a naive implementation in the sense that we are not caching anything. We add a basic highwatermark cache to prevent recalculations multiple times. This will be more important when the ingest part is ready. --- .../balanceworker/entitlementhandler.go | 159 ++++++++++++++ .../balanceworker/ingesthandler.go | 69 ++++++ internal/entitlement/balanceworker/worker.go | 201 +++--------------- 3 files changed, 262 insertions(+), 167 deletions(-) create mode 100644 internal/entitlement/balanceworker/entitlementhandler.go create mode 100644 internal/entitlement/balanceworker/ingesthandler.go diff --git a/internal/entitlement/balanceworker/entitlementhandler.go b/internal/entitlement/balanceworker/entitlementhandler.go new file mode 100644 index 000000000..fb15b7117 --- /dev/null +++ b/internal/entitlement/balanceworker/entitlementhandler.go @@ -0,0 +1,159 @@ +package balanceworker + +import ( + "context" + "fmt" + "time" + + "github.com/ThreeDotsLabs/watermill/message" + + "github.com/openmeterio/openmeter/internal/entitlement" + "github.com/openmeterio/openmeter/internal/entitlement/httpdriver" + "github.com/openmeterio/openmeter/internal/entitlement/snapshot" + "github.com/openmeterio/openmeter/internal/event/models" + "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/internal/productcatalog" + "github.com/openmeterio/openmeter/pkg/convert" +) + +func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent entitlement.EntitlementDeletedEvent) ([]*message.Message, error) { + namespace := delEvent.Namespace.ID + + feature, err := w.connectors.Feature.GetFeature(ctx, namespace, delEvent.FeatureID, productcatalog.IncludeArchivedFeatureTrue) + if err != nil { + return nil, fmt.Errorf("failed to get feature: %w", err) + } + + subjectID := "" + if w.opts.SubjectIDResolver != nil { + subjectID, err = w.opts.SubjectIDResolver.GetSubjectIDByKey(ctx, namespace, delEvent.SubjectKey) + if err != nil { + return nil, fmt.Errorf("failed to get subject ID: %w", err) + } + } + + calculationTime := w.getCalculationTime() + + event, err := spec.NewCloudEvent( + spec.EventSpec{ + Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, delEvent.ID), + Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, delEvent.SubjectKey), + }, + snapshot.EntitlementBalanceSnapshotEvent{ + Entitlement: delEvent.Entitlement, + Namespace: models.NamespaceID{ + ID: namespace, + }, + Subject: models.SubjectKeyAndID{ + Key: delEvent.SubjectKey, + ID: subjectID, + }, + Feature: *feature, + Operation: snapshot.BalanceOperationDelete, + + CalculatedAt: convert.ToPointer(calculationTime), + + CurrentUsagePeriod: delEvent.CurrentUsagePeriod, + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to create cloud event: %w", err) + } + + wmMessage, err := w.opts.Marshaler.MarshalEvent(event) + if err != nil { + return nil, fmt.Errorf("failed to marshal cloud event: %w", err) + } + + _ = w.highWatermarkCache.Add(delEvent.ID, highWatermarkCacheEntry{ + HighWatermark: calculationTime, + IsDeleted: true, + }) + + return []*message.Message{wmMessage}, nil +} + +func (w *Worker) handleEntitlementUpdateEvent(ctx context.Context, entitlementID NamespacedID, source string) ([]*message.Message, error) { + calculatedAt := w.getCalculationTime() + + if entry, ok := w.highWatermarkCache.Get(entitlementID.ID); ok { + if entry.HighWatermark.After(calculatedAt) || entry.IsDeleted { + return nil, nil + } + } + + wmMessage, err := w.createSnapshotEvent(ctx, entitlementID, source, calculatedAt) + if err != nil { + return nil, fmt.Errorf("failed to create entitlement update snapshot event: %w", err) + } + + _ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{ + HighWatermark: calculatedAt, + }) + + return []*message.Message{wmMessage}, nil +} + +func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID NamespacedID, source string, calculatedAt time.Time) (*message.Message, error) { + entitlement, err := w.connectors.Entitlement.GetEntitlement(ctx, entitlementID.Namespace, entitlementID.ID) + if err != nil { + return nil, fmt.Errorf("failed to get entitlement: %w", err) + } + + feature, err := w.connectors.Feature.GetFeature(ctx, entitlementID.Namespace, entitlement.FeatureID, productcatalog.IncludeArchivedFeatureTrue) + if err != nil { + return nil, fmt.Errorf("failed to get feature: %w", err) + } + + value, err := w.connectors.Entitlement.GetEntitlementValue(ctx, entitlementID.Namespace, entitlement.SubjectKey, entitlement.ID, calculatedAt) + if err != nil { + return nil, fmt.Errorf("failed to get entitlement value: %w", err) + } + + mappedValues, err := httpdriver.MapEntitlementValueToAPI(value) + if err != nil { + return nil, fmt.Errorf("failed to map entitlement value: %w", err) + } + + subjectID := "" + if w.opts.SubjectIDResolver != nil { + subjectID, err = w.opts.SubjectIDResolver.GetSubjectIDByKey(ctx, entitlementID.Namespace, entitlementID.ID) + if err != nil { + return nil, fmt.Errorf("failed to get subject ID: %w", err) + } + } + + event, err := spec.NewCloudEvent( + spec.EventSpec{ + Source: source, + Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, entitlement.SubjectKey), + }, + snapshot.EntitlementBalanceSnapshotEvent{ + Entitlement: *entitlement, + Namespace: models.NamespaceID{ + ID: entitlementID.Namespace, + }, + Subject: models.SubjectKeyAndID{ + Key: entitlement.SubjectKey, + ID: subjectID, + }, + Feature: *feature, + Operation: snapshot.BalanceOperationUpdate, + + CalculatedAt: &calculatedAt, + + Balance: convert.ToPointer((snapshot.EntitlementValue)(mappedValues)), + CurrentUsagePeriod: entitlement.CurrentUsagePeriod, + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to create cloud event: %w", err) + } + + wmMessage, err := w.opts.Marshaler.MarshalEvent(event) + if err != nil { + return nil, fmt.Errorf("failed to marshal cloud event: %w", err) + } + + return wmMessage, nil +} diff --git a/internal/entitlement/balanceworker/ingesthandler.go b/internal/entitlement/balanceworker/ingesthandler.go new file mode 100644 index 000000000..7dba1e6a5 --- /dev/null +++ b/internal/entitlement/balanceworker/ingesthandler.go @@ -0,0 +1,69 @@ +package balanceworker + +import ( + "context" + + "github.com/ThreeDotsLabs/watermill/message" + + "github.com/openmeterio/openmeter/internal/entitlement" + "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/internal/productcatalog" + "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" +) + +func (w *Worker) handleIngestEvent(ctx context.Context, event ingestnotification.IngestEvent) ([]*message.Message, error) { + affectedEntitlements, err := w.GetEntitlementsAffectedByMeterSubject(ctx, event.Namespace.ID, event.MeterSlugs, event.SubjectKey) + if err != nil { + return nil, err + } + + result := make([]*message.Message, 0, len(affectedEntitlements)) + for _, entitlement := range affectedEntitlements { + messages, err := w.handleEntitlementUpdateEvent( + ctx, + entitlement, + spec.ComposeResourcePath(entitlement.Namespace, spec.EntityEvent), + ) + if err != nil { + return nil, err + } + + result = append(result, messages...) + } + + return result, nil +} + +func (w *Worker) GetEntitlementsAffectedByMeterSubject(ctx context.Context, namespace string, meterSlugs []string, subject string) ([]NamespacedID, error) { + featuresByMeter, err := w.connectors.Feature.ListFeatures(ctx, productcatalog.ListFeaturesParams{ + Namespace: namespace, + MeterSlugs: meterSlugs, + }) + if err != nil { + return nil, err + } + + featureIDs := make([]string, 0, len(featuresByMeter.Items)) + for _, feature := range featuresByMeter.Items { + featureIDs = append(featureIDs, feature.ID) + } + + entitlements, err := w.connectors.Entitlement.ListEntitlements(ctx, entitlement.ListEntitlementsParams{ + Namespaces: []string{namespace}, + SubjectKeys: []string{subject}, + FeatureIDs: featureIDs, + }) + if err != nil { + return nil, err + } + + entitlementIDs := make([]NamespacedID, 0, len(entitlements.Items)) + for _, entitlement := range entitlements.Items { + entitlementIDs = append(entitlementIDs, NamespacedID{ + ID: entitlement.ID, + Namespace: entitlement.Namespace, + }) + } + + return entitlementIDs, nil +} diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index 86b05d64c..a88a15373 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -14,15 +14,10 @@ import ( "github.com/openmeterio/openmeter/internal/credit" "github.com/openmeterio/openmeter/internal/entitlement" - "github.com/openmeterio/openmeter/internal/entitlement/httpdriver" meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" - "github.com/openmeterio/openmeter/internal/entitlement/snapshot" - "github.com/openmeterio/openmeter/internal/event/models" "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/event/spec" - "github.com/openmeterio/openmeter/internal/productcatalog" "github.com/openmeterio/openmeter/internal/registry" - "github.com/openmeterio/openmeter/pkg/convert" pkgmodels "github.com/openmeterio/openmeter/pkg/models" ) @@ -70,6 +65,8 @@ type highWatermarkCacheEntry struct { } type Worker struct { + ctx context.Context + ctxCancel context.CancelFunc opts WorkerOptions connectors *registry.Entitlement router *message.Router @@ -101,7 +98,16 @@ func New(opts WorkerOptions) (*Worker, error) { opts.Subscriber, opts.TargetTopic, opts.Publisher, - worker.handleSystemEvent, + worker.handleEvent, + ) + + router.AddHandler( + "balance_worker_ingest_events", + opts.IngestEventsTopic, + opts.Subscriber, + opts.TargetTopic, + opts.Publisher, + worker.handleEvent, ) router.AddMiddleware( @@ -126,7 +132,7 @@ func New(opts WorkerOptions) (*Worker, error) { poisionQueue, ) - poisionQueueProcessor := worker.handleSystemEvent + poisionQueueProcessor := worker.handleEvent if opts.PoisonQueue.Throttle { poisionQueueProcessor = middleware.NewThrottle( opts.PoisonQueue.ThrottleCount, @@ -147,6 +153,7 @@ func New(opts WorkerOptions) (*Worker, error) { } func (w *Worker) Run(ctx context.Context) error { + w.ctx, w.ctxCancel = context.WithCancel(ctx) return w.router.Run(ctx) } @@ -155,10 +162,11 @@ func (w *Worker) Close() error { return err } + w.ctxCancel() return nil } -func (w *Worker) handleSystemEvent(msg *message.Message) ([]*message.Message, error) { +func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { w.opts.Logger.Debug("received system event", w.messageToLogFields(msg)...) ceType, found := msg.Metadata[publisher.CloudEventsHeaderType] @@ -172,21 +180,11 @@ func (w *Worker) handleSystemEvent(msg *message.Message) ([]*message.Message, er case entitlement.EntitlementCreatedEvent{}.Spec().Type(): event, err := spec.ParseCloudEventFromBytes[entitlement.EntitlementCreatedEvent](msg.Payload) if err != nil { - return nil, fmt.Errorf("failed to parse entitlement created event: %w", err) - } - - return w.handleUpdateEvent( - msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.ID}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.ID), - ) - case entitlement.EntitlementDeletedEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[entitlement.EntitlementDeletedEvent](msg.Payload) - if err != nil { - return nil, fmt.Errorf("failed to parse entitlement deleted event: %w", err) + w.opts.Logger.Error("failed to parse entitlement created event", w.messageToLogFields(msg)...) + return nil, err } - return w.handleEntitlementDeleteEvent(msg.Context(), event.Payload) + return w.handleEntitlementDeleteEvent(w.ctx, event.Payload) // Grant events case credit.GrantCreatedEvent{}.Spec().Type(): event, err := spec.ParseCloudEventFromBytes[credit.GrantCreatedEvent](msg.Payload) @@ -194,8 +192,8 @@ func (w *Worker) handleSystemEvent(msg *message.Message) ([]*message.Message, er return nil, fmt.Errorf("failed to parse grant created event: %w", err) } - return w.handleUpdateEvent( - msg.Context(), + return w.handleEntitlementUpdateEvent( + w.ctx, NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), ) @@ -205,8 +203,8 @@ func (w *Worker) handleSystemEvent(msg *message.Message) ([]*message.Message, er return nil, fmt.Errorf("failed to parse grant voided event: %w", err) } - return w.handleUpdateEvent( - msg.Context(), + return w.handleEntitlementUpdateEvent( + w.ctx, NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), ) @@ -217,155 +215,24 @@ func (w *Worker) handleSystemEvent(msg *message.Message) ([]*message.Message, er return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err) } - return w.handleUpdateEvent( - msg.Context(), + return w.handleEntitlementUpdateEvent( + w.ctx, NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID), ) - } - return nil, nil -} - -func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent entitlement.EntitlementDeletedEvent) ([]*message.Message, error) { - namespace := delEvent.Namespace.ID - - feature, err := w.connectors.Feature.GetFeature(ctx, namespace, delEvent.FeatureID, productcatalog.IncludeArchivedFeatureTrue) - if err != nil { - return nil, fmt.Errorf("failed to get feature: %w", err) - } - - subjectID := "" - if w.opts.SubjectIDResolver != nil { - subjectID, err = w.opts.SubjectIDResolver.GetSubjectIDByKey(ctx, namespace, delEvent.SubjectKey) - if err != nil { - return nil, fmt.Errorf("failed to get subject ID: %w", err) - } - } - - calculationTime := w.getCalculationTime() - - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, delEvent.ID), - Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, delEvent.SubjectKey), - }, - snapshot.EntitlementBalanceSnapshotEvent{ - Entitlement: delEvent.Entitlement, - Namespace: models.NamespaceID{ - ID: namespace, - }, - Subject: models.SubjectKeyAndID{ - Key: delEvent.SubjectKey, - ID: subjectID, - }, - Feature: *feature, - Operation: snapshot.BalanceOperationDelete, - - CalculatedAt: convert.ToPointer(calculationTime), - - CurrentUsagePeriod: delEvent.CurrentUsagePeriod, - }, - ) - if err != nil { - return nil, fmt.Errorf("failed to create cloud event: %w", err) - } - - wmMessage, err := w.opts.Marshaler.MarshalEvent(event) - if err != nil { - return nil, fmt.Errorf("failed to marshal cloud event: %w", err) - } - - _ = w.highWatermarkCache.Add(delEvent.ID, highWatermarkCacheEntry{ - HighWatermark: calculationTime, - IsDeleted: true, - }) - - return []*message.Message{wmMessage}, nil -} - -func (w *Worker) handleUpdateEvent(ctx context.Context, entitlementID NamespacedID, source string) ([]*message.Message, error) { - calculatedAt := w.getCalculationTime() - - if entry, ok := w.highWatermarkCache.Get(entitlementID.ID); ok { - if entry.HighWatermark.After(calculatedAt) || entry.IsDeleted { - return nil, nil - } - } - - wmMessage, err := w.createEntitlementUpdateSnapshotEvent(ctx, entitlementID, source, calculatedAt) - if err != nil { - return nil, fmt.Errorf("failed to create entitlement update snapshot event: %w", err) - } - - _ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{ - HighWatermark: calculatedAt, - }) - - return []*message.Message{wmMessage}, nil -} - -func (w *Worker) createEntitlementUpdateSnapshotEvent(ctx context.Context, entitlementID NamespacedID, source string, calculatedAt time.Time) (*message.Message, error) { - entitlement, err := w.connectors.Entitlement.GetEntitlement(ctx, entitlementID.Namespace, entitlementID.ID) - if err != nil { - return nil, fmt.Errorf("failed to get entitlement: %w", err) - } - - feature, err := w.connectors.Feature.GetFeature(ctx, entitlementID.Namespace, entitlement.FeatureID, productcatalog.IncludeArchivedFeatureTrue) - if err != nil { - return nil, fmt.Errorf("failed to get feature: %w", err) - } - - value, err := w.connectors.Entitlement.GetEntitlementValue(ctx, entitlementID.Namespace, entitlement.SubjectKey, entitlement.ID, calculatedAt) - if err != nil { - return nil, fmt.Errorf("failed to get entitlement value: %w", err) - } - - mappedValues, err := httpdriver.MapEntitlementValueToAPI(value) - if err != nil { - return nil, fmt.Errorf("failed to map entitlement value: %w", err) - } - - subjectID := "" - if w.opts.SubjectIDResolver != nil { - subjectID, err = w.opts.SubjectIDResolver.GetSubjectIDByKey(ctx, entitlementID.Namespace, entitlementID.ID) + // Metered entitlement events + case meteredentitlement.ResetEntitlementEvent{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[meteredentitlement.ResetEntitlementEvent](msg.Payload) if err != nil { - return nil, fmt.Errorf("failed to get subject ID: %w", err) + return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err) } - } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: source, - Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, entitlement.SubjectKey), - }, - snapshot.EntitlementBalanceSnapshotEvent{ - Entitlement: *entitlement, - Namespace: models.NamespaceID{ - ID: entitlementID.Namespace, - }, - Subject: models.SubjectKeyAndID{ - Key: entitlement.SubjectKey, - ID: subjectID, - }, - Feature: *feature, - Operation: snapshot.BalanceOperationUpdate, - - CalculatedAt: &calculatedAt, - - Balance: convert.ToPointer((snapshot.EntitlementValue)(mappedValues)), - CurrentUsagePeriod: entitlement.CurrentUsagePeriod, - }, - ) - if err != nil { - return nil, fmt.Errorf("failed to create cloud event: %w", err) - } - - wmMessage, err := w.opts.Marshaler.MarshalEvent(event) - if err != nil { - return nil, fmt.Errorf("failed to marshal cloud event: %w", err) + return w.handleUpdateEvent( + msg.Context(), + NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID}, + spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID), + ) } - - return wmMessage, nil } func (w *Worker) messageToLogFields(msg *message.Message) []any { From 6fda2f5667a59b2630082eb32b4a231d88d95be6 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 1 Aug 2024 09:29:30 +0200 Subject: [PATCH 5/7] fix: add ingest wiring --- .../balanceworker/entitlementhandler.go | 8 ++--- internal/entitlement/balanceworker/worker.go | 32 ++++++++++++++++--- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/internal/entitlement/balanceworker/entitlementhandler.go b/internal/entitlement/balanceworker/entitlementhandler.go index fb15b7117..f2a7c2656 100644 --- a/internal/entitlement/balanceworker/entitlementhandler.go +++ b/internal/entitlement/balanceworker/entitlementhandler.go @@ -32,7 +32,7 @@ func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent enti } } - calculationTime := w.getCalculationTime() + calculationTime := time.Now() event, err := spec.NewCloudEvent( spec.EventSpec{ @@ -66,7 +66,7 @@ func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent enti } _ = w.highWatermarkCache.Add(delEvent.ID, highWatermarkCacheEntry{ - HighWatermark: calculationTime, + HighWatermark: calculationTime.Add(-defaultClockDrift), IsDeleted: true, }) @@ -74,7 +74,7 @@ func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent enti } func (w *Worker) handleEntitlementUpdateEvent(ctx context.Context, entitlementID NamespacedID, source string) ([]*message.Message, error) { - calculatedAt := w.getCalculationTime() + calculatedAt := time.Now() if entry, ok := w.highWatermarkCache.Get(entitlementID.ID); ok { if entry.HighWatermark.After(calculatedAt) || entry.IsDeleted { @@ -88,7 +88,7 @@ func (w *Worker) handleEntitlementUpdateEvent(ctx context.Context, entitlementID } _ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{ - HighWatermark: calculatedAt, + HighWatermark: calculatedAt.Add(-defaultClockDrift), }) return []*message.Message{wmMessage}, nil diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index a88a15373..c3d900812 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -18,6 +18,7 @@ import ( "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/registry" + "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" pkgmodels "github.com/openmeterio/openmeter/pkg/models" ) @@ -183,8 +184,20 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { w.opts.Logger.Error("failed to parse entitlement created event", w.messageToLogFields(msg)...) return nil, err } + return w.handleEntitlementUpdateEvent( + w.ctx, + NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.ID}, + spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.ID), + ) + case entitlement.EntitlementDeletedEvent{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[entitlement.EntitlementDeletedEvent](msg.Payload) + if err != nil { + w.opts.Logger.Error("failed to parse entitlement deleted event", w.messageToLogFields(msg)...) + return nil, err + } return w.handleEntitlementDeleteEvent(w.ctx, event.Payload) + // Grant events case credit.GrantCreatedEvent{}.Spec().Type(): event, err := spec.ParseCloudEventFromBytes[credit.GrantCreatedEvent](msg.Payload) @@ -208,6 +221,7 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), ) + // Metered entitlement events case meteredentitlement.ResetEntitlementEvent{}.Spec().Type(): event, err := spec.ParseCloudEventFromBytes[meteredentitlement.ResetEntitlementEvent](msg.Payload) @@ -220,6 +234,7 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID), ) + // Metered entitlement events case meteredentitlement.ResetEntitlementEvent{}.Spec().Type(): event, err := spec.ParseCloudEventFromBytes[meteredentitlement.ResetEntitlementEvent](msg.Payload) @@ -227,12 +242,23 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err) } - return w.handleUpdateEvent( + return w.handleEntitlementUpdateEvent( msg.Context(), NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID), ) + + // Ingest event + case ingestnotification.IngestEvent{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[ingestnotification.IngestEvent](msg.Payload) + if err != nil { + return nil, fmt.Errorf("failed to parse ingest event: %w", err) + } + + return w.handleIngestEvent(w.ctx, event.Payload) } + + return nil, nil } func (w *Worker) messageToLogFields(msg *message.Message) []any { @@ -248,7 +274,3 @@ func (w *Worker) messageToLogFields(msg *message.Message) []any { out = append(out, slog.String("message_metadata", string(meta))) return out } - -func (*Worker) getCalculationTime() time.Time { - return time.Now().Add(-defaultClockDrift).UTC() -} From 00a71b314ff729959ccdf8a2c43995ed2f3f9725 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 1 Aug 2024 10:01:32 +0200 Subject: [PATCH 6/7] refactor: context handling --- cmd/balance-worker/main.go | 4 ++-- internal/entitlement/balanceworker/worker.go | 16 ++++++---------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index b5e38d993..409a53f6d 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -255,8 +255,8 @@ func main() { // Initialize worker workerOptions := balanceworker.WorkerOptions{ SystemEventsTopic: conf.Events.SystemEvents.Topic, - // TODO: IngestEventsTopic - Subscriber: wmSubscriber, + IngestEventsTopic: conf.Events.IngestEvents.Topic, + Subscriber: wmSubscriber, TargetTopic: conf.Events.SystemEvents.Topic, Publisher: publishers.watermillPublisher, diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index c3d900812..9c86fb3f3 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -66,8 +66,6 @@ type highWatermarkCacheEntry struct { } type Worker struct { - ctx context.Context - ctxCancel context.CancelFunc opts WorkerOptions connectors *registry.Entitlement router *message.Router @@ -154,7 +152,6 @@ func New(opts WorkerOptions) (*Worker, error) { } func (w *Worker) Run(ctx context.Context) error { - w.ctx, w.ctxCancel = context.WithCancel(ctx) return w.router.Run(ctx) } @@ -163,7 +160,6 @@ func (w *Worker) Close() error { return err } - w.ctxCancel() return nil } @@ -185,7 +181,7 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { return nil, err } return w.handleEntitlementUpdateEvent( - w.ctx, + msg.Context(), NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.ID}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.ID), ) @@ -196,7 +192,7 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { return nil, err } - return w.handleEntitlementDeleteEvent(w.ctx, event.Payload) + return w.handleEntitlementDeleteEvent(msg.Context(), event.Payload) // Grant events case credit.GrantCreatedEvent{}.Spec().Type(): @@ -206,7 +202,7 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { } return w.handleEntitlementUpdateEvent( - w.ctx, + msg.Context(), NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), ) @@ -217,7 +213,7 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { } return w.handleEntitlementUpdateEvent( - w.ctx, + msg.Context(), NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), ) @@ -230,7 +226,7 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { } return w.handleEntitlementUpdateEvent( - w.ctx, + msg.Context(), NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID}, spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID), ) @@ -255,7 +251,7 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { return nil, fmt.Errorf("failed to parse ingest event: %w", err) } - return w.handleIngestEvent(w.ctx, event.Payload) + return w.handleIngestEvent(msg.Context(), event.Payload) } return nil, nil From 3a5bc609472a4d6bb74beef30aecf895829329dc Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 1 Aug 2024 10:58:29 +0200 Subject: [PATCH 7/7] fix: adjust for new event names --- .../balanceworker/entitlementhandler.go | 4 ++-- .../balanceworker/ingesthandler.go | 2 +- internal/entitlement/balanceworker/worker.go | 21 ++++--------------- 3 files changed, 7 insertions(+), 20 deletions(-) diff --git a/internal/entitlement/balanceworker/entitlementhandler.go b/internal/entitlement/balanceworker/entitlementhandler.go index f2a7c2656..6f5f11d35 100644 --- a/internal/entitlement/balanceworker/entitlementhandler.go +++ b/internal/entitlement/balanceworker/entitlementhandler.go @@ -39,7 +39,7 @@ func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent enti Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, delEvent.ID), Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, delEvent.SubjectKey), }, - snapshot.EntitlementBalanceSnapshotEvent{ + snapshot.SnapshotEvent{ Entitlement: delEvent.Entitlement, Namespace: models.NamespaceID{ ID: namespace, @@ -128,7 +128,7 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac Source: source, Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, entitlement.SubjectKey), }, - snapshot.EntitlementBalanceSnapshotEvent{ + snapshot.SnapshotEvent{ Entitlement: *entitlement, Namespace: models.NamespaceID{ ID: entitlementID.Namespace, diff --git a/internal/entitlement/balanceworker/ingesthandler.go b/internal/entitlement/balanceworker/ingesthandler.go index 7dba1e6a5..910f0ef06 100644 --- a/internal/entitlement/balanceworker/ingesthandler.go +++ b/internal/entitlement/balanceworker/ingesthandler.go @@ -11,7 +11,7 @@ import ( "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" ) -func (w *Worker) handleIngestEvent(ctx context.Context, event ingestnotification.IngestEvent) ([]*message.Message, error) { +func (w *Worker) handleIngestEvent(ctx context.Context, event ingestnotification.EventIngested) ([]*message.Message, error) { affectedEntitlements, err := w.GetEntitlementsAffectedByMeterSubject(ctx, event.Namespace.ID, event.MeterSlugs, event.SubjectKey) if err != nil { return nil, err diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index 9c86fb3f3..0bcdacebc 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -219,21 +219,8 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { ) // Metered entitlement events - case meteredentitlement.ResetEntitlementEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[meteredentitlement.ResetEntitlementEvent](msg.Payload) - if err != nil { - return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err) - } - - return w.handleEntitlementUpdateEvent( - msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID), - ) - - // Metered entitlement events - case meteredentitlement.ResetEntitlementEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[meteredentitlement.ResetEntitlementEvent](msg.Payload) + case meteredentitlement.EntitlementResetEvent{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[meteredentitlement.EntitlementResetEvent](msg.Payload) if err != nil { return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err) } @@ -245,8 +232,8 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { ) // Ingest event - case ingestnotification.IngestEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[ingestnotification.IngestEvent](msg.Payload) + case ingestnotification.EventIngested{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[ingestnotification.EventIngested](msg.Payload) if err != nil { return nil, fmt.Errorf("failed to parse ingest event: %w", err) }