Skip to content

Commit

Permalink
feat: boundary cache (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
scorix authored Dec 3, 2024
1 parent 85e80be commit f61effb
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 14 deletions.
52 changes: 52 additions & 0 deletions pkg/grib2/cache/boundary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cache

import (
"fmt"

"golang.org/x/sync/singleflight"
)

type boundary struct {
minLat float32
maxLat float32
minLon float32
maxLon float32

cache map[int]float32
datasource GridDataSource
sfg singleflight.Group
}

func NewBoundary(minLat, maxLat, minLon, maxLon float32, datasource GridDataSource) GridCache {
return &boundary{
minLat: minLat,
maxLat: maxLat,
minLon: minLon,
maxLon: maxLon,
datasource: datasource,
cache: make(map[int]float32),
}
}

func (b *boundary) ReadGridAt(grid int, lat, lon float32) (float32, error) {
if lat < b.minLat || lat > b.maxLat || lon < b.minLon || lon > b.maxLon {
return b.datasource.ReadGridAt(grid)
}

v, err, _ := b.sfg.Do(fmt.Sprintf("%d", grid), func() (interface{}, error) {
vFromCache, ok := b.cache[grid]
if !ok {
vFromSource, err := b.datasource.ReadGridAt(grid)
if err != nil {
return 0, err
}

b.cache[grid] = vFromSource
vFromCache = vFromSource
}

return vFromCache, nil
})

return v.(float32), err
}
26 changes: 26 additions & 0 deletions pkg/grib2/cache/boundary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package cache_test

import (
"testing"

"github.com/scorix/grib-go/pkg/grib2/cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBoundary(t *testing.T) {
ds := &mockGridDataSource{gridValue: 100}
bc := cache.NewBoundary(0, 10, 0, 10, ds)

// first read should be from source
v, err := bc.ReadGridAt(1, 1, 1)
require.NoError(t, err)
assert.Equal(t, float32(100), v)
assert.Equal(t, 1, ds.readCount)

// second read should be cached
v, err = bc.ReadGridAt(1, 1, 1)
require.NoError(t, err)
assert.Equal(t, float32(100), v)
assert.Equal(t, 1, ds.readCount)
}
21 changes: 21 additions & 0 deletions pkg/grib2/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cache

type GridDataSource interface {
ReadGridAt(grid int) (float32, error)
}

type GridCache interface {
ReadGridAt(grid int, lat, lon float32) (float32, error)
}

type noCache struct {
datasource GridDataSource
}

func NewNoCache(datasource GridDataSource) GridCache {
return &noCache{datasource: datasource}
}

func (n *noCache) ReadGridAt(grid int, lat, lon float32) (float32, error) {
return n.datasource.ReadGridAt(grid)
}
36 changes: 36 additions & 0 deletions pkg/grib2/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cache_test

import (
"testing"

"github.com/scorix/grib-go/pkg/grib2/cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type mockGridDataSource struct {
gridValue float32
readCount int
}

func (m *mockGridDataSource) ReadGridAt(grid int) (float32, error) {
m.readCount++
return m.gridValue, nil
}

func TestNoCache(t *testing.T) {
ds := &mockGridDataSource{gridValue: 100}
nc := cache.NewNoCache(ds)

// first read should be from source
v, err := nc.ReadGridAt(1, 1, 1)
require.NoError(t, err)
assert.Equal(t, float32(100), v)
assert.Equal(t, 1, ds.readCount)

// second read should not be cached
v, err = nc.ReadGridAt(1, 1, 1)
require.NoError(t, err)
assert.Equal(t, float32(100), v)
assert.Equal(t, 2, ds.readCount)
}
47 changes: 33 additions & 14 deletions pkg/grib2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/scorix/grib-go/internal/pkg/bitio"
"github.com/scorix/grib-go/pkg/grib2/cache"
"github.com/scorix/grib-go/pkg/grib2/drt"
gridpoint "github.com/scorix/grib-go/pkg/grib2/drt/grid_point"
"github.com/scorix/grib-go/pkg/grib2/gdt"
Expand Down Expand Up @@ -238,44 +239,62 @@ type MessageReader interface {
}

type simplePackingMessageReader struct {
sp *gridpoint.SimplePacking
spr *gridpoint.SimplePackingReader
gdt gdt.Template
sp *gridpoint.SimplePacking
spr *gridpoint.SimplePackingReader
gdt gdt.Template
cache cache.GridCache
}

func NewSimplePackingMessageReaderFromMessage(r io.ReaderAt, m IndexedMessage) (MessageReader, error) {
func NewSimplePackingMessageReaderFromMessage(r io.ReaderAt, m IndexedMessage, opts ...SimplePackingMessageReaderOptions) (MessageReader, error) {
sp, ok := m.GetDataRepresentationTemplate().(*gridpoint.SimplePacking)
if !ok {
return nil, fmt.Errorf("unsupported data representation template: %T", m.GetDataRepresentationTemplate())
}

gdt := m.GetGridDefinitionTemplate()

return NewSimplePackingMessageReader(r, m.GetOffset(), m.GetSize(), m.GetDataOffset(), sp, gdt)
return NewSimplePackingMessageReader(r, m.GetOffset(), m.GetSize(), m.GetDataOffset(), sp, gdt, opts...)
}

func NewSimplePackingMessageReader(r io.ReaderAt, messageOffset int64, messageSize int64, dataOffset int64, sp *gridpoint.SimplePacking, gdt gdt.Template) (MessageReader, error) {
return &simplePackingMessageReader{
spr: gridpoint.NewSimplePackingReader(r, dataOffset, messageOffset+messageSize, sp),
sp: sp,
gdt: gdt,
}, nil
type SimplePackingMessageReaderOptions func(r *simplePackingMessageReader)

func WithBoundary(minLat, maxLat, minLon, maxLon float32) SimplePackingMessageReaderOptions {
return func(r *simplePackingMessageReader) {
r.cache = cache.NewBoundary(minLat, maxLat, minLon, maxLon, r.spr)
}
}

func NewSimplePackingMessageReader(r io.ReaderAt, messageOffset int64, messageSize int64, dataOffset int64, sp *gridpoint.SimplePacking, gdt gdt.Template, opts ...SimplePackingMessageReaderOptions) (MessageReader, error) {
spr := gridpoint.NewSimplePackingReader(r, dataOffset, messageOffset+messageSize, sp)

mr := &simplePackingMessageReader{
spr: spr,
sp: sp,
gdt: gdt,
cache: cache.NewNoCache(spr),
}

for _, opt := range opts {
opt(mr)
}

return mr, nil
}

func NewSimplePackingMessageReaderFromMessageIndex(r io.ReaderAt, mi *MessageIndex) (MessageReader, error) {
func NewSimplePackingMessageReaderFromMessageIndex(r io.ReaderAt, mi *MessageIndex, opts ...SimplePackingMessageReaderOptions) (MessageReader, error) {
sp, ok := mi.Packing.(*gridpoint.SimplePacking)
if !ok {
return nil, fmt.Errorf("unsupported packing: %T", mi.Packing)
}

return NewSimplePackingMessageReader(r, mi.Offset, mi.Size, mi.DataOffset, sp, mi.GridDefinition)
return NewSimplePackingMessageReader(r, mi.Offset, mi.Size, mi.DataOffset, sp, mi.GridDefinition, opts...)
}

func (r *simplePackingMessageReader) ReadLL(lat float32, lon float32) (float32, float32, float32, error) {
grid := r.gdt.GetGridIndex(lat, lon)
lat, lng := r.gdt.GetGridPoint(grid)

v, err := r.spr.ReadGridAt(grid)
v, err := r.cache.ReadGridAt(grid, lat, lng)
if err != nil {
return 0, 0, 0, fmt.Errorf("read grid at point %d (lat: %f, lon: %f): %w", grid, lat, lng, err)
}
Expand Down

0 comments on commit f61effb

Please sign in to comment.