From b98c8ee1f24edfe2798f2138271e5de819fd576a Mon Sep 17 00:00:00 2001 From: KubeMQ Date: Wed, 26 Feb 2020 19:37:30 +0200 Subject: [PATCH] fix for response tags in query and command --- README.md | 2 +- command.go | 3 +-- event.go | 2 +- examples/queue/batch/main.go | 19 ++++++++----------- examples/queue/stream_resend/main.go | 4 ++-- examples/rpc/query/main.go | 6 +++--- grpc.go | 12 +++++++----- query.go | 2 +- queue.go | 2 +- 9 files changed, 25 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index ed9c8ab..0d8f64b 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ KubeMQ Go SDK requires a Go version capable of understanding /vN suffixed import - 1.9.7+ - 1.10.3+ - 1.11+ -- 1.12+ + ``` bash # Go Modules diff --git a/command.go b/command.go index 8f40b4f..39d356a 100644 --- a/command.go +++ b/command.go @@ -53,9 +53,8 @@ func (c *Command) SetTimeout(timeout time.Duration) *Command { return c } - // AddTag - add key value tags to command message -func (c *Command) AddTag(key,value string) *Command { +func (c *Command) AddTag(key, value string) *Command { c.Tags[key] = value return c } diff --git a/event.go b/event.go index aa22acc..4d63888 100644 --- a/event.go +++ b/event.go @@ -45,7 +45,7 @@ func (e *Event) SetBody(body []byte) *Event { } // AddTag - add key value tags to event message -func (e *Event) AddTag(key,value string) *Event { +func (e *Event) AddTag(key, value string) *Event { e.Tags[key] = value return e } diff --git a/examples/queue/batch/main.go b/examples/queue/batch/main.go index 873e294..29a67be 100644 --- a/examples/queue/batch/main.go +++ b/examples/queue/batch/main.go @@ -21,20 +21,20 @@ func main() { defer client.Close() channel := "testing_queue_channel" - batch:=client.NewQueueMessages() - for i:=0; i<10 ; i++ { + batch := client.NewQueueMessages() + for i := 0; i < 10; i++ { batch.Add(client.NewQueueMessage(). - SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d",i)))) + SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d", i)))) } - batchResult,err:= batch.Send(ctx) + batchResult, err := batch.Send(ctx) if err != nil { log.Fatal(err) } for _, sendResult := range batchResult { - log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID,time.Unix(0,sendResult.SentAt).String()) + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) } - receiveResult,err:= client.NewReceiveQueueMessagesRequest(). + receiveResult, err := client.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(10). SetWaitTimeSeconds(1). @@ -42,12 +42,9 @@ func main() { if err != nil { log.Fatal(err) } - log.Printf("Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } - - - } diff --git a/examples/queue/stream_resend/main.go b/examples/queue/stream_resend/main.go index 81fc5e3..84f6ff7 100644 --- a/examples/queue/stream_resend/main.go +++ b/examples/queue/stream_resend/main.go @@ -32,9 +32,9 @@ func main() { log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) receiverA, err := kubemq.NewClient(ctx, - kubemq.WithUri("http://localhost:9090"), + kubemq.WithAddress("localhost", 50000), kubemq.WithClientId("test-client-sender-id"), - kubemq.WithTransportType(kubemq.TransportTypeRest)) + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) diff --git a/examples/rpc/query/main.go b/examples/rpc/query/main.go index 74a2f92..10b1324 100644 --- a/examples/rpc/query/main.go +++ b/examples/rpc/query/main.go @@ -34,7 +34,7 @@ func main() { case err := <-errCh: log.Fatal(err) return - case query,more := <-queriesCh: + case query, more := <-queriesCh: if !more { fmt.Println("Query Received, done") return @@ -57,13 +57,13 @@ func main() { }() // give some time to connect a receiver - time.Sleep(1 *time.Second) + time.Sleep(1 * time.Second) response, err := client.NewQuery(). SetId("some-query-id"). SetChannel(channel). SetMetadata("some-metadata"). SetBody([]byte("hello kubemq - sending a query, please reply")). - SetTimeout(1 *time.Second). + SetTimeout(1 * time.Second). Send(ctx) if err != nil { log.Fatal(err) diff --git a/grpc.go b/grpc.go index 127152b..46cb59f 100644 --- a/grpc.go +++ b/grpc.go @@ -60,12 +60,12 @@ func newGRPCTransport(ctx context.Context, opts *Options) (Transport, *ServerInf return nil, nil, err } go func() { - select { - case <-ctx.Done(): - if g.conn != nil { - _ = g.conn.Close() - } + + <-ctx.Done() + if g.conn != nil { + _ = g.conn.Close() } + }() g.client = pb.NewKubemqClient(g.conn) @@ -371,6 +371,7 @@ func (g *gRPCTransport) SendCommand(ctx context.Context, command *Command) (*Com Executed: grpcResponse.Executed, ExecutedAt: time.Unix(grpcResponse.Timestamp, 0), Error: grpcResponse.Error, + Tags: grpcResponse.Tags, } return commandResponse, nil } @@ -445,6 +446,7 @@ func (g *gRPCTransport) SendQuery(ctx context.Context, query *Query) (*QueryResp Metadata: grpcResponse.Metadata, ResponseClientId: grpcResponse.ClientID, Body: grpcResponse.Body, + Tags: grpcResponse.Tags, CacheHit: grpcResponse.CacheHit, Error: grpcResponse.Error, } diff --git a/query.go b/query.go index bb1d310..a015a49 100644 --- a/query.go +++ b/query.go @@ -50,7 +50,7 @@ func (q *Query) SetBody(body []byte) *Query { } // AddTag - add key value tags to query message -func (q *Query) AddTag(key,value string) *Query { +func (q *Query) AddTag(key, value string) *Query { q.Tags[key] = value return q } diff --git a/queue.go b/queue.go index e549fc8..171bb38 100644 --- a/queue.go +++ b/queue.go @@ -349,7 +349,7 @@ func (req *StreamQueueMessage) AddTrace(name string) *Trace { // Close - end stream of queue messages and cancel all pending operations func (req *StreamQueueMessage) Close() { req.cancel() - return + } // Next - receive queue messages request , waiting for response or timeout