Skip to content

Commit

Permalink
FFM-11660 Add configurable size and clearing schedule to the seen tar…
Browse files Browse the repository at this point in the history
…gets map (#165)
  • Loading branch information
erdirowlands authored Sep 9, 2024
1 parent f2362b4 commit a9511ed
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 69 deletions.
2 changes: 1 addition & 1 deletion .harness/ffgolangserversdk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pipeline:
dockerfile: ff-sdk-testgrid/go/Dockerfile
context: ff-sdk-testgrid/go
buildArgs:
SDK_VERSION: v0.1.24
SDK_VERSION: v0.1.25
BUILD_MODE: local
resources:
limits:
Expand Down
69 changes: 49 additions & 20 deletions analyticsservice/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
variationValueAttribute string = "featureValue"
targetAttribute string = "target"
sdkVersionAttribute string = "SDK_VERSION"
SdkVersion string = "0.1.24"
SdkVersion string = "0.1.25"
sdkTypeAttribute string = "SDK_TYPE"
sdkType string = "server"
sdkLanguageAttribute string = "SDK_LANGUAGE"
Expand All @@ -46,6 +46,12 @@ type SafeAnalyticsCache[K comparable, V any] interface {
iterate(func(K, V))
}

// SafeSeenTargetsCache extends SafeAnalyticsCache and adds behavior specific to seen targets
type SafeSeenTargetsCache[K comparable, V any] interface {
SafeAnalyticsCache[K, V]
isLimitExceeded() bool
}

type analyticsEvent struct {
target *evaluation.Target
featureConfig *rest.FeatureConfig
Expand All @@ -55,33 +61,35 @@ type analyticsEvent struct {

// AnalyticsService provides a way to cache and send analytics to the server
type AnalyticsService struct {
analyticsChan chan analyticsEvent
evaluationAnalytics SafeAnalyticsCache[string, analyticsEvent]
targetAnalytics SafeAnalyticsCache[string, evaluation.Target]
seenTargets SafeAnalyticsCache[string, bool]
logEvaluationLimitReached atomic.Bool
logTargetLimitReached atomic.Bool
timeout time.Duration
logger logger.Logger
metricsClient metricsclient.ClientWithResponsesInterface
environmentID string
analyticsChan chan analyticsEvent
evaluationAnalytics SafeAnalyticsCache[string, analyticsEvent]
targetAnalytics SafeAnalyticsCache[string, evaluation.Target]
seenTargets SafeSeenTargetsCache[string, bool]
logEvaluationLimitReached atomic.Bool
logTargetLimitReached atomic.Bool
timeout time.Duration
logger logger.Logger
metricsClient metricsclient.ClientWithResponsesInterface
environmentID string
seenTargetsClearingInterval time.Duration
}

// NewAnalyticsService creates and starts a analytics service to send data to the client
func NewAnalyticsService(timeout time.Duration, logger logger.Logger) *AnalyticsService {
func NewAnalyticsService(timeout time.Duration, logger logger.Logger, seenTargetsMaxSize int, seenTargetsClearingSchedule time.Duration) *AnalyticsService {
serviceTimeout := timeout
if timeout < 60*time.Second {
serviceTimeout = 60 * time.Second
} else if timeout > 1*time.Hour {
serviceTimeout = 1 * time.Hour
}
as := AnalyticsService{
analyticsChan: make(chan analyticsEvent),
evaluationAnalytics: newSafeEvaluationAnalytics(),
targetAnalytics: newSafeTargetAnalytics(),
seenTargets: newSafeSeenTargets(),
timeout: serviceTimeout,
logger: logger,
analyticsChan: make(chan analyticsEvent),
evaluationAnalytics: newSafeEvaluationAnalytics(),
targetAnalytics: newSafeTargetAnalytics(),
seenTargets: newSafeSeenTargets(seenTargetsMaxSize),
timeout: serviceTimeout,
logger: logger,
seenTargetsClearingInterval: seenTargetsClearingSchedule,
}
go as.listener()

Expand All @@ -94,6 +102,7 @@ func (as *AnalyticsService) Start(ctx context.Context, client metricsclient.Clie
as.metricsClient = client
as.environmentID = environmentID
go as.startTimer(ctx)
go as.startSeenTargetsClearingSchedule(ctx, as.seenTargetsClearingInterval)
}

func (as *AnalyticsService) startTimer(ctx context.Context) {
Expand All @@ -103,6 +112,7 @@ func (as *AnalyticsService) startTimer(ctx context.Context) {
timeStamp := time.Now().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
as.sendDataAndResetCache(ctx, timeStamp)
case <-ctx.Done():
close(as.analyticsChan)
as.logger.Infof("%s Metrics stopped", sdk_codes.MetricsStopped)
return
}
Expand Down Expand Up @@ -149,9 +159,12 @@ func (as *AnalyticsService) listener() {
}

// Check if target has been seen
_, seen := as.seenTargets.get(ad.target.Identifier)
if _, seen := as.seenTargets.get(ad.target.Identifier); seen {
continue
}

if seen {
// Check if seen targets limit has been hit
if as.seenTargets.isLimitExceeded() {
continue
}

Expand Down Expand Up @@ -314,6 +327,22 @@ func (as *AnalyticsService) processTargetMetrics(targetAnalytics SafeAnalyticsCa
return targetData
}

func (as *AnalyticsService) startSeenTargetsClearingSchedule(ctx context.Context, clearingInterval time.Duration) {
ticker := time.NewTicker(clearingInterval)

for {
select {
case <-ticker.C:
as.logger.Debugf("Clearing seen targets")
as.seenTargets.clear()

case <-ctx.Done():
ticker.Stop()
return
}
}
}

func getEvaluationAnalyticKey(event analyticsEvent) string {
return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, globalTarget)
}
2 changes: 1 addition & 1 deletion analyticsservice/analytics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestListenerHandlesEventsCorrectly(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
service := NewAnalyticsService(1*time.Minute, noOpLogger)
service := NewAnalyticsService(1*time.Minute, noOpLogger, 10, time.Hour)
defer close(service.analyticsChan)

// Start the listener in a goroutine
Expand Down
89 changes: 86 additions & 3 deletions analyticsservice/safe_maps_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package analyticsservice

import (
"fmt"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -81,13 +82,95 @@ func TestSafeTargetAnalytics(t *testing.T) {
}

func TestSafeSeenTargets(t *testing.T) {
s := newSafeSeenTargets()
// Initialize with a small maxSize for testing
maxSize := 3
s := newSafeSeenTargets(maxSize).(SafeSeenTargetsCache[string, bool])

testData := map[string]bool{
"target1": true,
"target21": true,
"target3": true,
"target4": true,
}

testMapOperations[string, bool](t, s, testData)
// Insert items and ensure limit is not exceeded
for key, value := range testData {
s.set(key, value)
}

if s.isLimitExceeded() {
t.Errorf("Limit should not have been exceeded yet")
}

// Add one more item to exceed the limit
s.set("target4", true)

// Ensure limitExceeded is true after exceeding the limit
if !s.isLimitExceeded() {
t.Errorf("Limit should be exceeded after adding target4")
}

// Ensure that new items are not added once the limit is exceeded
s.set("target5", true)
if _, exists := s.get("target5"); exists {
t.Errorf("target5 should not have been added as the limit was exceeded")
}

// Clear the map and ensure limit is reset
s.clear()

if s.isLimitExceeded() {
t.Errorf("Limit should have been reset after clearing the map")
}

// Add items again after clearing
s.set("target6", true)
if _, exists := s.get("target6"); !exists {
t.Errorf("target6 should have been added after clearing the map")
}

// Concurrency test
t.Run("ConcurrencyTest", func(t *testing.T) {
var wg sync.WaitGroup
concurrencyLevel := 100

// Re-initialize the map for concurrency testing
s = newSafeSeenTargets(100).(SafeSeenTargetsCache[string, bool])

// Concurrently set keys
for i := 0; i < concurrencyLevel; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := "target" + fmt.Sprint(i)
s.set(key, true)
}(i)
}

// Concurrently get keys
for i := 0; i < concurrencyLevel; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := "target" + fmt.Sprint(i)
s.get(key)
}(i)
}

// Concurrently clear the map
for i := 0; i < concurrencyLevel/2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s.clear()
}()
}

wg.Wait()

// Ensure the map is cleared after the concurrency operations
if s.size() > 0 {
t.Errorf("Map size should be 0 after clearing, got %d", s.size())
}
})

}
21 changes: 18 additions & 3 deletions analyticsservice/safe_seen_targets_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,32 @@ package analyticsservice

import (
"sync"
"sync/atomic"
)

type safeSeenTargets struct {
sync.RWMutex
data map[string]bool
data map[string]bool
maxSize int
limitExceeded atomic.Bool
}

func newSafeSeenTargets() SafeAnalyticsCache[string, bool] {
func newSafeSeenTargets(maxSize int) SafeSeenTargetsCache[string, bool] {
return &safeSeenTargets{
data: make(map[string]bool),
data: make(map[string]bool),
maxSize: maxSize,
}
}

func (s *safeSeenTargets) set(key string, seen bool) {
s.Lock()
defer s.Unlock()

if len(s.data) >= s.maxSize {
s.limitExceeded.Store(true)
return
}

s.data[key] = seen
}

Expand All @@ -44,6 +54,7 @@ func (s *safeSeenTargets) clear() {
s.Lock()
defer s.Unlock()
s.data = make(map[string]bool)
s.limitExceeded.Store(false)
}

func (s *safeSeenTargets) iterate(f func(string, bool)) {
Expand All @@ -53,3 +64,7 @@ func (s *safeSeenTargets) iterate(f func(string, bool)) {
f(key, value)
}
}

func (s *safeSeenTargets) isLimitExceeded() bool {
return s.limitExceeded.Load()
}
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
opt(config)
}

analyticsService := analyticsservice.NewAnalyticsService(time.Minute, config.Logger)
analyticsService := analyticsservice.NewAnalyticsService(time.Minute, config.Logger, config.seenTargetsMaxSize, config.seenTargetsClearInterval)

client := &CfClient{
sdkKey: sdkKey,
Expand Down
Loading

0 comments on commit a9511ed

Please sign in to comment.