From 572f26d527df3a727c8af8cce2c6fb197a62e676 Mon Sep 17 00:00:00 2001 From: TheRangiCrew Date: Tue, 29 Oct 2024 11:59:40 +1300 Subject: [PATCH 1/7] CBOR Record ID Unmarshal to `any` --- pkg/models/record_id.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/models/record_id.go b/pkg/models/record_id.go index cb466e5..c1f69d8 100644 --- a/pkg/models/record_id.go +++ b/pkg/models/record_id.go @@ -50,7 +50,7 @@ func (r *RecordID) UnmarshalCBOR(data []byte) error { } r.Table = temp[0].(string) - r.ID = temp[1].(string) + r.ID = temp[1] return nil } From d53d644087f4bef1343907736301ae7df08014a9 Mon Sep 17 00:00:00 2001 From: TheRangiCrew Date: Tue, 29 Oct 2024 14:37:48 +1300 Subject: [PATCH 2/7] Implement error handling --- db.go | 3 +++ pkg/connection/connection.go | 30 ++++++++++++++++++++++++++++++ pkg/connection/ws.go | 18 ++++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/db.go b/db.go index 8fa75e2..7c6f5f8 100644 --- a/db.go +++ b/db.go @@ -197,6 +197,9 @@ func Query[TResult any](db *DB, sql string, vars map[string]interface{}) (*[]Que if err := db.con.Send(&res, "query", sql, vars); err != nil { return nil, err } + if res.Error != nil { + return nil, res.Error + } return res.Result, nil } diff --git a/pkg/connection/connection.go b/pkg/connection/connection.go index 29860bb..1a65928 100644 --- a/pkg/connection/connection.go +++ b/pkg/connection/connection.go @@ -42,6 +42,9 @@ type BaseConnection struct { responseChannels map[string]chan []byte responseChannelsLock sync.RWMutex + errorChannels map[string]chan error + errorChannelsLock sync.RWMutex + notificationChannels map[string]chan Notification notificationChannelsLock sync.RWMutex } @@ -60,6 +63,20 @@ func (bc *BaseConnection) createResponseChannel(id string) (chan []byte, error) return ch, nil } +func (bc *BaseConnection) createErrorChannel(id string) (chan error, error) { + bc.errorChannelsLock.Lock() + defer bc.errorChannelsLock.Unlock() + + if _, ok := bc.errorChannels[id]; ok { + return nil, fmt.Errorf("%w: %v", constants.ErrIDInUse, id) + } + + ch := make(chan error) + bc.errorChannels[id] = ch + + return ch, nil +} + func (bc *BaseConnection) createNotificationChannel(liveQueryID string) (chan Notification, error) { bc.notificationChannelsLock.Lock() defer bc.notificationChannelsLock.Unlock() @@ -80,6 +97,12 @@ func (bc *BaseConnection) removeResponseChannel(id string) { delete(bc.responseChannels, id) } +func (bc *BaseConnection) removeErrorChannel(id string) { + bc.errorChannelsLock.Lock() + defer bc.errorChannelsLock.Unlock() + delete(bc.errorChannels, id) +} + func (bc *BaseConnection) getResponseChannel(id string) (chan []byte, bool) { bc.responseChannelsLock.RLock() defer bc.responseChannelsLock.RUnlock() @@ -87,6 +110,13 @@ func (bc *BaseConnection) getResponseChannel(id string) (chan []byte, bool) { return ch, ok } +func (bc *BaseConnection) getErrorChannel(id string) (chan error, bool) { + bc.errorChannelsLock.RLock() + defer bc.errorChannelsLock.RUnlock() + ch, ok := bc.errorChannels[id] + return ch, ok +} + func (bc *BaseConnection) getLiveChannel(id string) (chan Notification, bool) { bc.notificationChannelsLock.RLock() defer bc.notificationChannelsLock.RUnlock() diff --git a/pkg/connection/ws.go b/pkg/connection/ws.go index d0f975f..bc0fc59 100644 --- a/pkg/connection/ws.go +++ b/pkg/connection/ws.go @@ -159,7 +159,12 @@ func (ws *WebSocketConnection) Send(dest interface{}, method string, params ...i if err != nil { return err } + errorChan, err := ws.createErrorChannel(id) + if err != nil { + return err + } defer ws.removeResponseChannel(id) + defer ws.removeErrorChannel(id) if err := ws.write(request); err != nil { return err @@ -177,6 +182,11 @@ func (ws *WebSocketConnection) Send(dest interface{}, method string, params ...i return ws.unmarshaler.Unmarshal(resBytes, dest) } return nil + case resErr, open := <-errorChan: + if !open { + return errors.New("error channel closed") + } + return resErr } } @@ -234,6 +244,14 @@ func (ws *WebSocketConnection) handleResponse(res []byte) { if rpcRes.Error != nil { err := fmt.Errorf("rpc request err %w", rpcRes.Error) ws.logger.Error(err.Error()) + errChan, ok := ws.getErrorChannel(fmt.Sprintf("%v", rpcRes.ID)) + if !ok { + err := fmt.Errorf("unavailable ErrorChannel %+v", rpcRes.ID) + ws.logger.Error(err.Error()) + return + } + defer close(errChan) + errChan <- rpcRes.Error return } From bf30773cf77e53f79fe2b2ac7a494504937e7305 Mon Sep 17 00:00:00 2001 From: TheRangiCrew Date: Tue, 29 Oct 2024 15:30:28 +1300 Subject: [PATCH 3/7] Init error channel --- pkg/connection/ws.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/connection/ws.go b/pkg/connection/ws.go index bc0fc59..ae9dac7 100644 --- a/pkg/connection/ws.go +++ b/pkg/connection/ws.go @@ -44,6 +44,7 @@ func NewWebSocketConnection(p NewConnectionParams) *WebSocketConnection { unmarshaler: p.Unmarshaler, responseChannels: make(map[string]chan []byte), + errorChannels: make(map[string]chan error), notificationChannels: make(map[string]chan Notification), }, From 8ac837bb79e2115c774bba0110dbe841760608b6 Mon Sep 17 00:00:00 2001 From: TheRangiCrew Date: Tue, 29 Oct 2024 15:40:20 +1300 Subject: [PATCH 4/7] Add RPC error tests --- db_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/db_test.go b/db_test.go index df64152..a3cb041 100644 --- a/db_test.go +++ b/db_test.go @@ -464,3 +464,16 @@ func (s *SurrealDBTestSuite) TestQueryRaw() { fmt.Println(created) fmt.Println(selected) } + +func (s *SurrealDBTestSuite) TestRPCError() { + s.Run("Test valid query", func() { + _, err := surrealdb.Query[[]testUser](s.db, "SELECT * FROM users", map[string]interface{}{}) + s.Require().NoError(err) + }) + + s.Run("Test invalid query", func() { + _, err := surrealdb.Query[[]testUser](s.db, "SELEC * FROM users", map[string]interface{}{}) + s.Require().Error(err) + }) + +} From 42831759ab388abfa49f93f348c40085050abe24 Mon Sep 17 00:00:00 2001 From: TheRangiCrew Date: Tue, 29 Oct 2024 15:41:50 +1300 Subject: [PATCH 5/7] removed junk --- db.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/db.go b/db.go index 7c6f5f8..8fa75e2 100644 --- a/db.go +++ b/db.go @@ -197,9 +197,6 @@ func Query[TResult any](db *DB, sql string, vars map[string]interface{}) (*[]Que if err := db.con.Send(&res, "query", sql, vars); err != nil { return nil, err } - if res.Error != nil { - return nil, res.Error - } return res.Result, nil } From 7c3f0095eb59e0c87deb48288bf23025ec17d26b Mon Sep 17 00:00:00 2001 From: TheRangiCrew Date: Tue, 29 Oct 2024 15:50:08 +1300 Subject: [PATCH 6/7] lint --- db_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/db_test.go b/db_test.go index a3cb041..37f1b76 100644 --- a/db_test.go +++ b/db_test.go @@ -475,5 +475,4 @@ func (s *SurrealDBTestSuite) TestRPCError() { _, err := surrealdb.Query[[]testUser](s.db, "SELEC * FROM users", map[string]interface{}{}) s.Require().Error(err) }) - } From b53a45ed1d872fefe3c5a3201271b7c54b166581 Mon Sep 17 00:00:00 2001 From: TheRangiCrew Date: Tue, 29 Oct 2024 16:13:42 +1300 Subject: [PATCH 7/7] lint --- pkg/connection/ws.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/connection/ws.go b/pkg/connection/ws.go index ae9dac7..df4b474 100644 --- a/pkg/connection/ws.go +++ b/pkg/connection/ws.go @@ -245,14 +245,17 @@ func (ws *WebSocketConnection) handleResponse(res []byte) { if rpcRes.Error != nil { err := fmt.Errorf("rpc request err %w", rpcRes.Error) ws.logger.Error(err.Error()) + errChan, ok := ws.getErrorChannel(fmt.Sprintf("%v", rpcRes.ID)) if !ok { err := fmt.Errorf("unavailable ErrorChannel %+v", rpcRes.ID) ws.logger.Error(err.Error()) return } + defer close(errChan) errChan <- rpcRes.Error + return }