diff --git a/etc/mqtt_source.yaml b/etc/mqtt_source.yaml index 3036ccf42e..b435b8420f 100644 --- a/etc/mqtt_source.yaml +++ b/etc/mqtt_source.yaml @@ -14,6 +14,7 @@ default: #connectionSelector: mqtt.mqtt_conf1 #kubeedgeVersion: #kubeedgeModelFile: "" + #useInt64ForWholeNumber: true # demo_conf: #Conf_key # qos: 0 diff --git a/internal/converter/converter.go b/internal/converter/converter.go index f9b16ccd23..e622459723 100644 --- a/internal/converter/converter.go +++ b/internal/converter/converter.go @@ -33,7 +33,7 @@ import ( func init() { modules.RegisterConverter(message.FormatJson, func(_ api.StreamContext, _ string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) { - return json.NewFastJsonConverter(schema), nil + return json.NewFastJsonConverter(schema, props), nil }) modules.RegisterConverter(message.FormatXML, func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) { return xml.NewXMLConverter(), nil diff --git a/internal/converter/json/convert_bench_test.go b/internal/converter/json/convert_bench_test.go index d88064bfdd..546f918f80 100644 --- a/internal/converter/json/convert_bench_test.go +++ b/internal/converter/json/convert_bench_test.go @@ -15,6 +15,7 @@ package json import ( + "encoding/json" "os" "testing" @@ -106,9 +107,38 @@ func benchmarkByFiles(filePath string, b *testing.B, schema map[string]*ast.Json if err != nil { b.Fatal(err) } - f := NewFastJsonConverter(schema) + f := NewFastJsonConverter(schema, nil) b.ResetTimer() for i := 0; i < b.N; i++ { f.Decode(ctx, payload) } } + +func BenchmarkNativeFloatParse(b *testing.B) { + m := make(map[string]interface{}) + data := `{"id":1.2}` + b.ResetTimer() + for i := 0; i < b.N; i++ { + json.Unmarshal([]byte(data), &m) + } +} + +func BenchmarkFloatParse(b *testing.B) { + ctx := mockContext.NewMockContext("test", "test") + f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true}) + data := `{"id":1.2}` + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Decode(ctx, []byte(data)) + } +} + +func BenchmarkIntParse(b *testing.B) { + ctx := mockContext.NewMockContext("test", "test") + f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true}) + data := `{"id":1}` + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Decode(ctx, []byte(data)) + } +} diff --git a/internal/converter/json/converter.go b/internal/converter/json/converter.go index 58e06643c0..22da770d3e 100644 --- a/internal/converter/json/converter.go +++ b/internal/converter/json/converter.go @@ -23,7 +23,6 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/valyala/fastjson" - "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/errorx" @@ -32,15 +31,28 @@ import ( type FastJsonConverter struct { sync.RWMutex schema map[string]*ast.JsonStreamField + FastJsonConverterConf } -func NewFastJsonConverter(schema map[string]*ast.JsonStreamField) *FastJsonConverter { +type FastJsonConverterConf struct { + UseInt64 bool `json:"useInt64ForWholeNumber"` +} + +func NewFastJsonConverter(schema map[string]*ast.JsonStreamField, props map[string]any) *FastJsonConverter { f := &FastJsonConverter{ schema: schema, } + f.setupProps(props) return f } +func (f *FastJsonConverter) setupProps(props map[string]any) { + if props == nil { + return + } + cast.MapToStruct(props, &f.FastJsonConverterConf) +} + func (f *FastJsonConverter) ResetSchema(schema map[string]*ast.JsonStreamField) { f.Lock() defer f.Unlock() @@ -82,7 +94,7 @@ func (f *FastJsonConverter) DecodeField(_ api.StreamContext, b []byte, field str case fastjson.TypeString: return vv.String(), nil case fastjson.TypeNumber: - return extractNumber(vv) + return f.extractNumber(vv) case fastjson.TypeTrue, fastjson.TypeFalse: return vv.Bool() } @@ -337,7 +349,7 @@ func (f *FastJsonConverter) checkSchema(key, typ string, schema map[string]*ast. func (f *FastJsonConverter) extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamField) (interface{}, error) { if field == nil { - return extractNumber(v) + return f.extractNumber(v) } switch { case field.Type == "float", field.Type == "datetime": @@ -413,6 +425,21 @@ func (f *FastJsonConverter) extractBooleanFromValue(name string, v *fastjson.Val return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", name, v.Type().String(), getType(field)) } +func (f *FastJsonConverter) extractNumber(v *fastjson.Value) (any, error) { + if f.UseInt64 && !isFloat64(v.String()) { + i64, err := v.Int64() + if err != nil { + return nil, err + } + return i64, nil + } + f64, err := v.Float64() + if err != nil { + return nil, err + } + return f64, nil +} + func getBooleanFromValue(value *fastjson.Value) (interface{}, error) { typ := value.Type() switch typ { @@ -448,21 +475,6 @@ func getType(t *ast.JsonStreamField) string { } } -func extractNumber(v *fastjson.Value) (any, error) { - if !isFloat64(v.String()) && !conf.IsTesting { - i64, err := v.Int64() - if err != nil { - return nil, err - } - return i64, nil - } - f64, err := v.Float64() - if err != nil { - return nil, err - } - return f64, nil -} - func isFloat64(v string) bool { return strings.Contains(v, ".") } diff --git a/internal/converter/json/converter_test.go b/internal/converter/json/converter_test.go index d78f06c6fd..5df6ea8dda 100644 --- a/internal/converter/json/converter_test.go +++ b/internal/converter/json/converter_test.go @@ -163,7 +163,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) { } ctx := mockContext.NewMockContext("test", "op1") for _, tc := range testcases { - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, tc.payload) require.NoError(t, err) require.Equal(t, v, tc.require) @@ -174,7 +174,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) { arrayRequire := []map[string]interface{}{ tc.require, } - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, arrayPayload) require.NoError(t, err) require.Equal(t, v, arrayRequire) @@ -334,7 +334,7 @@ func TestFastJsonConverterWithSchemaError(t *testing.T) { } ctx := mockContext.NewMockContext("test", "op1") for _, tc := range testcases { - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) _, err := f.Decode(ctx, tc.payload) require.Error(t, err) require.Equal(t, err.Error(), tc.err.Error()) @@ -345,7 +345,7 @@ func TestFastJsonEncode(t *testing.T) { a := make(map[string]int) a["a"] = 1 ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(nil) + f := NewFastJsonConverter(nil, nil) v, err := f.Encode(ctx, a) require.NoError(t, err) require.Equal(t, v, []byte(`{"a":1}`)) @@ -378,7 +378,7 @@ func TestArrayWithArray(t *testing.T) { }, } ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(schema) + f := NewFastJsonConverter(schema, nil) v, err := f.Decode(ctx, payload) require.NoError(t, err) require.Equal(t, v, map[string]interface{}{ @@ -559,7 +559,7 @@ func TestTypeNull(t *testing.T) { arrayRequire := []map[string]interface{}{ tc.require, } - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, arrayPayload) require.NoError(t, err) require.Equal(t, v, arrayRequire) @@ -569,7 +569,7 @@ func TestTypeNull(t *testing.T) { arrayRequire := []map[string]interface{}{ tc.require, } - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, arrayPayload) require.NoError(t, err) require.Equal(t, v, arrayRequire) @@ -586,7 +586,7 @@ func TestConvertBytea(t *testing.T) { }, } ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(schema) + f := NewFastJsonConverter(schema, nil) v, err := f.Decode(ctx, []byte(payload)) require.NoError(t, err) require.Equal(t, v, map[string]interface{}{ @@ -602,7 +602,7 @@ func TestConvertBytea(t *testing.T) { }, }, } - f = NewFastJsonConverter(schema) + f = NewFastJsonConverter(schema, nil) v, err = f.Decode(ctx, []byte(payload)) require.NoError(t, err) require.Equal(t, v, map[string]interface{}{ @@ -614,7 +614,7 @@ func TestSchemaless(t *testing.T) { originSchema := map[string]*ast.JsonStreamField{ "a": nil, } - f := NewFastJsonConverter(originSchema) + f := NewFastJsonConverter(originSchema, nil) testcases := []struct { data map[string]interface{} expect map[string]interface{} @@ -665,7 +665,7 @@ func TestIssue(t *testing.T) { originSchema := map[string]*ast.JsonStreamField{ "results": nil, } - f := NewFastJsonConverter(originSchema) + f := NewFastJsonConverter(originSchema, nil) data := `{ "results": [ { @@ -705,7 +705,7 @@ func TestIssue(t *testing.T) { schmema2 := map[string]*ast.JsonStreamField{ "others": nil, } - f2 := NewFastJsonConverter(schmema2) + f2 := NewFastJsonConverter(schmema2, nil) m, err = f2.Decode(context.Background(), []byte(data)) require.NoError(t, err) require.Len(t, m, 0) @@ -755,7 +755,7 @@ func TestDecodeField(t *testing.T) { }, } ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(nil) + f := NewFastJsonConverter(nil, nil) for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { field, err := f.DecodeField(ctx, tc.payload, "id") @@ -774,7 +774,7 @@ func TestIssue3441(t *testing.T) { originSchema := map[string]*ast.JsonStreamField{ "id": nil, } - f := NewFastJsonConverter(originSchema) + f := NewFastJsonConverter(originSchema, nil) data := `{"id":1795292668348461056}` ctx := mockContext.NewMockContext("test", "op1") m, err := f.Decode(ctx, []byte(data))