From e8d8e2c719fdcb121b0605afccebb9cd45a41983 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 18 Dec 2024 10:54:04 +0800 Subject: [PATCH] fix Signed-off-by: Song Gao --- internal/topo/node/cache/sync_cache.go | 7 +++++-- internal/topo/node/cache/sync_cache_test.go | 6 ++++++ internal/topo/node/cache_op.go | 9 ++++++++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/internal/topo/node/cache/sync_cache.go b/internal/topo/node/cache/sync_cache.go index 5c7f55d30d..a1d132f93b 100644 --- a/internal/topo/node/cache/sync_cache.go +++ b/internal/topo/node/cache/sync_cache.go @@ -136,8 +136,11 @@ func NewSyncCache(ctx api.StreamContext, cacheConf *conf.SinkConf) (*SyncCache, writeBufferPage: newPage(cacheConf.BufferPageSize), readBufferPage: newPage(cacheConf.BufferPageSize), } - err := c.initStore(ctx) - return c, err + return c, nil +} + +func (c *SyncCache) InitStore(ctx api.StreamContext) error { + return c.initStore(ctx) } func (c *SyncCache) SetupMeta(ctx api.StreamContext) { diff --git a/internal/topo/node/cache/sync_cache_test.go b/internal/topo/node/cache/sync_cache_test.go index b2f582f3be..bdebd6288e 100644 --- a/internal/topo/node/cache/sync_cache_test.go +++ b/internal/topo/node/cache/sync_cache_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/pkg/def" @@ -130,6 +131,7 @@ func TestCache(t *testing.T) { CleanCacheAtStop: false, }) assert.NoError(t, err) + require.NoError(t, s.InitStore(ctx)) // prepare data tuples := make([]any, 15) for i := 0; i < 15; i++ { @@ -234,6 +236,7 @@ func TestCacheCase2(t *testing.T) { ResendInterval: cast.DurationConf(10 * time.Millisecond), }) assert.NoError(t, err) + require.NoError(t, s.InitStore(ctx)) // prepare data tuples := make([]any, 15) for i := 0; i < 15; i++ { @@ -337,6 +340,7 @@ func TestCacheInit(t *testing.T) { CleanCacheAtStop: false, }) assert.NoError(t, err) + require.NoError(t, s.InitStore(ctx)) // prepare data tuples := make([]any, 10) for i := 0; i < 10; i++ { @@ -361,6 +365,7 @@ func TestCacheInit(t *testing.T) { CleanCacheAtStop: false, }) assert.NoError(t, err) + require.NoError(t, s.InitStore(ctx)) r, _ := s.PopCache(ctx) assert.Equal(t, 3, s.CacheLength, "cache length after pop") assert.Equal(t, &xsql.RawTuple{ @@ -380,6 +385,7 @@ func TestCacheInit(t *testing.T) { CleanCacheAtStop: false, }) assert.NoError(t, err) + require.NoError(t, s.InitStore(ctx)) r, _ = s.PopCache(ctx) assert.Equal(t, 2, s.CacheLength, "cache length after pop") assert.Equal(t, &xsql.RawTuple{ diff --git a/internal/topo/node/cache_op.go b/internal/topo/node/cache_op.go index 5850579453..99d7c5cb54 100644 --- a/internal/topo/node/cache_op.go +++ b/internal/topo/node/cache_op.go @@ -70,6 +70,10 @@ func (s *CacheOp) Exec(ctx api.StreamContext, errCh chan<- error) { infra.DrainError(ctx, fmt.Errorf("cache op should have only 1 output but got %+v", s.outputs), errCh) } s.cache.SetupMeta(ctx) + if err := s.cache.InitStore(ctx); err != nil { + infra.DrainError(ctx, fmt.Errorf("cache op init store error:%v", err), errCh) + return + } s.prepareExec(ctx, errCh, "op") go func() { err := infra.SafeRun(func() error { @@ -105,10 +109,13 @@ func (s *CacheOp) Exec(ctx api.StreamContext, errCh chan<- error) { s.send() s.span = nil s.onProcessEnd(ctx) - l := int64(len(s.input) + s.cache.CacheLength) + a := int64(len(s.input)) + b := s.cache.CacheLength + l := a + int64(b) if s.currItem != nil { l += 1 } + conf.Log.Printf("cache op buffer length %v, chan %v, cacheLength %v", l, a, b) s.statManager.SetBufferLength(l) case <-s.resendTimerCh: ctx.GetLogger().Debugf("ticker is triggered")