diff --git a/.gitignore b/.gitignore index 243d1a6..41b6dfe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /venv/ .vscode/ +pkg/testdata/**/gfs.* diff --git a/go.mod b/go.mod index 9dbafcf..521d68a 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22.2 require ( github.com/icza/bitio v1.1.0 github.com/stretchr/testify v1.9.0 + golang.org/x/sync v0.8.0 ) require ( diff --git a/go.sum b/go.sum index 806b732..17cb896 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pkg/grib2/drt/grid_point/simple.go b/pkg/grib2/drt/grid_point/simple.go index 98e9dd4..30703fb 100644 --- a/pkg/grib2/drt/grid_point/simple.go +++ b/pkg/grib2/drt/grid_point/simple.go @@ -40,7 +40,13 @@ func (sp *SimplePacking) ReadAllData(r datapacking.BitReader) ([]float64, error) scaleFunc = sp.ScaleFunc() ) - for { + if sp.Bits == 0 { + for range sp.numVals { + values = append(values, scaleFunc(0)) + } + } + + for sp.Bits > 0 { bitsVal, err := r.ReadBits(sp.Bits) if errors.Is(err, io.EOF) { break diff --git a/pkg/grib2/grib2_test.go b/pkg/grib2/grib2_test.go index 6dedfe7..9a8eb7e 100644 --- a/pkg/grib2/grib2_test.go +++ b/pkg/grib2/grib2_test.go @@ -1,6 +1,7 @@ package grib2_test import ( + "errors" "io" "os" "testing" @@ -15,6 +16,7 @@ import ( "github.com/scorix/grib-go/pkg/grib2/regulation" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func assertSection(t testing.TB, sec grib2.Section, number int, length int) { @@ -130,21 +132,21 @@ func TestGrib_ReadSection_SimplePacking(t *testing.T) { test: func(t *testing.T, sec grib2.Section) { assertSection(t, sec, 4, 34) assertSection4(t, sec, &pdt.Template0{ - ParameterCategory: 0, - ParameterNumber: 0, - TypeOfGeneratingProcess: 2, - BackgroundProcess: -1, - GeneratingProcessIdentifier: -1, - HoursAfterDataCutoff: -1, - MinutesAfterDataCutoff: -1, - IndicatorOfUnitForForecastTime: 1, - ForecastTime: 0, - TypeOfFirstFixedSurface: 1, - ScaleFactorOfFirstFixedSurface: -1, - ScaledValueOfFirstFixedSurface: -1, - TypeOfSecondFixedSurface: -1, - ScaleFactorOfSecondFixedSurface: -1, - ScaledValueOfSecondFixedSurface: -1, + ParameterCategory: 0, + ParameterNumber: 0, + TypeOfGeneratingProcess: 2, + BackgroundProcess: -1, + AnalysisOrForecastGeneratingProcessIdentified: -1, + HoursAfterDataCutoff: -1, + MinutesAfterDataCutoff: -1, + IndicatorOfUnitForForecastTime: 1, + ForecastTime: 0, + TypeOfFirstFixedSurface: 1, + ScaleFactorOfFirstFixedSurface: -1, + ScaledValueOfFirstFixedSurface: -1, + TypeOfSecondFixedSurface: -1, + ScaleFactorOfSecondFixedSurface: -1, + ScaledValueOfSecondFixedSurface: -1, }) }, }, @@ -256,21 +258,21 @@ func TestGrib_ReadSection_ComplexPacking(t *testing.T) { test: func(t *testing.T, sec grib2.Section) { assertSection(t, sec, 4, 34) assertSection4(t, sec, &pdt.Template0{ - ParameterCategory: 2, - ParameterNumber: 2, - TypeOfGeneratingProcess: 2, - BackgroundProcess: 0, - GeneratingProcessIdentifier: 96, - HoursAfterDataCutoff: 0, - MinutesAfterDataCutoff: 0, - IndicatorOfUnitForForecastTime: 1, - ForecastTime: 6, - TypeOfFirstFixedSurface: 103, - ScaleFactorOfFirstFixedSurface: 0, - ScaledValueOfFirstFixedSurface: 10, - TypeOfSecondFixedSurface: -1, - ScaleFactorOfSecondFixedSurface: 0, - ScaledValueOfSecondFixedSurface: 0, + ParameterCategory: 2, + ParameterNumber: 2, + TypeOfGeneratingProcess: 2, + BackgroundProcess: 0, + AnalysisOrForecastGeneratingProcessIdentified: 96, + HoursAfterDataCutoff: 0, + MinutesAfterDataCutoff: 0, + IndicatorOfUnitForForecastTime: 1, + ForecastTime: 6, + TypeOfFirstFixedSurface: 103, + ScaleFactorOfFirstFixedSurface: 0, + ScaledValueOfFirstFixedSurface: 10, + TypeOfSecondFixedSurface: -1, + ScaleFactorOfSecondFixedSurface: 0, + ScaledValueOfSecondFixedSurface: 0, }) }, }, @@ -392,21 +394,21 @@ func TestGrib_ReadSection_ComplexPackingAndSpatialDifferencing(t *testing.T) { test: func(t *testing.T, sec grib2.Section) { assertSection(t, sec, 4, 34) assertSection4(t, sec, &pdt.Template0{ - ParameterCategory: 3, - ParameterNumber: 196, - TypeOfGeneratingProcess: 2, - BackgroundProcess: 0, - GeneratingProcessIdentifier: 96, - HoursAfterDataCutoff: 0, - MinutesAfterDataCutoff: 0, - IndicatorOfUnitForForecastTime: 1, - ForecastTime: 44, - TypeOfFirstFixedSurface: 1, - ScaleFactorOfFirstFixedSurface: 0, - ScaledValueOfFirstFixedSurface: 0, - TypeOfSecondFixedSurface: -1, - ScaleFactorOfSecondFixedSurface: 0, - ScaledValueOfSecondFixedSurface: 0, + ParameterCategory: 3, + ParameterNumber: 196, + TypeOfGeneratingProcess: 2, + BackgroundProcess: 0, + AnalysisOrForecastGeneratingProcessIdentified: 96, + HoursAfterDataCutoff: 0, + MinutesAfterDataCutoff: 0, + IndicatorOfUnitForForecastTime: 1, + ForecastTime: 44, + TypeOfFirstFixedSurface: 1, + ScaleFactorOfFirstFixedSurface: 0, + ScaledValueOfFirstFixedSurface: 0, + TypeOfSecondFixedSurface: -1, + ScaleFactorOfSecondFixedSurface: 0, + ScaledValueOfSecondFixedSurface: 0, }) }, }, @@ -651,6 +653,7 @@ func TestGrib2_ReadMessage(t *testing.T) { assert.Equal(t, 196, msg.GetParameterNumber()) assert.Equal(t, "2024-08-20T12:00:00Z", msg.GetTimestamp(time.UTC).Format(time.RFC3339)) assert.Equal(t, "2024-08-22T08:00:00Z", msg.GetForecastTime(time.UTC).Format(time.RFC3339)) + assert.Equal(t, 0, msg.GetLevel()) } { @@ -659,3 +662,46 @@ func TestGrib2_ReadMessage(t *testing.T) { assert.Nil(t, msg) } } + +func TestGrib2_ReadMessages(t *testing.T) { + // aws s3 cp --no-sign-request s3://noaa-gfs-bdp-pds/gfs.20240820/12/atmos/gfs.t12z.pgrb2.0p25.f044 pkg/testdata/gfs.t12z.pgrb2.0p25.f044 + const filename = "../testdata/gfs.t12z.pgrb2.0p25.f044" + + s, err := os.Stat(filename) + if errors.Is(err, os.ErrNotExist) { + t.Skipf("%s not exist", filename) + } + + t.Run(s.Name(), func(t *testing.T) { + f, err := os.Open(filename) + require.NoError(t, err) + + g := grib.NewGrib2(f) + var msgs []grib2.Message + + for i := 0; ; i++ { + msg, err := g.ReadMessage() + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + require.NotNil(t, msg) + + msgs = append(msgs, msg) + } + + var eg errgroup.Group + for i, msg := range msgs { + eg.Go(func() error { + data, err := msg.ReadData() + require.NoError(t, err) + + t.Logf("count: %d, discipline: %d, category: %d, number: %d, forecast: %s, dataLen: %d", i+1, msg.GetDiscipline(), msg.GetParameterCategory(), msg.GetParameterNumber(), msg.GetForecastTime(time.UTC), len(data)) + + return nil + }) + } + + require.NoError(t, eg.Wait()) + }) +} diff --git a/pkg/grib2/message.go b/pkg/grib2/message.go index 791d112..5f9d7f0 100644 --- a/pkg/grib2/message.go +++ b/pkg/grib2/message.go @@ -1,6 +1,11 @@ package grib2 -import "time" +import ( + "bytes" + "time" + + "github.com/icza/bitio" +) type Message interface { GetDiscipline() int @@ -8,6 +13,8 @@ type Message interface { GetParameterNumber() int GetTimestamp(loc *time.Location) time.Time GetForecastTime(loc *time.Location) time.Time + GetLevel() int + ReadData() ([]float64, error) } type message struct { @@ -41,3 +48,13 @@ func (m message) GetTimestamp(loc *time.Location) time.Time { func (m message) GetForecastTime(loc *time.Location) time.Time { return m.GetTimestamp(loc).Add(m.sec4.GetForecastDuration()) } + +func (m *message) GetLevel() int { + return m.sec4.GetLevel() +} + +func (m *message) ReadData() ([]float64, error) { + tpl := m.sec5.GetDataRepresentationTemplate() + + return tpl.ReadAllData(bitio.NewReader(bytes.NewReader(m.sec7.Data))) +} diff --git a/pkg/grib2/pdt/template.go b/pkg/grib2/pdt/template.go index e606777..780a5b2 100644 --- a/pkg/grib2/pdt/template.go +++ b/pkg/grib2/pdt/template.go @@ -1,6 +1,7 @@ package pdt import ( + "bytes" "encoding/binary" "fmt" "io" @@ -11,6 +12,7 @@ type Template interface { GetParameterCategory() int GetParameterNumber() int GetForecastDuration() time.Duration + GetLevel() int } type MissingTemplate struct{} @@ -18,16 +20,32 @@ type MissingTemplate struct{} func (m MissingTemplate) GetParameterCategory() int { return -1 } func (m MissingTemplate) GetParameterNumber() int { return -1 } func (m MissingTemplate) GetForecastDuration() time.Duration { return 0 } +func (m MissingTemplate) GetLevel() int { return 0 } func ReadTemplate(r io.Reader, n uint16) (Template, error) { switch n { case 0: - var tpl template0 - if err := binary.Read(r, binary.BigEndian, &tpl); err != nil { + t0, err := readTemplate0(r) + if err != nil { return nil, err } - return tpl.Export(), nil + return t0.Export(), nil + + case 8: + t0, err := readTemplate0(r) + if err != nil { + return nil, err + } + + t8, err := readTemplate8(r, t0) + if err != nil { + return nil, err + } + + t8.template0 = t0 + + return t8.Export(), nil case 255: return &MissingTemplate{}, nil @@ -36,3 +54,28 @@ func ReadTemplate(r io.Reader, n uint16) (Template, error) { return nil, fmt.Errorf("unsupported product definition template: %d", n) } } + +func readTemplate0(r io.Reader) (*template0, error) { + var tpl template0 + if err := binary.Read(r, binary.BigEndian, &tpl); err != nil { + return nil, err + } + + return &tpl, nil +} + +func readTemplate8(r io.Reader, t0 *template0) (*template8, error) { + var tpl template8 + if err := binary.Read(r, binary.BigEndian, &tpl.template8fields); err != nil { + return nil, err + } + + tpl.template0 = t0 + + bs := tpl.template8fields.GetAdditionalTimeRangeSpecifications() + if _, err := io.CopyN(bytes.NewBuffer(bs), r, int64(len(bs))); err != nil { + return nil, err + } + + return &tpl, nil +} diff --git a/pkg/grib2/pdt/template0.go b/pkg/grib2/pdt/template0.go index 9685f07..17247f7 100644 --- a/pkg/grib2/pdt/template0.go +++ b/pkg/grib2/pdt/template0.go @@ -1,65 +1,66 @@ package pdt import ( + "math" "time" "github.com/scorix/grib-go/pkg/grib2/regulation" ) type template0 struct { - ParameterCategory uint8 - ParameterNumber uint8 - TypeOfGeneratingProcess uint8 - BackgroundProcess uint8 - GeneratingProcessIdentifier uint8 - HoursAfterDataCutoff uint16 - MinutesAfterDataCutoff uint8 - IndicatorOfUnitForForecastTime uint8 - ForecastTime uint32 - TypeOfFirstFixedSurface uint8 - ScaleFactorOfFirstFixedSurface uint8 - ScaledValueOfFirstFixedSurface uint32 - TypeOfSecondFixedSurface uint8 - ScaleFactorOfSecondFixedSurface uint8 - ScaledValueOfSecondFixedSurface uint32 + ParameterCategory uint8 // 10 + ParameterNumber uint8 // 11 + TypeOfGeneratingProcess uint8 // 12 + BackgroundProcess uint8 // 13 + AnalysisOrForecastGeneratingProcessIdentified uint8 // 14 + HoursAfterDataCutoff uint16 // 15-16 + MinutesAfterDataCutoff uint8 // 17 + IndicatorOfUnitForForecastTime uint8 // 18 + ForecastTime uint32 // 19-22 + TypeOfFirstFixedSurface uint8 // 23 + ScaleFactorOfFirstFixedSurface uint8 // 24 + ScaledValueOfFirstFixedSurface uint32 // 25-28 + TypeOfSecondFixedSurface uint8 // 29 + ScaleFactorOfSecondFixedSurface uint8 // 30 + ScaledValueOfSecondFixedSurface uint32 // 31-34 } func (t template0) Export() *Template0 { return &Template0{ - ParameterCategory: t.ParameterCategory, - ParameterNumber: t.ParameterNumber, - TypeOfGeneratingProcess: regulation.ToInt8(t.TypeOfGeneratingProcess), - BackgroundProcess: regulation.ToInt8(t.BackgroundProcess), - GeneratingProcessIdentifier: regulation.ToInt8(t.GeneratingProcessIdentifier), - HoursAfterDataCutoff: regulation.ToInt16(t.HoursAfterDataCutoff), - MinutesAfterDataCutoff: regulation.ToInt8(t.MinutesAfterDataCutoff), - IndicatorOfUnitForForecastTime: IndicatorOfUnitForTime(regulation.ToInt8(t.IndicatorOfUnitForForecastTime)), - ForecastTime: regulation.ToInt32(t.ForecastTime), - TypeOfFirstFixedSurface: regulation.ToInt8(t.TypeOfFirstFixedSurface), - ScaleFactorOfFirstFixedSurface: regulation.ToInt8(t.ScaleFactorOfFirstFixedSurface), - ScaledValueOfFirstFixedSurface: regulation.ToInt32(t.ScaledValueOfFirstFixedSurface), - TypeOfSecondFixedSurface: regulation.ToInt8(t.TypeOfSecondFixedSurface), - ScaleFactorOfSecondFixedSurface: regulation.ToInt8(t.ScaleFactorOfSecondFixedSurface), - ScaledValueOfSecondFixedSurface: regulation.ToInt32(t.ScaledValueOfSecondFixedSurface), + ParameterCategory: t.ParameterCategory, + ParameterNumber: t.ParameterNumber, + TypeOfGeneratingProcess: regulation.ToInt8(t.TypeOfGeneratingProcess), + BackgroundProcess: regulation.ToInt8(t.BackgroundProcess), + AnalysisOrForecastGeneratingProcessIdentified: regulation.ToInt8(t.AnalysisOrForecastGeneratingProcessIdentified), + HoursAfterDataCutoff: regulation.ToInt16(t.HoursAfterDataCutoff), + MinutesAfterDataCutoff: regulation.ToInt8(t.MinutesAfterDataCutoff), + IndicatorOfUnitForForecastTime: IndicatorOfUnitForTime(regulation.ToInt8(t.IndicatorOfUnitForForecastTime)), + ForecastTime: regulation.ToInt32(t.ForecastTime), + TypeOfFirstFixedSurface: regulation.ToInt8(t.TypeOfFirstFixedSurface), + ScaleFactorOfFirstFixedSurface: regulation.ToInt8(t.ScaleFactorOfFirstFixedSurface), + ScaledValueOfFirstFixedSurface: regulation.ToInt32(t.ScaledValueOfFirstFixedSurface), + TypeOfSecondFixedSurface: regulation.ToInt8(t.TypeOfSecondFixedSurface), + ScaleFactorOfSecondFixedSurface: regulation.ToInt8(t.ScaleFactorOfSecondFixedSurface), + ScaledValueOfSecondFixedSurface: regulation.ToInt32(t.ScaledValueOfSecondFixedSurface), } } type Template0 struct { - ParameterCategory uint8 - ParameterNumber uint8 - TypeOfGeneratingProcess int8 - BackgroundProcess int8 - GeneratingProcessIdentifier int8 - HoursAfterDataCutoff int16 - MinutesAfterDataCutoff int8 - IndicatorOfUnitForForecastTime IndicatorOfUnitForTime - ForecastTime int32 - TypeOfFirstFixedSurface int8 - ScaleFactorOfFirstFixedSurface int8 - ScaledValueOfFirstFixedSurface int32 - TypeOfSecondFixedSurface int8 - ScaleFactorOfSecondFixedSurface int8 - ScaledValueOfSecondFixedSurface int32 + ParameterCategory uint8 + ParameterNumber uint8 + TypeOfGeneratingProcess int8 + BackgroundProcess int8 + AnalysisOrForecastGeneratingProcessIdentified int8 + HoursAfterDataCutoff int16 + MinutesAfterDataCutoff int8 + IndicatorOfUnitForForecastTime IndicatorOfUnitForTime + ForecastTime int32 + TypeOfFirstFixedSurface int8 + ScaleFactorOfFirstFixedSurface int8 + ScaledValueOfFirstFixedSurface int32 + TypeOfSecondFixedSurface int8 + ScaleFactorOfSecondFixedSurface int8 + ScaledValueOfSecondFixedSurface int32 } func (t Template0) GetParameterCategory() int { return int(t.ParameterCategory) } @@ -67,3 +68,6 @@ func (t Template0) GetParameterNumber() int { return int(t.ParameterNumber) } func (t Template0) GetForecastDuration() time.Duration { return t.IndicatorOfUnitForForecastTime.AsDuration(int(t.ForecastTime)) } +func (t Template0) GetLevel() int { + return int(t.ScaledValueOfFirstFixedSurface) * int(math.Pow10(int(t.ScaleFactorOfFirstFixedSurface))) +} diff --git a/pkg/grib2/pdt/template8.go b/pkg/grib2/pdt/template8.go new file mode 100644 index 0000000..6c606a9 --- /dev/null +++ b/pkg/grib2/pdt/template8.go @@ -0,0 +1,60 @@ +package pdt + +import ( + "math" + "time" +) + +type template8 struct { + *template0 // 10-34 + template8fields +} + +type template8fields struct { + Year uint16 // 35-36 + Month uint8 // 37 + Day uint8 // 38 + Hour uint8 // 39 + Minute uint8 // 40 + Second uint8 // 41 + NumberOfTimeRanges uint8 // 42 n + TotalNumberOfDataValuesMissingInStatisticalProcess uint32 // 43-46 + // 47 - 58 Specification of the outermost (or only) time range over which statistical processing is done + StatisticalProcess uint8 // 47 + TypeOfTimeIncrement uint8 // 48 + IndicatorOfUnitOfTimeForTimeRange uint8 // 49 + LengthOfTimeRange uint32 // 50-53 + IndicatorOfUnitOfTimeForIncrement uint8 // 54 + TimeIncrement uint32 // 55-58 + // 59-70 As octets 47 to 58, next innermost step of processing +} + +func (fields template8fields) GetAdditionalTimeRangeSpecifications() []byte { + if fields.NumberOfTimeRanges == 0 { + return nil + } + + // 59 - nn These octets are included only if n>1, where nn = 46 + 12 x n + var nn = 46 + 12*fields.NumberOfTimeRanges + + return make([]byte, nn) +} + +func (t template8) Export() *Template8 { + return &Template8{ + Template0: t.template0.Export(), + } +} + +type Template8 struct { + *Template0 +} + +func (t Template8) GetParameterCategory() int { return int(t.ParameterCategory) } +func (t Template8) GetParameterNumber() int { return int(t.ParameterNumber) } +func (t Template8) GetForecastDuration() time.Duration { + return t.IndicatorOfUnitForForecastTime.AsDuration(int(t.ForecastTime)) +} +func (t Template8) GetLevel() int { + return int(t.ScaledValueOfFirstFixedSurface) * int(math.Pow10(int(t.ScaleFactorOfFirstFixedSurface))) +} diff --git a/pkg/grib2/section4.go b/pkg/grib2/section4.go index f2cb808..12b03d8 100644 --- a/pkg/grib2/section4.go +++ b/pkg/grib2/section4.go @@ -57,3 +57,7 @@ func (s *section4) GetParameterNumber() int { func (s *section4) GetForecastDuration() time.Duration { return s.GetProductDefinitionTemplate().GetForecastDuration() } + +func (s *section4) GetLevel() int { + return s.GetProductDefinitionTemplate().GetLevel() +}