Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer committed Dec 19, 2024
1 parent aef7ac9 commit 8409343
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 35 deletions.
1 change: 1 addition & 0 deletions etc/mqtt_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ default:
#connectionSelector: mqtt.mqtt_conf1
#kubeedgeVersion:
#kubeedgeModelFile: ""
#useInt64ForWholeNumber: true

# demo_conf: #Conf_key
# qos: 0
Expand Down
2 changes: 1 addition & 1 deletion internal/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func init() {
modules.RegisterConverter(message.FormatJson, func(_ api.StreamContext, _ string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return json.NewFastJsonConverter(schema), nil
return json.NewFastJsonConverter(schema, props), nil
})
modules.RegisterConverter(message.FormatXML, func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return xml.NewXMLConverter(), nil
Expand Down
32 changes: 31 additions & 1 deletion internal/converter/json/convert_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package json

import (
"encoding/json"
"os"
"testing"

Expand Down Expand Up @@ -106,9 +107,38 @@ func benchmarkByFiles(filePath string, b *testing.B, schema map[string]*ast.Json
if err != nil {
b.Fatal(err)
}
f := NewFastJsonConverter(schema)
f := NewFastJsonConverter(schema, nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Decode(ctx, payload)
}
}

func BenchmarkNativeFloatParse(b *testing.B) {
m := make(map[string]interface{})
data := `{"id":1.2}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
json.Unmarshal([]byte(data), &m)
}
}

func BenchmarkFloatParse(b *testing.B) {
ctx := mockContext.NewMockContext("test", "test")
f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true})
data := `{"id":1.2}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Decode(ctx, []byte(data))
}
}

func BenchmarkIntParse(b *testing.B) {
ctx := mockContext.NewMockContext("test", "test")
f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true})
data := `{"id":1}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Decode(ctx, []byte(data))
}
}
50 changes: 31 additions & 19 deletions internal/converter/json/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/valyala/fastjson"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
Expand All @@ -32,15 +31,28 @@ import (
type FastJsonConverter struct {
sync.RWMutex
schema map[string]*ast.JsonStreamField
FastJsonConverterConf
}

func NewFastJsonConverter(schema map[string]*ast.JsonStreamField) *FastJsonConverter {
type FastJsonConverterConf struct {
UseInt64 bool `json:"useInt64ForWholeNumber"`
}

func NewFastJsonConverter(schema map[string]*ast.JsonStreamField, props map[string]any) *FastJsonConverter {
f := &FastJsonConverter{
schema: schema,
}
f.setupProps(props)
return f
}

func (f *FastJsonConverter) setupProps(props map[string]any) {
if props == nil {
return
}
cast.MapToStruct(props, &f.FastJsonConverterConf)
}

func (f *FastJsonConverter) ResetSchema(schema map[string]*ast.JsonStreamField) {
f.Lock()
defer f.Unlock()
Expand Down Expand Up @@ -82,7 +94,7 @@ func (f *FastJsonConverter) DecodeField(_ api.StreamContext, b []byte, field str
case fastjson.TypeString:
return vv.String(), nil
case fastjson.TypeNumber:
return extractNumber(vv)
return f.extractNumber(vv)
case fastjson.TypeTrue, fastjson.TypeFalse:
return vv.Bool()
}
Expand Down Expand Up @@ -337,7 +349,7 @@ func (f *FastJsonConverter) checkSchema(key, typ string, schema map[string]*ast.

func (f *FastJsonConverter) extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamField) (interface{}, error) {
if field == nil {
return extractNumber(v)
return f.extractNumber(v)
}
switch {
case field.Type == "float", field.Type == "datetime":
Expand Down Expand Up @@ -413,6 +425,21 @@ func (f *FastJsonConverter) extractBooleanFromValue(name string, v *fastjson.Val
return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", name, v.Type().String(), getType(field))
}

func (f *FastJsonConverter) extractNumber(v *fastjson.Value) (any, error) {
if f.UseInt64 && !isFloat64(v.String()) {
i64, err := v.Int64()
if err != nil {
return nil, err
}
return i64, nil
}
f64, err := v.Float64()
if err != nil {
return nil, err
}
return f64, nil
}

func getBooleanFromValue(value *fastjson.Value) (interface{}, error) {
typ := value.Type()
switch typ {
Expand Down Expand Up @@ -448,21 +475,6 @@ func getType(t *ast.JsonStreamField) string {
}
}

func extractNumber(v *fastjson.Value) (any, error) {
if !isFloat64(v.String()) && !conf.IsTesting {
i64, err := v.Int64()
if err != nil {
return nil, err
}
return i64, nil
}
f64, err := v.Float64()
if err != nil {
return nil, err
}
return f64, nil
}

func isFloat64(v string) bool {
return strings.Contains(v, ".")
}
28 changes: 14 additions & 14 deletions internal/converter/json/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) {
}
ctx := mockContext.NewMockContext("test", "op1")
for _, tc := range testcases {
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, tc.payload)
require.NoError(t, err)
require.Equal(t, v, tc.require)
Expand All @@ -174,7 +174,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) {
arrayRequire := []map[string]interface{}{
tc.require,
}
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, arrayPayload)
require.NoError(t, err)
require.Equal(t, v, arrayRequire)
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestFastJsonConverterWithSchemaError(t *testing.T) {
}
ctx := mockContext.NewMockContext("test", "op1")
for _, tc := range testcases {
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
_, err := f.Decode(ctx, tc.payload)
require.Error(t, err)
require.Equal(t, err.Error(), tc.err.Error())
Expand All @@ -345,7 +345,7 @@ func TestFastJsonEncode(t *testing.T) {
a := make(map[string]int)
a["a"] = 1
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(nil)
f := NewFastJsonConverter(nil, nil)
v, err := f.Encode(ctx, a)
require.NoError(t, err)
require.Equal(t, v, []byte(`{"a":1}`))
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestArrayWithArray(t *testing.T) {
},
}
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(schema)
f := NewFastJsonConverter(schema, nil)
v, err := f.Decode(ctx, payload)
require.NoError(t, err)
require.Equal(t, v, map[string]interface{}{
Expand Down Expand Up @@ -559,7 +559,7 @@ func TestTypeNull(t *testing.T) {
arrayRequire := []map[string]interface{}{
tc.require,
}
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, arrayPayload)
require.NoError(t, err)
require.Equal(t, v, arrayRequire)
Expand All @@ -569,7 +569,7 @@ func TestTypeNull(t *testing.T) {
arrayRequire := []map[string]interface{}{
tc.require,
}
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, arrayPayload)
require.NoError(t, err)
require.Equal(t, v, arrayRequire)
Expand All @@ -586,7 +586,7 @@ func TestConvertBytea(t *testing.T) {
},
}
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(schema)
f := NewFastJsonConverter(schema, nil)
v, err := f.Decode(ctx, []byte(payload))
require.NoError(t, err)
require.Equal(t, v, map[string]interface{}{
Expand All @@ -602,7 +602,7 @@ func TestConvertBytea(t *testing.T) {
},
},
}
f = NewFastJsonConverter(schema)
f = NewFastJsonConverter(schema, nil)
v, err = f.Decode(ctx, []byte(payload))
require.NoError(t, err)
require.Equal(t, v, map[string]interface{}{
Expand All @@ -614,7 +614,7 @@ func TestSchemaless(t *testing.T) {
originSchema := map[string]*ast.JsonStreamField{
"a": nil,
}
f := NewFastJsonConverter(originSchema)
f := NewFastJsonConverter(originSchema, nil)
testcases := []struct {
data map[string]interface{}
expect map[string]interface{}
Expand Down Expand Up @@ -665,7 +665,7 @@ func TestIssue(t *testing.T) {
originSchema := map[string]*ast.JsonStreamField{
"results": nil,
}
f := NewFastJsonConverter(originSchema)
f := NewFastJsonConverter(originSchema, nil)
data := `{
"results": [
{
Expand Down Expand Up @@ -705,7 +705,7 @@ func TestIssue(t *testing.T) {
schmema2 := map[string]*ast.JsonStreamField{
"others": nil,
}
f2 := NewFastJsonConverter(schmema2)
f2 := NewFastJsonConverter(schmema2, nil)
m, err = f2.Decode(context.Background(), []byte(data))
require.NoError(t, err)
require.Len(t, m, 0)
Expand Down Expand Up @@ -755,7 +755,7 @@ func TestDecodeField(t *testing.T) {
},
}
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(nil)
f := NewFastJsonConverter(nil, nil)
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
field, err := f.DecodeField(ctx, tc.payload, "id")
Expand All @@ -774,7 +774,7 @@ func TestIssue3441(t *testing.T) {
originSchema := map[string]*ast.JsonStreamField{
"id": nil,
}
f := NewFastJsonConverter(originSchema)
f := NewFastJsonConverter(originSchema, nil)
data := `{"id":1795292668348461056}`
ctx := mockContext.NewMockContext("test", "op1")
m, err := f.Decode(ctx, []byte(data))
Expand Down

0 comments on commit 8409343

Please sign in to comment.