diff --git a/etc/mqtt_source.yaml b/etc/mqtt_source.yaml index 3036ccf42e..b435b8420f 100644 --- a/etc/mqtt_source.yaml +++ b/etc/mqtt_source.yaml @@ -14,6 +14,7 @@ default: #connectionSelector: mqtt.mqtt_conf1 #kubeedgeVersion: #kubeedgeModelFile: "" + #useInt64ForWholeNumber: true # demo_conf: #Conf_key # qos: 0 diff --git a/internal/converter/converter.go b/internal/converter/converter.go index f9b16ccd23..e622459723 100644 --- a/internal/converter/converter.go +++ b/internal/converter/converter.go @@ -33,7 +33,7 @@ import ( func init() { modules.RegisterConverter(message.FormatJson, func(_ api.StreamContext, _ string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) { - return json.NewFastJsonConverter(schema), nil + return json.NewFastJsonConverter(schema, props), nil }) modules.RegisterConverter(message.FormatXML, func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) { return xml.NewXMLConverter(), nil diff --git a/internal/converter/json/convert_bench_test.go b/internal/converter/json/convert_bench_test.go index d88064bfdd..546f918f80 100644 --- a/internal/converter/json/convert_bench_test.go +++ b/internal/converter/json/convert_bench_test.go @@ -15,6 +15,7 @@ package json import ( + "encoding/json" "os" "testing" @@ -106,9 +107,38 @@ func benchmarkByFiles(filePath string, b *testing.B, schema map[string]*ast.Json if err != nil { b.Fatal(err) } - f := NewFastJsonConverter(schema) + f := NewFastJsonConverter(schema, nil) b.ResetTimer() for i := 0; i < b.N; i++ { f.Decode(ctx, payload) } } + +func BenchmarkNativeFloatParse(b *testing.B) { + m := make(map[string]interface{}) + data := `{"id":1.2}` + b.ResetTimer() + for i := 0; i < b.N; i++ { + json.Unmarshal([]byte(data), &m) + } +} + +func BenchmarkFloatParse(b *testing.B) { + ctx := mockContext.NewMockContext("test", "test") + f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true}) + data := `{"id":1.2}` + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Decode(ctx, []byte(data)) + } +} + +func BenchmarkIntParse(b *testing.B) { + ctx := mockContext.NewMockContext("test", "test") + f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true}) + data := `{"id":1}` + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Decode(ctx, []byte(data)) + } +} diff --git a/internal/converter/json/converter.go b/internal/converter/json/converter.go index c016e42007..22da770d3e 100644 --- a/internal/converter/json/converter.go +++ b/internal/converter/json/converter.go @@ -17,6 +17,7 @@ package json import ( "encoding/json" "fmt" + "strings" "sync" "github.com/lf-edge/ekuiper/contract/v2/api" @@ -30,15 +31,28 @@ import ( type FastJsonConverter struct { sync.RWMutex schema map[string]*ast.JsonStreamField + FastJsonConverterConf } -func NewFastJsonConverter(schema map[string]*ast.JsonStreamField) *FastJsonConverter { +type FastJsonConverterConf struct { + UseInt64 bool `json:"useInt64ForWholeNumber"` +} + +func NewFastJsonConverter(schema map[string]*ast.JsonStreamField, props map[string]any) *FastJsonConverter { f := &FastJsonConverter{ schema: schema, } + f.setupProps(props) return f } +func (f *FastJsonConverter) setupProps(props map[string]any) { + if props == nil { + return + } + cast.MapToStruct(props, &f.FastJsonConverterConf) +} + func (f *FastJsonConverter) ResetSchema(schema map[string]*ast.JsonStreamField) { f.Lock() defer f.Unlock() @@ -57,11 +71,6 @@ func (f *FastJsonConverter) Decode(ctx api.StreamContext, b []byte) (m any, err }() f.RLock() defer f.RUnlock() - if f.schema == nil { - var r any - err = json.Unmarshal(b, &r) - return r, err - } return f.decodeWithSchema(b, f.schema) } @@ -85,7 +94,7 @@ func (f *FastJsonConverter) DecodeField(_ api.StreamContext, b []byte, field str case fastjson.TypeString: return vv.String(), nil case fastjson.TypeNumber: - return vv.Float64() + return f.extractNumber(vv) case fastjson.TypeTrue, fastjson.TypeFalse: return vv.Bool() } @@ -340,11 +349,7 @@ func (f *FastJsonConverter) checkSchema(key, typ string, schema map[string]*ast. func (f *FastJsonConverter) extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamField) (interface{}, error) { if field == nil { - f64, err := v.Float64() - if err != nil { - return nil, err - } - return f64, nil + return f.extractNumber(v) } switch { case field.Type == "float", field.Type == "datetime": @@ -420,6 +425,21 @@ func (f *FastJsonConverter) extractBooleanFromValue(name string, v *fastjson.Val return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", name, v.Type().String(), getType(field)) } +func (f *FastJsonConverter) extractNumber(v *fastjson.Value) (any, error) { + if f.UseInt64 && !isFloat64(v.String()) { + i64, err := v.Int64() + if err != nil { + return nil, err + } + return i64, nil + } + f64, err := v.Float64() + if err != nil { + return nil, err + } + return f64, nil +} + func getBooleanFromValue(value *fastjson.Value) (interface{}, error) { typ := value.Type() switch typ { @@ -454,3 +474,7 @@ func getType(t *ast.JsonStreamField) string { return t.Type } } + +func isFloat64(v string) bool { + return strings.Contains(v, ".") +} diff --git a/internal/converter/json/converter_test.go b/internal/converter/json/converter_test.go index 5f0bcec544..f962a96f0f 100644 --- a/internal/converter/json/converter_test.go +++ b/internal/converter/json/converter_test.go @@ -162,7 +162,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) { } ctx := mockContext.NewMockContext("test", "op1") for _, tc := range testcases { - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, tc.payload) require.NoError(t, err) require.Equal(t, v, tc.require) @@ -173,7 +173,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) { arrayRequire := []map[string]interface{}{ tc.require, } - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, arrayPayload) require.NoError(t, err) require.Equal(t, v, arrayRequire) @@ -333,7 +333,7 @@ func TestFastJsonConverterWithSchemaError(t *testing.T) { } ctx := mockContext.NewMockContext("test", "op1") for _, tc := range testcases { - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) _, err := f.Decode(ctx, tc.payload) require.Error(t, err) require.Equal(t, err.Error(), tc.err.Error()) @@ -344,7 +344,7 @@ func TestFastJsonEncode(t *testing.T) { a := make(map[string]int) a["a"] = 1 ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(nil) + f := NewFastJsonConverter(nil, nil) v, err := f.Encode(ctx, a) require.NoError(t, err) require.Equal(t, v, []byte(`{"a":1}`)) @@ -377,7 +377,7 @@ func TestArrayWithArray(t *testing.T) { }, } ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(schema) + f := NewFastJsonConverter(schema, nil) v, err := f.Decode(ctx, payload) require.NoError(t, err) require.Equal(t, v, map[string]interface{}{ @@ -558,7 +558,7 @@ func TestTypeNull(t *testing.T) { arrayRequire := []map[string]interface{}{ tc.require, } - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, arrayPayload) require.NoError(t, err) require.Equal(t, v, arrayRequire) @@ -568,7 +568,7 @@ func TestTypeNull(t *testing.T) { arrayRequire := []map[string]interface{}{ tc.require, } - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, arrayPayload) require.NoError(t, err) require.Equal(t, v, arrayRequire) @@ -585,7 +585,7 @@ func TestConvertBytea(t *testing.T) { }, } ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(schema) + f := NewFastJsonConverter(schema, nil) v, err := f.Decode(ctx, []byte(payload)) require.NoError(t, err) require.Equal(t, v, map[string]interface{}{ @@ -601,7 +601,7 @@ func TestConvertBytea(t *testing.T) { }, }, } - f = NewFastJsonConverter(schema) + f = NewFastJsonConverter(schema, nil) v, err = f.Decode(ctx, []byte(payload)) require.NoError(t, err) require.Equal(t, v, map[string]interface{}{ @@ -613,7 +613,7 @@ func TestSchemaless(t *testing.T) { originSchema := map[string]*ast.JsonStreamField{ "a": nil, } - f := NewFastJsonConverter(originSchema) + f := NewFastJsonConverter(originSchema, nil) testcases := []struct { data map[string]interface{} expect map[string]interface{} @@ -664,7 +664,7 @@ func TestIssue(t *testing.T) { originSchema := map[string]*ast.JsonStreamField{ "results": nil, } - f := NewFastJsonConverter(originSchema) + f := NewFastJsonConverter(originSchema, nil) data := `{ "results": [ { @@ -704,7 +704,7 @@ func TestIssue(t *testing.T) { schmema2 := map[string]*ast.JsonStreamField{ "others": nil, } - f2 := NewFastJsonConverter(schmema2) + f2 := NewFastJsonConverter(schmema2, nil) m, err = f2.Decode(context.Background(), []byte(data)) require.NoError(t, err) require.Len(t, m, 0) @@ -754,7 +754,7 @@ func TestDecodeField(t *testing.T) { }, } ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(nil) + f := NewFastJsonConverter(nil, nil) for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { field, err := f.DecodeField(ctx, tc.payload, "id") @@ -767,3 +767,20 @@ func TestDecodeField(t *testing.T) { }) } } + +func TestIssue3441(t *testing.T) { + originSchema := map[string]*ast.JsonStreamField{ + "id": nil, + } + f := NewFastJsonConverter(originSchema, map[string]any{"useInt64ForWholeNumber": true}) + data := `{"id":1795292668348461056}` + ctx := mockContext.NewMockContext("test", "op1") + m, err := f.Decode(ctx, []byte(data)) + require.NoError(t, err) + require.Equal(t, map[string]interface{}{"id": int64(1795292668348461056)}, m) + + data = `{"id":17952926683484.44}` + m, err = f.Decode(ctx, []byte(data)) + require.NoError(t, err) + require.Equal(t, map[string]interface{}{"id": 17952926683484.44}, m) +} diff --git a/internal/topo/node/decode_op_test.go b/internal/topo/node/decode_op_test.go index 497cf4f13e..d63bed2fd8 100644 --- a/internal/topo/node/decode_op_test.go +++ b/internal/topo/node/decode_op_test.go @@ -69,9 +69,9 @@ func TestJSON(t *testing.T) { &xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": 3.0, "b": 4.0, "sourceConf": "hello"}, Timestamp: time.UnixMilli(111), Metadata: map[string]any{"topic": "demo", "qos": 1}}, }, {errors.New("go through error")}, - {errors.New("invalid character ':' after top-level value")}, - {errors.New("only map[string]any inside a list is supported but got: hello")}, - {errors.New("unsupported decode result: hello")}, + {errors.New(`unexpected tail: ":1,\"b\":2},{\"a\":3,\"b\":4,\"sourceConf\":\"hello\"}]"`)}, + {errors.New(`value doesn't contain object; it contains string`)}, + {errors.New(`only map[string]interface{} and []map[string]interface{} is supported`)}, {errors.New("unsupported data received: invalid")}, } timex.Add(2 * time.Second) diff --git a/internal/topo/node/window_inc_agg_op_test.go b/internal/topo/node/window_inc_agg_op_test.go index 13a2382121..d042042972 100644 --- a/internal/topo/node/window_inc_agg_op_test.go +++ b/internal/topo/node/window_inc_agg_op_test.go @@ -298,8 +298,9 @@ func TestIncAggTumblingWindow(t *testing.T) { errCh := make(chan error, 10) ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel() op.Exec(ctx, errCh) - time.Sleep(10 * time.Millisecond) + waitExecute() input <- &xsql.Tuple{Message: map[string]any{"a": int64(1)}} + waitExecute() timex.Add(1100 * time.Millisecond) got := <-output wt, ok := got.(*xsql.WindowTuples)