Skip to content

Commit

Permalink
fix: converter lose precision (#3445)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Dec 19, 2024
1 parent 0bc2ae8 commit e952b40
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 31 deletions.
1 change: 1 addition & 0 deletions etc/mqtt_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ default:
#connectionSelector: mqtt.mqtt_conf1
#kubeedgeVersion:
#kubeedgeModelFile: ""
#useInt64ForWholeNumber: true

# demo_conf: #Conf_key
# qos: 0
Expand Down
2 changes: 1 addition & 1 deletion internal/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func init() {
modules.RegisterConverter(message.FormatJson, func(_ api.StreamContext, _ string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return json.NewFastJsonConverter(schema), nil
return json.NewFastJsonConverter(schema, props), nil
})
modules.RegisterConverter(message.FormatXML, func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return xml.NewXMLConverter(), nil
Expand Down
32 changes: 31 additions & 1 deletion internal/converter/json/convert_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package json

import (
"encoding/json"
"os"
"testing"

Expand Down Expand Up @@ -106,9 +107,38 @@ func benchmarkByFiles(filePath string, b *testing.B, schema map[string]*ast.Json
if err != nil {
b.Fatal(err)
}
f := NewFastJsonConverter(schema)
f := NewFastJsonConverter(schema, nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Decode(ctx, payload)
}
}

func BenchmarkNativeFloatParse(b *testing.B) {
m := make(map[string]interface{})
data := `{"id":1.2}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
json.Unmarshal([]byte(data), &m)
}
}

func BenchmarkFloatParse(b *testing.B) {
ctx := mockContext.NewMockContext("test", "test")
f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true})
data := `{"id":1.2}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Decode(ctx, []byte(data))
}
}

func BenchmarkIntParse(b *testing.B) {
ctx := mockContext.NewMockContext("test", "test")
f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true})
data := `{"id":1}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Decode(ctx, []byte(data))
}
}
48 changes: 36 additions & 12 deletions internal/converter/json/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package json
import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/lf-edge/ekuiper/contract/v2/api"
Expand All @@ -30,15 +31,28 @@ import (
type FastJsonConverter struct {
sync.RWMutex
schema map[string]*ast.JsonStreamField
FastJsonConverterConf
}

func NewFastJsonConverter(schema map[string]*ast.JsonStreamField) *FastJsonConverter {
type FastJsonConverterConf struct {
UseInt64 bool `json:"useInt64ForWholeNumber"`
}

func NewFastJsonConverter(schema map[string]*ast.JsonStreamField, props map[string]any) *FastJsonConverter {
f := &FastJsonConverter{
schema: schema,
}
f.setupProps(props)
return f
}

func (f *FastJsonConverter) setupProps(props map[string]any) {
if props == nil {
return
}
cast.MapToStruct(props, &f.FastJsonConverterConf)
}

func (f *FastJsonConverter) ResetSchema(schema map[string]*ast.JsonStreamField) {
f.Lock()
defer f.Unlock()
Expand All @@ -57,11 +71,6 @@ func (f *FastJsonConverter) Decode(ctx api.StreamContext, b []byte) (m any, err
}()
f.RLock()
defer f.RUnlock()
if f.schema == nil {
var r any
err = json.Unmarshal(b, &r)
return r, err
}
return f.decodeWithSchema(b, f.schema)
}

Expand All @@ -85,7 +94,7 @@ func (f *FastJsonConverter) DecodeField(_ api.StreamContext, b []byte, field str
case fastjson.TypeString:
return vv.String(), nil
case fastjson.TypeNumber:
return vv.Float64()
return f.extractNumber(vv)
case fastjson.TypeTrue, fastjson.TypeFalse:
return vv.Bool()
}
Expand Down Expand Up @@ -340,11 +349,7 @@ func (f *FastJsonConverter) checkSchema(key, typ string, schema map[string]*ast.

func (f *FastJsonConverter) extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamField) (interface{}, error) {
if field == nil {
f64, err := v.Float64()
if err != nil {
return nil, err
}
return f64, nil
return f.extractNumber(v)
}
switch {
case field.Type == "float", field.Type == "datetime":
Expand Down Expand Up @@ -420,6 +425,21 @@ func (f *FastJsonConverter) extractBooleanFromValue(name string, v *fastjson.Val
return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", name, v.Type().String(), getType(field))
}

func (f *FastJsonConverter) extractNumber(v *fastjson.Value) (any, error) {
if f.UseInt64 && !isFloat64(v.String()) {
i64, err := v.Int64()
if err != nil {
return nil, err
}
return i64, nil
}
f64, err := v.Float64()
if err != nil {
return nil, err
}
return f64, nil
}

func getBooleanFromValue(value *fastjson.Value) (interface{}, error) {
typ := value.Type()
switch typ {
Expand Down Expand Up @@ -454,3 +474,7 @@ func getType(t *ast.JsonStreamField) string {
return t.Type
}
}

func isFloat64(v string) bool {
return strings.Contains(v, ".")
}
43 changes: 30 additions & 13 deletions internal/converter/json/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) {
}
ctx := mockContext.NewMockContext("test", "op1")
for _, tc := range testcases {
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, tc.payload)
require.NoError(t, err)
require.Equal(t, v, tc.require)
Expand All @@ -173,7 +173,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) {
arrayRequire := []map[string]interface{}{
tc.require,
}
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, arrayPayload)
require.NoError(t, err)
require.Equal(t, v, arrayRequire)
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestFastJsonConverterWithSchemaError(t *testing.T) {
}
ctx := mockContext.NewMockContext("test", "op1")
for _, tc := range testcases {
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
_, err := f.Decode(ctx, tc.payload)
require.Error(t, err)
require.Equal(t, err.Error(), tc.err.Error())
Expand All @@ -344,7 +344,7 @@ func TestFastJsonEncode(t *testing.T) {
a := make(map[string]int)
a["a"] = 1
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(nil)
f := NewFastJsonConverter(nil, nil)
v, err := f.Encode(ctx, a)
require.NoError(t, err)
require.Equal(t, v, []byte(`{"a":1}`))
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestArrayWithArray(t *testing.T) {
},
}
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(schema)
f := NewFastJsonConverter(schema, nil)
v, err := f.Decode(ctx, payload)
require.NoError(t, err)
require.Equal(t, v, map[string]interface{}{
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestTypeNull(t *testing.T) {
arrayRequire := []map[string]interface{}{
tc.require,
}
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, arrayPayload)
require.NoError(t, err)
require.Equal(t, v, arrayRequire)
Expand All @@ -568,7 +568,7 @@ func TestTypeNull(t *testing.T) {
arrayRequire := []map[string]interface{}{
tc.require,
}
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, arrayPayload)
require.NoError(t, err)
require.Equal(t, v, arrayRequire)
Expand All @@ -585,7 +585,7 @@ func TestConvertBytea(t *testing.T) {
},
}
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(schema)
f := NewFastJsonConverter(schema, nil)
v, err := f.Decode(ctx, []byte(payload))
require.NoError(t, err)
require.Equal(t, v, map[string]interface{}{
Expand All @@ -601,7 +601,7 @@ func TestConvertBytea(t *testing.T) {
},
},
}
f = NewFastJsonConverter(schema)
f = NewFastJsonConverter(schema, nil)
v, err = f.Decode(ctx, []byte(payload))
require.NoError(t, err)
require.Equal(t, v, map[string]interface{}{
Expand All @@ -613,7 +613,7 @@ func TestSchemaless(t *testing.T) {
originSchema := map[string]*ast.JsonStreamField{
"a": nil,
}
f := NewFastJsonConverter(originSchema)
f := NewFastJsonConverter(originSchema, nil)
testcases := []struct {
data map[string]interface{}
expect map[string]interface{}
Expand Down Expand Up @@ -664,7 +664,7 @@ func TestIssue(t *testing.T) {
originSchema := map[string]*ast.JsonStreamField{
"results": nil,
}
f := NewFastJsonConverter(originSchema)
f := NewFastJsonConverter(originSchema, nil)
data := `{
"results": [
{
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestIssue(t *testing.T) {
schmema2 := map[string]*ast.JsonStreamField{
"others": nil,
}
f2 := NewFastJsonConverter(schmema2)
f2 := NewFastJsonConverter(schmema2, nil)
m, err = f2.Decode(context.Background(), []byte(data))
require.NoError(t, err)
require.Len(t, m, 0)
Expand Down Expand Up @@ -754,7 +754,7 @@ func TestDecodeField(t *testing.T) {
},
}
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(nil)
f := NewFastJsonConverter(nil, nil)
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
field, err := f.DecodeField(ctx, tc.payload, "id")
Expand All @@ -767,3 +767,20 @@ func TestDecodeField(t *testing.T) {
})
}
}

func TestIssue3441(t *testing.T) {
originSchema := map[string]*ast.JsonStreamField{
"id": nil,
}
f := NewFastJsonConverter(originSchema, map[string]any{"useInt64ForWholeNumber": true})
data := `{"id":1795292668348461056}`
ctx := mockContext.NewMockContext("test", "op1")
m, err := f.Decode(ctx, []byte(data))
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"id": int64(1795292668348461056)}, m)

data = `{"id":17952926683484.44}`
m, err = f.Decode(ctx, []byte(data))
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"id": 17952926683484.44}, m)
}
6 changes: 3 additions & 3 deletions internal/topo/node/decode_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ func TestJSON(t *testing.T) {
&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": 3.0, "b": 4.0, "sourceConf": "hello"}, Timestamp: time.UnixMilli(111), Metadata: map[string]any{"topic": "demo", "qos": 1}},
},
{errors.New("go through error")},
{errors.New("invalid character ':' after top-level value")},
{errors.New("only map[string]any inside a list is supported but got: hello")},
{errors.New("unsupported decode result: hello")},
{errors.New(`unexpected tail: ":1,\"b\":2},{\"a\":3,\"b\":4,\"sourceConf\":\"hello\"}]"`)},
{errors.New(`value doesn't contain object; it contains string`)},
{errors.New(`only map[string]interface{} and []map[string]interface{} is supported`)},
{errors.New("unsupported data received: invalid")},
}
timex.Add(2 * time.Second)
Expand Down
3 changes: 2 additions & 1 deletion internal/topo/node/window_inc_agg_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,9 @@ func TestIncAggTumblingWindow(t *testing.T) {
errCh := make(chan error, 10)
ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel()
op.Exec(ctx, errCh)
time.Sleep(10 * time.Millisecond)
waitExecute()
input <- &xsql.Tuple{Message: map[string]any{"a": int64(1)}}
waitExecute()
timex.Add(1100 * time.Millisecond)
got := <-output
wt, ok := got.(*xsql.WindowTuples)
Expand Down

0 comments on commit e952b40

Please sign in to comment.