Skip to content

Commit

Permalink
fix for response tags in query and command
Browse files Browse the repository at this point in the history
  • Loading branch information
kubemq committed Feb 26, 2020
1 parent d31f5a6 commit b98c8ee
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ KubeMQ Go SDK requires a Go version capable of understanding /vN suffixed import
- 1.9.7+
- 1.10.3+
- 1.11+
- 1.12+


``` bash
# Go Modules
Expand Down
3 changes: 1 addition & 2 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ func (c *Command) SetTimeout(timeout time.Duration) *Command {
return c
}


// AddTag - add key value tags to command message
func (c *Command) AddTag(key,value string) *Command {
func (c *Command) AddTag(key, value string) *Command {
c.Tags[key] = value
return c
}
Expand Down
2 changes: 1 addition & 1 deletion event.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (e *Event) SetBody(body []byte) *Event {
}

// AddTag - add key value tags to event message
func (e *Event) AddTag(key,value string) *Event {
func (e *Event) AddTag(key, value string) *Event {
e.Tags[key] = value
return e
}
Expand Down
19 changes: 8 additions & 11 deletions examples/queue/batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,30 @@ func main() {
defer client.Close()
channel := "testing_queue_channel"

batch:=client.NewQueueMessages()
for i:=0; i<10 ; i++ {
batch := client.NewQueueMessages()
for i := 0; i < 10; i++ {
batch.Add(client.NewQueueMessage().
SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d",i))))
SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d", i))))
}
batchResult,err:= batch.Send(ctx)
batchResult, err := batch.Send(ctx)
if err != nil {
log.Fatal(err)
}
for _, sendResult := range batchResult {
log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID,time.Unix(0,sendResult.SentAt).String())
log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String())
}

receiveResult,err:= client.NewReceiveQueueMessagesRequest().
receiveResult, err := client.NewReceiveQueueMessagesRequest().
SetChannel(channel).
SetMaxNumberOfMessages(10).
SetWaitTimeSeconds(1).
Send(ctx)
if err != nil {
log.Fatal(err)
}
log.Printf("Received %d Messages:\n",receiveResult.MessagesReceived)
log.Printf("Received %d Messages:\n", receiveResult.MessagesReceived)
for _, msg := range receiveResult.Messages {
log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body))
log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body))
}




}
4 changes: 2 additions & 2 deletions examples/queue/stream_resend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func main() {
log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String())

receiverA, err := kubemq.NewClient(ctx,
kubemq.WithUri("http://localhost:9090"),
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-client-sender-id"),
kubemq.WithTransportType(kubemq.TransportTypeRest))
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)

Expand Down
6 changes: 3 additions & 3 deletions examples/rpc/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
case err := <-errCh:
log.Fatal(err)
return
case query,more := <-queriesCh:
case query, more := <-queriesCh:
if !more {
fmt.Println("Query Received, done")
return
Expand All @@ -57,13 +57,13 @@ func main() {

}()
// give some time to connect a receiver
time.Sleep(1 *time.Second)
time.Sleep(1 * time.Second)
response, err := client.NewQuery().
SetId("some-query-id").
SetChannel(channel).
SetMetadata("some-metadata").
SetBody([]byte("hello kubemq - sending a query, please reply")).
SetTimeout(1 *time.Second).
SetTimeout(1 * time.Second).
Send(ctx)
if err != nil {
log.Fatal(err)
Expand Down
12 changes: 7 additions & 5 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func newGRPCTransport(ctx context.Context, opts *Options) (Transport, *ServerInf
return nil, nil, err
}
go func() {
select {
case <-ctx.Done():
if g.conn != nil {
_ = g.conn.Close()
}

<-ctx.Done()
if g.conn != nil {
_ = g.conn.Close()
}

}()
g.client = pb.NewKubemqClient(g.conn)

Expand Down Expand Up @@ -371,6 +371,7 @@ func (g *gRPCTransport) SendCommand(ctx context.Context, command *Command) (*Com
Executed: grpcResponse.Executed,
ExecutedAt: time.Unix(grpcResponse.Timestamp, 0),
Error: grpcResponse.Error,
Tags: grpcResponse.Tags,
}
return commandResponse, nil
}
Expand Down Expand Up @@ -445,6 +446,7 @@ func (g *gRPCTransport) SendQuery(ctx context.Context, query *Query) (*QueryResp
Metadata: grpcResponse.Metadata,
ResponseClientId: grpcResponse.ClientID,
Body: grpcResponse.Body,
Tags: grpcResponse.Tags,
CacheHit: grpcResponse.CacheHit,
Error: grpcResponse.Error,
}
Expand Down
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (q *Query) SetBody(body []byte) *Query {
}

// AddTag - add key value tags to query message
func (q *Query) AddTag(key,value string) *Query {
func (q *Query) AddTag(key, value string) *Query {
q.Tags[key] = value
return q
}
Expand Down
2 changes: 1 addition & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (req *StreamQueueMessage) AddTrace(name string) *Trace {
// Close - end stream of queue messages and cancel all pending operations
func (req *StreamQueueMessage) Close() {
req.cancel()
return

}

// Next - receive queue messages request , waiting for response or timeout
Expand Down

0 comments on commit b98c8ee

Please sign in to comment.