Skip to content

Commit

Permalink
- update stream transaction
Browse files Browse the repository at this point in the history
- add more queue example
  • Loading branch information
kubemq committed Jul 30, 2019
1 parent 588fca5 commit aa7c2aa
Show file tree
Hide file tree
Showing 7 changed files with 593 additions and 51 deletions.
56 changes: 30 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (

var (
ErrNoTransportDefined = errors.New("no transport layer defined, create object with client instance")
ErrNoTransportCpnnection = errors.New("no transport layer established, aborting")
ErrNoTransportConnection = errors.New("no transport layer established, aborting")
)

type ServerInfo struct {
Expand All @@ -25,10 +25,11 @@ type ServerInfo struct {
}

type Client struct {
opts *Options
transport Transport
ServerInfo *ServerInfo
currentSQM *StreamQueueMessage
opts *Options
transport Transport
ServerInfo *ServerInfo
singleStreamQueueMutex chan bool
// currentSQM *StreamQueueMessage
}

func generateUUID() string {
Expand Down Expand Up @@ -59,8 +60,9 @@ func NewClient(ctx context.Context, op ...Option) (*Client, error) {
return nil, err
}
if client.transport == nil {
return nil, ErrNoTransportCpnnection
return nil, ErrNoTransportConnection
}
client.singleStreamQueueMutex = make(chan bool, 1)
return client, nil
}

Expand Down Expand Up @@ -301,27 +303,29 @@ func (c *Client) NewStreamQueueMessage() *StreamQueueMessage {
return c.SQM()
}

func (c *Client) releaseQueueStream() {

}

// SQM - create an empty stream receive queue message object
func (c *Client) SQM() *StreamQueueMessage {
if c.currentSQM == nil || c.currentSQM.isCompleted {
sqm := &StreamQueueMessage{
RequestID: "",
ClientID: c.opts.clientId,
Channel: "",
visibilitySeconds: 0,
waitTimeSeconds: 0,
refSequence: 0,
reqCh: nil,
resCh: nil,
errCh: nil,
doneCh: nil,
msg: nil,
transport: c.transport,
trace: nil,
ctx: nil,
}
c.currentSQM = sqm
return sqm
c.singleStreamQueueMutex <- true
sqm := &StreamQueueMessage{
RequestID: "",
ClientID: c.opts.clientId,
Channel: "",
visibilitySeconds: 0,
waitTimeSeconds: 0,
refSequence: 0,
reqCh: nil,
resCh: nil,
errCh: nil,
doneCh: nil,
msg: nil,
transport: c.transport,
trace: nil,
ctx: nil,
releaseCh: c.singleStreamQueueMutex,
}
return c.currentSQM
return sqm
}
272 changes: 272 additions & 0 deletions examples/queue/stream_chain_queues/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
package main

import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/kubemq-io/kubemq-go"
"log"
"sync"
"time"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doneCh := "done"
deadCh := "dead"
sendCount := 100
sender, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-stream-sender-id"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)

}
defer sender.Close()

receiver1, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-client-sender-id_receiver_a"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)

}
defer receiver1.Close()

receiver2, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-client-sender-id_receiver_b"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)

}
defer receiver2.Close()

receiver3, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-client-sender-id_receiver_c"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)

}
defer receiver3.Close()

receiver4, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-client-sender-id_receiver_a"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)

}
defer receiver4.Close()

receiver5, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-client-sender-id_receiver_b"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)

}
defer receiver5.Close()

receiver6, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-client-sender-id_receiver_c"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)

}
defer receiver6.Close()

wg := sync.WaitGroup{}
wg.Add(6)

for i := 1; i <= sendCount; i++ {
messageID := uuid.New().String()
sendResult, err := sender.NewQueueMessage().
SetId(messageID).
SetChannel("receiverA").
SetBody([]byte(fmt.Sprintf("sending message %d", i))).
Send(ctx)
if err != nil {
log.Fatal(err)
}

log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String())
}

go func() {
defer wg.Done()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
stream := receiver1.NewStreamQueueMessage().SetChannel("receiverA")
// get message from the queue
msg, err := stream.Next(ctx, 10000, 5)
if err != nil {
log.Println("No new messages for ReceiverA")
return
}
log.Printf("Queue: ReceiverA,MessageID: %s, Body: %s,Seq: %d - send to queue receiverB", msg.Id, string(msg.Body), msg.Attributes.Sequence)
time.Sleep(10 * time.Millisecond)
msg.SetChannel("receiverB")
err = stream.ResendWithNewMessage(msg)
if err != nil {
log.Fatal(err)
}
}

}()

go func() {
defer wg.Done()
for {
stream := receiver2.NewStreamQueueMessage().SetChannel("receiverB")
// get message from the queue
msg, err := stream.Next(ctx, 3, 5)
if err != nil {
log.Println("No new messages for ReceiverB")
return
}
log.Printf("Queue: ReceiverB,MessageID: %s, Body: %s - send to new receiverC", msg.Id, string(msg.Body))
time.Sleep(10 * time.Millisecond)
msg.SetChannel("receiverC")
err = stream.ResendWithNewMessage(msg)
if err != nil {
log.Fatal(err)
}
}

}()

go func() {
defer wg.Done()
for {
stream := receiver3.NewStreamQueueMessage().SetChannel("receiverC")
// get message from the queue
msg, err := stream.Next(ctx, 3, 5)
if err != nil {
log.Println("No new messages for ReceiverC")
return
}
log.Printf("Queue: ReceiverC,MessageID: %s, Body: %s - send to new receiverD", msg.Id, string(msg.Body))
time.Sleep(10 * time.Millisecond)
msg.SetChannel("receiverD")
err = stream.ResendWithNewMessage(msg)
if err != nil {
log.Fatal(err)
}
}

}()
go func() {
defer wg.Done()
for {
stream := receiver4.NewStreamQueueMessage().SetChannel("receiverD")
// get message from the queue
msg, err := stream.Next(ctx, 3, 5)
if err != nil {
log.Println("No new messages for ReceiverD")
return
}
log.Printf("Queue: ReceiverD MessageID: %s, Body: %s - send to queue receiverE", msg.Id, string(msg.Body))
time.Sleep(10 * time.Millisecond)
msg.SetChannel("receiverE")
err = stream.ResendWithNewMessage(msg)
if err != nil {
log.Fatal(err)
}
}

}()

go func() {
defer wg.Done()
for {
stream := receiver5.NewStreamQueueMessage().SetChannel("receiverE")
// get message from the queue
msg, err := stream.Next(ctx, 3, 5)
if err != nil {
log.Println("No new messages for ReceiverE")
return
}
log.Printf("Queue: ReceiverE,MessageID: %s, Body: %s - send to new receiverF", msg.Id, string(msg.Body))
time.Sleep(10 * time.Millisecond)
msg.SetChannel("receiverF")
err = stream.ResendWithNewMessage(msg)
if err != nil {
log.Fatal(err)
}
}

}()

go func() {
defer wg.Done()

for {
stream := receiver6.NewStreamQueueMessage().SetChannel("receiverF")
// get message from the queue
msg, err := stream.Next(ctx, 3, 5)
if err != nil {
log.Println("No new messages for ReceiverF")
return
}
log.Printf("Queue: ReceiverF,MessageID: %s, Body: %s - send to sender done", msg.Id, string(msg.Body))
time.Sleep(10 * time.Millisecond)
msg.SetChannel(doneCh)
err = stream.ResendWithNewMessage(msg)
if err != nil {
log.Println("receiverE", "err", err.Error())
continue
}
}

}()
wg.Wait()
res, err := sender.ReceiveQueueMessages(ctx, &kubemq.ReceiveQueueMessagesRequest{
RequestID: "some_request",
ClientID: "sender-client_id",
Channel: doneCh,
MaxNumberOfMessages: 1000,
WaitTimeSeconds: 2,
IsPeak: false,
})
if err != nil {
log.Fatal(err)
}
if res.IsError {
log.Fatal(res.Error)
}
log.Printf("Done Messages - %d: Expried - %d", res.MessagesReceived, res.MessagesExpired)
for i := 0; i < len(res.Messages); i++ {
log.Printf("MessageID: %s, Body: %s, Seq: %d", res.Messages[i].Id, res.Messages[i].Body, res.Messages[i].Attributes.Sequence)
}

res, err = sender.ReceiveQueueMessages(ctx, &kubemq.ReceiveQueueMessagesRequest{
RequestID: "some_request",
ClientID: "sender-client_id",
Channel: deadCh,
MaxNumberOfMessages: int32(sendCount),
WaitTimeSeconds: 2,
IsPeak: false,
})
if err != nil {
log.Fatal(err)
}
if res.IsError {
log.Fatal(res.Error)
}
log.Printf("Dead Letter Messages - %d: Expried - %d", res.MessagesReceived, res.MessagesExpired)
for i := 0; i < len(res.Messages); i++ {
log.Printf("MessageID: %s, Body: %s", res.Messages[i].Id, res.Messages[i].Body)
}
}
Loading

0 comments on commit aa7c2aa

Please sign in to comment.