Skip to content

Commit

Permalink
Adding queue examples
Browse files Browse the repository at this point in the history
  • Loading branch information
kubemq committed Jul 15, 2019
1 parent bed47a6 commit 383b403
Show file tree
Hide file tree
Showing 18 changed files with 571 additions and 186 deletions.
29 changes: 20 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const (
)

var (
ErrNoTransportDefined = errors.New("no transport layer defined, create object with client instance")
ErrNoTransportDefined = errors.New("no transport layer defined, create object with client instance")
ErrNoTransportCpnnection = errors.New("no transport layer established, aborting")
)

type ServerInfo struct {
Expand Down Expand Up @@ -43,7 +44,11 @@ func NewClient(ctx context.Context, op ...Option) (*Client, error) {
client := &Client{
opts: opts,
}
var err error

err := opts.Validate()
if err != nil {
return nil, err
}
switch opts.transportType {
case TransportTypeGRPC:
client.transport, client.ServerInfo, err = newGRPCTransport(ctx, opts)
Expand All @@ -53,12 +58,18 @@ func NewClient(ctx context.Context, op ...Option) (*Client, error) {
if err != nil {
return nil, err
}
if client.transport == nil {
return nil, ErrNoTransportCpnnection
}
return client, nil
}

// Close - closing client connection. any on going transactions will be aborted
func (c *Client) Close() error {
return c.transport.Close()
if c.transport != nil {
return c.transport.Close()
}
return nil
}

// NewEvent - create an empty event
Expand All @@ -74,7 +85,7 @@ func (c *Client) E() *Event {
Metadata: "",
Body: nil,
ClientId: c.opts.clientId,
Tags: map[string]string{},
Tags: map[string]string{},
transport: c.transport,
}
}
Expand All @@ -92,7 +103,7 @@ func (c *Client) ES() *EventStore {
Metadata: "",
Body: nil,
ClientId: c.opts.clientId,
Tags: map[string]string{},
Tags: map[string]string{},
transport: c.transport,
}
}
Expand Down Expand Up @@ -121,7 +132,7 @@ func (c *Client) C() *Command {
Body: nil,
Timeout: defaultRequestTimeout,
ClientId: c.opts.clientId,
Tags: map[string]string{},
Tags: map[string]string{},
transport: c.transport,
trace: nil,
}
Expand All @@ -143,7 +154,7 @@ func (c *Client) Q() *Query {
ClientId: c.opts.clientId,
CacheKey: "",
CacheTTL: c.opts.defaultCacheTTL,
Tags: map[string]string{},
Tags: map[string]string{},
transport: c.transport,
trace: nil,
}
Expand All @@ -164,7 +175,7 @@ func (c *Client) R() *Response {
ClientId: c.opts.clientId,
ExecutedAt: time.Time{},
Err: nil,
Tags: map[string]string{},
Tags: map[string]string{},
transport: c.transport,
trace: nil,
}
Expand Down Expand Up @@ -203,7 +214,7 @@ func (c *Client) QM() *QueueMessage {
Channel: "",
Metadata: "",
Body: nil,
Tags: map[string]string{},
Tags: map[string]string{},
Attributes: nil,
Policy: &QueueMessagePolicy{
ExpirationSeconds: 0,
Expand Down
2 changes: 1 addition & 1 deletion event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (es *EventStore) SetBody(body []byte) *EventStore {
}

// AddTag - add key value tags to event store message
func (es *EventStore) AddTag(key,value string) *EventStore {
func (es *EventStore) AddTag(key, value string) *EventStore {
es.Tags[key] = value
return es
}
Expand Down
8 changes: 4 additions & 4 deletions examples/pubsub/persistence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {
SetChannel(channelName).
SetMetadata("some-metadata").
SetBody([]byte("hello kubemq - sending single event to store")).
AddTag("seq",fmt.Sprintf("%d",i)).
AddTag("seq", fmt.Sprintf("%d", i)).
Send(ctx)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -73,12 +73,12 @@ func main() {
log.Fatal(err)
}
for i := 0; i < 20; i++ {
event,more := <-eventsCh
event, more := <-eventsCh
if !more {
log.Println("Receive EventStore done")
log.Println("Next EventStore done")
return
}
log.Printf("Receive EventStore\nSequence: %d\nTime: %s\nBody: %s\n", event.Sequence, event.Timestamp, event.Body)
log.Printf("Next EventStore\nSequence: %d\nTime: %s\nBody: %s\n", event.Sequence, event.Timestamp, event.Body)
}

}
4 changes: 2 additions & 2 deletions examples/pubsub/real-time/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func main() {
log.Fatal(err)
}
defer sender.Close()
receiverA,err:=kubemq.NewClient(ctx,
receiverA, err := kubemq.NewClient(ctx,
kubemq.WithUri("http://localhost:9090"),
kubemq.WithClientId("test-event-rest-client"),
kubemq.WithTransportType(kubemq.TransportTypeRest))
Expand All @@ -28,7 +28,7 @@ func main() {

defer receiverA.Close()

receiverB,err:=kubemq.NewClient(ctx,
receiverB, err := kubemq.NewClient(ctx,
kubemq.WithUri("http://localhost:9090"),
kubemq.WithClientId("test-event-rest-client"),
kubemq.WithTransportType(kubemq.TransportTypeRest))
Expand Down
41 changes: 20 additions & 21 deletions examples/queue/ack_all/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func main() {
defer cancel()
sender, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-command-sender-id"),
kubemq.WithClientId("test-client-sender-id"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
Expand All @@ -23,15 +23,15 @@ func main() {

workerA, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-command-sender-id"),
kubemq.WithClientId("test-client-sender-id"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
}
defer workerA.Close()
workerB, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-command-sender-id"),
kubemq.WithClientId("test-client-sender-id"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
Expand All @@ -40,74 +40,73 @@ func main() {

ackClient, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("test-command-sender-id"),
kubemq.WithClientId("test-client-sender-id"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
}
defer ackClient.Close()

batch:=sender.NewQueueMessages()
for i:=0; i<10 ; i++ {
batch := sender.NewQueueMessages()
for i := 0; i < 10; i++ {
batch.Add(sender.NewQueueMessage().
SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d",i))))
SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d", i))))
}
batchResult,err:= batch.Send(ctx)
batchResult, err := batch.Send(ctx)
if err != nil {
log.Fatal(err)
}
for _, sendResult := range batchResult {
log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID,time.Unix(0,sendResult.SentAt).String())
log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String())
}

receiveResult,err:= ackClient.NewAckAllQueueMessagesRequest().
receiveResult, err := ackClient.NewAckAllQueueMessagesRequest().
SetChannel(channel).
SetWaitTimeSeconds(2).
Send(ctx)
if err != nil {
return
}
log.Printf("Ack Messages: %d completed\n",receiveResult.AffectedMessages)
log.Printf("ack Messages: %d completed\n", receiveResult.AffectedMessages)

// try Consuming the messages
time.Sleep(time.Second)

go func() {
for {
receiveResult,err:= workerA.NewReceiveQueueMessagesRequest().
receiveResult, err := workerA.NewReceiveQueueMessagesRequest().
SetChannel(channel).
SetMaxNumberOfMessages(1).
SetWaitTimeSeconds(1).
Send(ctx)
if err != nil {
log.Printf("WorkerA Error: %s\n",err.Error())
log.Printf("WorkerA Error: %s\n", err.Error())
return
}
log.Printf("Worker A Received %d Messages:\n",receiveResult.MessagesReceived)
log.Printf("Worker A Received %d Messages:\n", receiveResult.MessagesReceived)
for _, msg := range receiveResult.Messages {
log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body))
log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body))
}
}

}()

go func() {
for {
receiveResult,err:= workerA.NewReceiveQueueMessagesRequest().
receiveResult, err := workerA.NewReceiveQueueMessagesRequest().
SetChannel(channel).
SetMaxNumberOfMessages(1).
SetWaitTimeSeconds(1).
Send(ctx)
if err != nil {
log.Printf("WorkerB Error: %s\n",err.Error())
log.Printf("WorkerB Error: %s\n", err.Error())
return
}
log.Printf("Worker B Received %d Messages:\n",receiveResult.MessagesReceived)
log.Printf("Worker B Received %d Messages:\n", receiveResult.MessagesReceived)
for _, msg := range receiveResult.Messages {
log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body))
log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body))
}
}

}()
time.Sleep(2*time.Second)
time.Sleep(2 * time.Second)
}
Loading

0 comments on commit 383b403

Please sign in to comment.