From aa7c2aa11e19a73bcfffae750fe70a207fd84b7a Mon Sep 17 00:00:00 2001 From: kubemq Date: Tue, 30 Jul 2019 23:09:48 +0300 Subject: [PATCH] - update stream transaction - add more queue example --- client.go | 56 ++-- examples/queue/stream_chain_queues/main.go | 272 ++++++++++++++++++ .../main.go | 271 +++++++++++++++++ examples/queue/stream_dead_letter/main.go | 4 +- .../queue/stream_extend_visibility/main.go | 2 - examples/queue/stream_resend/main.go | 2 +- queue.go | 37 ++- 7 files changed, 593 insertions(+), 51 deletions(-) create mode 100644 examples/queue/stream_chain_queues/main.go create mode 100644 examples/queue/stream_chain_queues_with_dead_letter/main.go diff --git a/client.go b/client.go index 95333f4..c9973f5 100644 --- a/client.go +++ b/client.go @@ -14,7 +14,7 @@ const ( var ( ErrNoTransportDefined = errors.New("no transport layer defined, create object with client instance") - ErrNoTransportCpnnection = errors.New("no transport layer established, aborting") + ErrNoTransportConnection = errors.New("no transport layer established, aborting") ) type ServerInfo struct { @@ -25,10 +25,11 @@ type ServerInfo struct { } type Client struct { - opts *Options - transport Transport - ServerInfo *ServerInfo - currentSQM *StreamQueueMessage + opts *Options + transport Transport + ServerInfo *ServerInfo + singleStreamQueueMutex chan bool + // currentSQM *StreamQueueMessage } func generateUUID() string { @@ -59,8 +60,9 @@ func NewClient(ctx context.Context, op ...Option) (*Client, error) { return nil, err } if client.transport == nil { - return nil, ErrNoTransportCpnnection + return nil, ErrNoTransportConnection } + client.singleStreamQueueMutex = make(chan bool, 1) return client, nil } @@ -301,27 +303,29 @@ func (c *Client) NewStreamQueueMessage() *StreamQueueMessage { return c.SQM() } +func (c *Client) releaseQueueStream() { + +} + // SQM - create an empty stream receive queue message object func (c *Client) SQM() *StreamQueueMessage { - if c.currentSQM == nil || c.currentSQM.isCompleted { - sqm := &StreamQueueMessage{ - RequestID: "", - ClientID: c.opts.clientId, - Channel: "", - visibilitySeconds: 0, - waitTimeSeconds: 0, - refSequence: 0, - reqCh: nil, - resCh: nil, - errCh: nil, - doneCh: nil, - msg: nil, - transport: c.transport, - trace: nil, - ctx: nil, - } - c.currentSQM = sqm - return sqm + c.singleStreamQueueMutex <- true + sqm := &StreamQueueMessage{ + RequestID: "", + ClientID: c.opts.clientId, + Channel: "", + visibilitySeconds: 0, + waitTimeSeconds: 0, + refSequence: 0, + reqCh: nil, + resCh: nil, + errCh: nil, + doneCh: nil, + msg: nil, + transport: c.transport, + trace: nil, + ctx: nil, + releaseCh: c.singleStreamQueueMutex, } - return c.currentSQM + return sqm } diff --git a/examples/queue/stream_chain_queues/main.go b/examples/queue/stream_chain_queues/main.go new file mode 100644 index 0000000..a973d3c --- /dev/null +++ b/examples/queue/stream_chain_queues/main.go @@ -0,0 +1,272 @@ +package main + +import ( + "context" + "fmt" + "github.com/google/uuid" + "github.com/kubemq-io/kubemq-go" + "log" + "sync" + "time" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + doneCh := "done" + deadCh := "dead" + sendCount := 100 + sender, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-stream-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer sender.Close() + + receiver1, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_a"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver1.Close() + + receiver2, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_b"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver2.Close() + + receiver3, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_c"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver3.Close() + + receiver4, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_a"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver4.Close() + + receiver5, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_b"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver5.Close() + + receiver6, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_c"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver6.Close() + + wg := sync.WaitGroup{} + wg.Add(6) + + for i := 1; i <= sendCount; i++ { + messageID := uuid.New().String() + sendResult, err := sender.NewQueueMessage(). + SetId(messageID). + SetChannel("receiverA"). + SetBody([]byte(fmt.Sprintf("sending message %d", i))). + Send(ctx) + if err != nil { + log.Fatal(err) + } + + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) + } + + go func() { + defer wg.Done() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + for { + stream := receiver1.NewStreamQueueMessage().SetChannel("receiverA") + // get message from the queue + msg, err := stream.Next(ctx, 10000, 5) + if err != nil { + log.Println("No new messages for ReceiverA") + return + } + log.Printf("Queue: ReceiverA,MessageID: %s, Body: %s,Seq: %d - send to queue receiverB", msg.Id, string(msg.Body), msg.Attributes.Sequence) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverB") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + } + + }() + + go func() { + defer wg.Done() + for { + stream := receiver2.NewStreamQueueMessage().SetChannel("receiverB") + // get message from the queue + msg, err := stream.Next(ctx, 3, 5) + if err != nil { + log.Println("No new messages for ReceiverB") + return + } + log.Printf("Queue: ReceiverB,MessageID: %s, Body: %s - send to new receiverC", msg.Id, string(msg.Body)) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverC") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + } + + }() + + go func() { + defer wg.Done() + for { + stream := receiver3.NewStreamQueueMessage().SetChannel("receiverC") + // get message from the queue + msg, err := stream.Next(ctx, 3, 5) + if err != nil { + log.Println("No new messages for ReceiverC") + return + } + log.Printf("Queue: ReceiverC,MessageID: %s, Body: %s - send to new receiverD", msg.Id, string(msg.Body)) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverD") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + } + + }() + go func() { + defer wg.Done() + for { + stream := receiver4.NewStreamQueueMessage().SetChannel("receiverD") + // get message from the queue + msg, err := stream.Next(ctx, 3, 5) + if err != nil { + log.Println("No new messages for ReceiverD") + return + } + log.Printf("Queue: ReceiverD MessageID: %s, Body: %s - send to queue receiverE", msg.Id, string(msg.Body)) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverE") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + } + + }() + + go func() { + defer wg.Done() + for { + stream := receiver5.NewStreamQueueMessage().SetChannel("receiverE") + // get message from the queue + msg, err := stream.Next(ctx, 3, 5) + if err != nil { + log.Println("No new messages for ReceiverE") + return + } + log.Printf("Queue: ReceiverE,MessageID: %s, Body: %s - send to new receiverF", msg.Id, string(msg.Body)) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverF") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + } + + }() + + go func() { + defer wg.Done() + + for { + stream := receiver6.NewStreamQueueMessage().SetChannel("receiverF") + // get message from the queue + msg, err := stream.Next(ctx, 3, 5) + if err != nil { + log.Println("No new messages for ReceiverF") + return + } + log.Printf("Queue: ReceiverF,MessageID: %s, Body: %s - send to sender done", msg.Id, string(msg.Body)) + time.Sleep(10 * time.Millisecond) + msg.SetChannel(doneCh) + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Println("receiverE", "err", err.Error()) + continue + } + } + + }() + wg.Wait() + res, err := sender.ReceiveQueueMessages(ctx, &kubemq.ReceiveQueueMessagesRequest{ + RequestID: "some_request", + ClientID: "sender-client_id", + Channel: doneCh, + MaxNumberOfMessages: 1000, + WaitTimeSeconds: 2, + IsPeak: false, + }) + if err != nil { + log.Fatal(err) + } + if res.IsError { + log.Fatal(res.Error) + } + log.Printf("Done Messages - %d: Expried - %d", res.MessagesReceived, res.MessagesExpired) + for i := 0; i < len(res.Messages); i++ { + log.Printf("MessageID: %s, Body: %s, Seq: %d", res.Messages[i].Id, res.Messages[i].Body, res.Messages[i].Attributes.Sequence) + } + + res, err = sender.ReceiveQueueMessages(ctx, &kubemq.ReceiveQueueMessagesRequest{ + RequestID: "some_request", + ClientID: "sender-client_id", + Channel: deadCh, + MaxNumberOfMessages: int32(sendCount), + WaitTimeSeconds: 2, + IsPeak: false, + }) + if err != nil { + log.Fatal(err) + } + if res.IsError { + log.Fatal(res.Error) + } + log.Printf("Dead Letter Messages - %d: Expried - %d", res.MessagesReceived, res.MessagesExpired) + for i := 0; i < len(res.Messages); i++ { + log.Printf("MessageID: %s, Body: %s", res.Messages[i].Id, res.Messages[i].Body) + } +} diff --git a/examples/queue/stream_chain_queues_with_dead_letter/main.go b/examples/queue/stream_chain_queues_with_dead_letter/main.go new file mode 100644 index 0000000..ada195a --- /dev/null +++ b/examples/queue/stream_chain_queues_with_dead_letter/main.go @@ -0,0 +1,271 @@ +package main + +import ( + "context" + "fmt" + "github.com/google/uuid" + "github.com/kubemq-io/kubemq-go" + "log" + "sync" + "time" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + doneCh := "done" + deadCh := "dead" + sendCount := 10 + sender, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-stream-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer sender.Close() + + receiver1, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_a"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver1.Close() + + receiver2, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_b"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver2.Close() + + receiver3, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_c"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver3.Close() + + receiver4, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_a"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver4.Close() + + receiver5, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_b"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver5.Close() + + receiver6, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-client-sender-id_receiver_c"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer receiver6.Close() + + for i := 1; i <= sendCount; i++ { + messageID := uuid.New().String() + sendResult, err := sender.NewQueueMessage(). + SetId(messageID). + SetChannel("receiverA"). + SetBody([]byte(fmt.Sprintf("sending message %d", i))). + SetPolicyMaxReceiveCount(1). + SetPolicyMaxReceiveQueue(deadCh). + Send(ctx) + if err != nil { + log.Fatal(err) + } + + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) + } + + wg := sync.WaitGroup{} + wg.Add(6) + + go func() { + defer wg.Done() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + for { + stream := receiver1.NewStreamQueueMessage().SetChannel("receiverA") + // get message from the queue + msg, err := stream.Next(ctx, 10000, 5) + if err != nil { + log.Println("No new messages for ReceiverA") + return + } + log.Printf("Queue: ReceiverA,MessageID: %s, Body: %s,Seq: %d - send to queue receiverB", msg.Id, string(msg.Body), msg.Attributes.Sequence) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverB") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + + } + + }() + + go func() { + defer wg.Done() + for { + stream := receiver2.NewStreamQueueMessage().SetChannel("receiverB") + // get message from the queue + msg, err := stream.Next(ctx, 3, 5) + if err != nil { + log.Println("No new messages for ReceiverB") + return + } + log.Printf("Queue: ReceiverB,MessageID: %s, Body: %s - send to new receiverC", msg.Id, string(msg.Body)) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverC") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + } + + }() + + go func() { + defer wg.Done() + for { + stream := receiver3.NewStreamQueueMessage().SetChannel("receiverC") + // get message from the queue + msg, err := stream.Next(ctx, 3, 5) + if err != nil { + log.Println("No new messages for ReceiverC") + return + } + log.Printf("Queue: ReceiverC,MessageID: %s, Body: %s - send to new receiverD", msg.Id, string(msg.Body)) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverD") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + } + + }() + go func() { + defer wg.Done() + for { + stream := receiver4.NewStreamQueueMessage().SetChannel("receiverD") + // get message from the queue + msg, err := stream.Next(ctx, 3, 5) + if err != nil { + log.Println("No new messages for ReceiverD") + return + } + log.Printf("Queue: ReceiverD MessageID: %s, Body: %s - send to queue receiverE", msg.Id, string(msg.Body)) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverE") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + } + + }() + + go func() { + defer wg.Done() + for { + stream := receiver5.NewStreamQueueMessage().SetChannel("receiverE") + // get message from the queue + msg, err := stream.Next(ctx, 3, 5) + if err != nil { + log.Println("No new messages for ReceiverE") + return + } + log.Printf("Queue: ReceiverE,MessageID: %s, Body: %s - send to new receiverF", msg.Id, string(msg.Body)) + time.Sleep(10 * time.Millisecond) + msg.SetChannel("receiverF") + err = stream.ResendWithNewMessage(msg) + if err != nil { + log.Fatal(err) + } + } + + }() + + go func() { + defer wg.Done() + + for { + stream := receiver6.NewStreamQueueMessage().SetChannel("receiverF") + // get message from the queue + msg, err := stream.Next(ctx, 1, 5) + if err != nil { + log.Println("No new messages for ReceiverF") + return + } + log.Printf("Queue: ReceiverF,MessageID: %s, Body: %s - wait for 1.1 sec and exit", msg.Id, string(msg.Body)) + time.Sleep(1100 * time.Millisecond) + + } + + }() + wg.Wait() + res, err := sender.ReceiveQueueMessages(ctx, &kubemq.ReceiveQueueMessagesRequest{ + RequestID: "some_request", + ClientID: "sender-client_id", + Channel: doneCh, + MaxNumberOfMessages: 1000, + WaitTimeSeconds: 2, + IsPeak: false, + }) + if err != nil { + log.Fatal(err) + } + if res.IsError { + log.Fatal(res.Error) + } + log.Printf("Done Messages - %d: Expried - %d", res.MessagesReceived, res.MessagesExpired) + for i := 0; i < len(res.Messages); i++ { + log.Printf("MessageID: %s, Body: %s, Seq: %d", res.Messages[i].Id, res.Messages[i].Body, res.Messages[i].Attributes.Sequence) + } + + res, err = sender.ReceiveQueueMessages(ctx, &kubemq.ReceiveQueueMessagesRequest{ + RequestID: "some_request", + ClientID: "sender-client_id", + Channel: deadCh, + MaxNumberOfMessages: int32(sendCount), + WaitTimeSeconds: 2, + IsPeak: false, + }) + if err != nil { + log.Fatal(err) + } + if res.IsError { + log.Fatal(res.Error) + } + log.Printf("Dead Letter Messages - %d: Expried - %d", res.MessagesReceived, res.MessagesExpired) + for i := 0; i < len(res.Messages); i++ { + log.Printf("MessageID: %s, Body: %s", res.Messages[i].Id, res.Messages[i].Body) + } +} diff --git a/examples/queue/stream_dead_letter/main.go b/examples/queue/stream_dead_letter/main.go index 8b6679f..7a5f5fd 100644 --- a/examples/queue/stream_dead_letter/main.go +++ b/examples/queue/stream_dead_letter/main.go @@ -57,9 +57,9 @@ func main() { } log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) log.Println("no ack for 2 sec ") - time.Sleep(2000 * time.Millisecond) + time.Sleep(2100 * time.Millisecond) } - stream.Close() + }() time.Sleep(3 * time.Second) diff --git a/examples/queue/stream_extend_visibility/main.go b/examples/queue/stream_extend_visibility/main.go index c2d16b5..a1da1bd 100644 --- a/examples/queue/stream_extend_visibility/main.go +++ b/examples/queue/stream_extend_visibility/main.go @@ -64,6 +64,4 @@ func main() { } log.Println("ack done") - stream.Close() - } diff --git a/examples/queue/stream_resend/main.go b/examples/queue/stream_resend/main.go index 65363f8..81fc5e3 100644 --- a/examples/queue/stream_resend/main.go +++ b/examples/queue/stream_resend/main.go @@ -55,7 +55,7 @@ func main() { log.Fatal(err) } log.Println("done") - stream.Close() + // checking the new channel stream = receiverA.NewStreamQueueMessage().SetChannel(resendToChannel) // get message from the queue diff --git a/queue.go b/queue.go index 0bdc2e4..8e43080 100644 --- a/queue.go +++ b/queue.go @@ -3,10 +3,8 @@ package kubemq import ( "context" "errors" - "sync" - "time" - "github.com/kubemq-io/kubemq-go/pb" + "time" ) type QueueMessage struct { @@ -319,11 +317,12 @@ type StreamQueueMessage struct { trace *Trace ctx context.Context cancel context.CancelFunc - isCompleted bool - mu sync.Mutex + // isCompleted bool + // mu sync.Mutex + releaseCh chan bool } -// SetId - set streamqueue message request id, otherwise new random uuid will be set +// SetId - set stream queue message request id, otherwise new random uuid will be set func (req *StreamQueueMessage) SetId(id string) *StreamQueueMessage { req.RequestID = id return req @@ -347,17 +346,14 @@ func (req *StreamQueueMessage) AddTrace(name string) *Trace { return req.trace } -// Close - end stream of queue messages +// Close - end stream of queue messages and cancel all pending operations func (req *StreamQueueMessage) Close() { - req.isCompleted = true req.cancel() return } // Next - receive queue messages request , waiting for response or timeout func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error) { - req.mu.Lock() - if req.transport == nil { return nil, ErrNoTransportDefined } @@ -366,26 +362,28 @@ func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) } req.reqCh = make(chan *pb.StreamQueueMessagesRequest, 1) req.resCh = make(chan *pb.StreamQueueMessagesResponse, 1) - req.errCh = make(chan error, 2) - req.doneCh = make(chan bool, 2) - req.ctx, req.cancel = context.WithTimeout(ctx, time.Duration(wait+1)*time.Second) + req.errCh = make(chan error, 1) + req.doneCh = make(chan bool, 1) + req.ctx, req.cancel = context.WithCancel(ctx) go req.transport.StreamQueueMessage(req.ctx, req.reqCh, req.resCh, req.errCh, req.doneCh) go func() { + defer func() { + req.msg = nil + req.cancel() + select { + case <-req.releaseCh: + case <-time.After(1 * time.Second): + } + }() for { select { case <-req.doneCh: - req.isCompleted = true - req.msg = nil - req.cancel() - req.mu.Unlock() return case <-req.ctx.Done(): - req.mu.Unlock() return } } - }() getRequest := &pb.StreamQueueMessagesRequest{ @@ -434,7 +432,6 @@ func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) }, stream: req, } - return req.msg, nil case err := <-req.errCh: return nil, err