Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer committed Dec 18, 2024
1 parent b15cd40 commit e8d8e2c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
7 changes: 5 additions & 2 deletions internal/topo/node/cache/sync_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,11 @@ func NewSyncCache(ctx api.StreamContext, cacheConf *conf.SinkConf) (*SyncCache,
writeBufferPage: newPage(cacheConf.BufferPageSize),
readBufferPage: newPage(cacheConf.BufferPageSize),
}
err := c.initStore(ctx)
return c, err
return c, nil
}

func (c *SyncCache) InitStore(ctx api.StreamContext) error {
return c.initStore(ctx)
}

func (c *SyncCache) SetupMeta(ctx api.StreamContext) {
Expand Down
6 changes: 6 additions & 0 deletions internal/topo/node/cache/sync_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
Expand Down Expand Up @@ -130,6 +131,7 @@ func TestCache(t *testing.T) {
CleanCacheAtStop: false,
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
// prepare data
tuples := make([]any, 15)
for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -234,6 +236,7 @@ func TestCacheCase2(t *testing.T) {
ResendInterval: cast.DurationConf(10 * time.Millisecond),
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
// prepare data
tuples := make([]any, 15)
for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -337,6 +340,7 @@ func TestCacheInit(t *testing.T) {
CleanCacheAtStop: false,
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
// prepare data
tuples := make([]any, 10)
for i := 0; i < 10; i++ {
Expand All @@ -361,6 +365,7 @@ func TestCacheInit(t *testing.T) {
CleanCacheAtStop: false,
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
r, _ := s.PopCache(ctx)
assert.Equal(t, 3, s.CacheLength, "cache length after pop")
assert.Equal(t, &xsql.RawTuple{
Expand All @@ -380,6 +385,7 @@ func TestCacheInit(t *testing.T) {
CleanCacheAtStop: false,
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
r, _ = s.PopCache(ctx)
assert.Equal(t, 2, s.CacheLength, "cache length after pop")
assert.Equal(t, &xsql.RawTuple{
Expand Down
9 changes: 8 additions & 1 deletion internal/topo/node/cache_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (s *CacheOp) Exec(ctx api.StreamContext, errCh chan<- error) {
infra.DrainError(ctx, fmt.Errorf("cache op should have only 1 output but got %+v", s.outputs), errCh)
}
s.cache.SetupMeta(ctx)
if err := s.cache.InitStore(ctx); err != nil {
infra.DrainError(ctx, fmt.Errorf("cache op init store error:%v", err), errCh)
return
}

Check warning on line 76 in internal/topo/node/cache_op.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/node/cache_op.go#L74-L76

Added lines #L74 - L76 were not covered by tests
s.prepareExec(ctx, errCh, "op")
go func() {
err := infra.SafeRun(func() error {
Expand Down Expand Up @@ -105,10 +109,13 @@ func (s *CacheOp) Exec(ctx api.StreamContext, errCh chan<- error) {
s.send()
s.span = nil
s.onProcessEnd(ctx)
l := int64(len(s.input) + s.cache.CacheLength)
a := int64(len(s.input))
b := s.cache.CacheLength
l := a + int64(b)
if s.currItem != nil {
l += 1
}
conf.Log.Printf("cache op buffer length %v, chan %v, cacheLength %v", l, a, b)
s.statManager.SetBufferLength(l)
case <-s.resendTimerCh:
ctx.GetLogger().Debugf("ticker is triggered")
Expand Down

0 comments on commit e8d8e2c

Please sign in to comment.