diff --git a/client.go b/client.go index c92398e..95333f4 100644 --- a/client.go +++ b/client.go @@ -13,7 +13,8 @@ const ( ) var ( - ErrNoTransportDefined = errors.New("no transport layer defined, create object with client instance") + ErrNoTransportDefined = errors.New("no transport layer defined, create object with client instance") + ErrNoTransportCpnnection = errors.New("no transport layer established, aborting") ) type ServerInfo struct { @@ -43,7 +44,11 @@ func NewClient(ctx context.Context, op ...Option) (*Client, error) { client := &Client{ opts: opts, } - var err error + + err := opts.Validate() + if err != nil { + return nil, err + } switch opts.transportType { case TransportTypeGRPC: client.transport, client.ServerInfo, err = newGRPCTransport(ctx, opts) @@ -53,12 +58,18 @@ func NewClient(ctx context.Context, op ...Option) (*Client, error) { if err != nil { return nil, err } + if client.transport == nil { + return nil, ErrNoTransportCpnnection + } return client, nil } // Close - closing client connection. any on going transactions will be aborted func (c *Client) Close() error { - return c.transport.Close() + if c.transport != nil { + return c.transport.Close() + } + return nil } // NewEvent - create an empty event @@ -74,7 +85,7 @@ func (c *Client) E() *Event { Metadata: "", Body: nil, ClientId: c.opts.clientId, - Tags: map[string]string{}, + Tags: map[string]string{}, transport: c.transport, } } @@ -92,7 +103,7 @@ func (c *Client) ES() *EventStore { Metadata: "", Body: nil, ClientId: c.opts.clientId, - Tags: map[string]string{}, + Tags: map[string]string{}, transport: c.transport, } } @@ -121,7 +132,7 @@ func (c *Client) C() *Command { Body: nil, Timeout: defaultRequestTimeout, ClientId: c.opts.clientId, - Tags: map[string]string{}, + Tags: map[string]string{}, transport: c.transport, trace: nil, } @@ -143,7 +154,7 @@ func (c *Client) Q() *Query { ClientId: c.opts.clientId, CacheKey: "", CacheTTL: c.opts.defaultCacheTTL, - Tags: map[string]string{}, + Tags: map[string]string{}, transport: c.transport, trace: nil, } @@ -164,7 +175,7 @@ func (c *Client) R() *Response { ClientId: c.opts.clientId, ExecutedAt: time.Time{}, Err: nil, - Tags: map[string]string{}, + Tags: map[string]string{}, transport: c.transport, trace: nil, } @@ -203,7 +214,7 @@ func (c *Client) QM() *QueueMessage { Channel: "", Metadata: "", Body: nil, - Tags: map[string]string{}, + Tags: map[string]string{}, Attributes: nil, Policy: &QueueMessagePolicy{ ExpirationSeconds: 0, diff --git a/event_store.go b/event_store.go index 7208459..224c89e 100644 --- a/event_store.go +++ b/event_store.go @@ -48,7 +48,7 @@ func (es *EventStore) SetBody(body []byte) *EventStore { } // AddTag - add key value tags to event store message -func (es *EventStore) AddTag(key,value string) *EventStore { +func (es *EventStore) AddTag(key, value string) *EventStore { es.Tags[key] = value return es } diff --git a/examples/pubsub/persistence/main.go b/examples/pubsub/persistence/main.go index ca35fac..ea5f120 100644 --- a/examples/pubsub/persistence/main.go +++ b/examples/pubsub/persistence/main.go @@ -37,7 +37,7 @@ func main() { SetChannel(channelName). SetMetadata("some-metadata"). SetBody([]byte("hello kubemq - sending single event to store")). - AddTag("seq",fmt.Sprintf("%d",i)). + AddTag("seq", fmt.Sprintf("%d", i)). Send(ctx) if err != nil { log.Fatal(err) @@ -73,12 +73,12 @@ func main() { log.Fatal(err) } for i := 0; i < 20; i++ { - event,more := <-eventsCh + event, more := <-eventsCh if !more { - log.Println("Receive EventStore done") + log.Println("Next EventStore done") return } - log.Printf("Receive EventStore\nSequence: %d\nTime: %s\nBody: %s\n", event.Sequence, event.Timestamp, event.Body) + log.Printf("Next EventStore\nSequence: %d\nTime: %s\nBody: %s\n", event.Sequence, event.Timestamp, event.Body) } } diff --git a/examples/pubsub/real-time/main.go b/examples/pubsub/real-time/main.go index a063d91..2782acf 100644 --- a/examples/pubsub/real-time/main.go +++ b/examples/pubsub/real-time/main.go @@ -18,7 +18,7 @@ func main() { log.Fatal(err) } defer sender.Close() - receiverA,err:=kubemq.NewClient(ctx, + receiverA, err := kubemq.NewClient(ctx, kubemq.WithUri("http://localhost:9090"), kubemq.WithClientId("test-event-rest-client"), kubemq.WithTransportType(kubemq.TransportTypeRest)) @@ -28,7 +28,7 @@ func main() { defer receiverA.Close() - receiverB,err:=kubemq.NewClient(ctx, + receiverB, err := kubemq.NewClient(ctx, kubemq.WithUri("http://localhost:9090"), kubemq.WithClientId("test-event-rest-client"), kubemq.WithTransportType(kubemq.TransportTypeRest)) diff --git a/examples/queue/ack_all/main.go b/examples/queue/ack_all/main.go index f006886..3a71b3b 100644 --- a/examples/queue/ack_all/main.go +++ b/examples/queue/ack_all/main.go @@ -13,7 +13,7 @@ func main() { defer cancel() sender, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -23,7 +23,7 @@ func main() { workerA, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -31,7 +31,7 @@ func main() { defer workerA.Close() workerB, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -40,52 +40,51 @@ func main() { ackClient, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) } defer ackClient.Close() - batch:=sender.NewQueueMessages() - for i:=0; i<10 ; i++ { + batch := sender.NewQueueMessages() + for i := 0; i < 10; i++ { batch.Add(sender.NewQueueMessage(). - SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d",i)))) + SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d", i)))) } - batchResult,err:= batch.Send(ctx) + batchResult, err := batch.Send(ctx) if err != nil { log.Fatal(err) } for _, sendResult := range batchResult { - log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID,time.Unix(0,sendResult.SentAt).String()) + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) } - - receiveResult,err:= ackClient.NewAckAllQueueMessagesRequest(). + receiveResult, err := ackClient.NewAckAllQueueMessagesRequest(). SetChannel(channel). SetWaitTimeSeconds(2). Send(ctx) if err != nil { return } - log.Printf("Ack Messages: %d completed\n",receiveResult.AffectedMessages) + log.Printf("ack Messages: %d completed\n", receiveResult.AffectedMessages) // try Consuming the messages time.Sleep(time.Second) go func() { for { - receiveResult,err:= workerA.NewReceiveQueueMessagesRequest(). + receiveResult, err := workerA.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(1). SetWaitTimeSeconds(1). Send(ctx) if err != nil { - log.Printf("WorkerA Error: %s\n",err.Error()) + log.Printf("WorkerA Error: %s\n", err.Error()) return } - log.Printf("Worker A Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Worker A Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } } @@ -93,21 +92,21 @@ func main() { go func() { for { - receiveResult,err:= workerA.NewReceiveQueueMessagesRequest(). + receiveResult, err := workerA.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(1). SetWaitTimeSeconds(1). Send(ctx) if err != nil { - log.Printf("WorkerB Error: %s\n",err.Error()) + log.Printf("WorkerB Error: %s\n", err.Error()) return } - log.Printf("Worker B Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Worker B Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } } }() - time.Sleep(2*time.Second) + time.Sleep(2 * time.Second) } diff --git a/examples/queue/delayed/main.go b/examples/queue/delayed/main.go index 5e11b04..be2099d 100644 --- a/examples/queue/delayed/main.go +++ b/examples/queue/delayed/main.go @@ -13,17 +13,17 @@ func main() { defer cancel() sender, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) } defer sender.Close() - channel := "testing_queue_channel1" + channel := "testing_queue_channel" workerA, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -31,72 +31,70 @@ func main() { defer workerA.Close() workerB, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) } defer workerB.Close() - - - batch:=sender.NewQueueMessages() - for i:=0; i<10 ; i++ { + batch := sender.NewQueueMessages() + for i := 0; i < 200; i++ { batch.Add(sender.NewQueueMessage(). SetChannel(channel). - SetBody([]byte(fmt.Sprintf("Batch Message %d",i))). - SetPolicyDelaySeconds(5)) + SetBody([]byte(fmt.Sprintf("Batch Message %d", i))). + SetPolicyDelaySeconds(5).AddTag("message", fmt.Sprintf("%d", i))) } - batchResult,err:= batch.Send(ctx) + + batchResult, err := batch.Send(ctx) if err != nil { log.Fatal(err) } for _, sendResult := range batchResult { - log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID,time.Unix(0,sendResult.SentAt).String()) + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) } -// waiting for 5 seconds - time.Sleep(5 *time.Second) - - - - - go func() { - for { - receiveResult,err:= workerA.NewReceiveQueueMessagesRequest(). - SetChannel(channel). - SetMaxNumberOfMessages(1). - SetWaitTimeSeconds(10). - Send(ctx) - if err != nil { - return - } - log.Printf("Worker A Received %d Messages at %s: ",receiveResult.MessagesReceived,time.Now().String()) - for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s\n",msg.Id,string(msg.Body)) - } - - - } - - }() - - go func() { - for { - receiveResult,err:= workerB.NewReceiveQueueMessagesRequest(). - SetChannel(channel). - SetMaxNumberOfMessages(1). - SetWaitTimeSeconds(10). - Send(ctx) - if err != nil { - return - } - log.Printf("Worker B Received %d Messages at %s: ",receiveResult.MessagesReceived,time.Now().String()) - for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s\n",msg.Id,string(msg.Body)) - } - } - - }() - time.Sleep(3 *time.Second) + // waiting for 5 seconds + time.Sleep(5 * time.Second) + // + //go func() { + // for { + // receiveResult, err := workerA.NewReceiveQueueMessagesRequest(). + // SetChannel(channel). + // SetMaxNumberOfMessages(1). + // SetWaitTimeSeconds(10). + // Send(ctx) + // if err != nil { + // return + // } + // + // log.Printf("Worker A Received %d Messages at %s: ", receiveResult.MessagesReceived, time.Now().String()) + // for _, msg := range receiveResult.Messages { + // log.Printf("MessageID: %s, Body: %s\n", msg.Id, string(msg.Body)) + // } + // + // } + // + //}() + // + //go func() { + // for { + // + // receiveResult, err := workerB.NewReceiveQueueMessagesRequest(). + // SetChannel(channel). + // SetMaxNumberOfMessages(1). + // SetWaitTimeSeconds(10). + // Send(ctx) + // if err != nil { + // return + // } + // + // log.Printf("Worker B Received %d Messages at %s: ", receiveResult.MessagesReceived, time.Now().String()) + // for _, msg := range receiveResult.Messages { + // log.Printf("MessageID: %s, Body: %s\n", msg.Id, string(msg.Body)) + // } + // } + // + //}() + //time.Sleep(4 * time.Second) } diff --git a/examples/queue/expiration/main.go b/examples/queue/expiration/main.go index 63c1ac4..49d2fff 100644 --- a/examples/queue/expiration/main.go +++ b/examples/queue/expiration/main.go @@ -13,7 +13,7 @@ func main() { defer cancel() sender, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -23,7 +23,7 @@ func main() { workerA, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -31,37 +31,34 @@ func main() { defer workerA.Close() workerB, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) } defer workerB.Close() - - - batch:=sender.NewQueueMessages() - for i:=0; i<10 ; i++ { + batch := sender.NewQueueMessages() + for i := 0; i < 10; i++ { batch.Add(sender.NewQueueMessage(). SetChannel(channel). - SetBody([]byte(fmt.Sprintf("Batch Message %d",i))). + SetBody([]byte(fmt.Sprintf("Batch Message %d", i))). SetPolicyExpirationSeconds(2)) } - batchResult,err:= batch.Send(ctx) + batchResult, err := batch.Send(ctx) if err != nil { log.Fatal(err) } for _, sendResult := range batchResult { - log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID,time.Unix(0,sendResult.SentAt).String()) + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) } -// waiting for 5 seconds - time.Sleep(5 *time.Second) - + // waiting for 5 seconds + time.Sleep(5 * time.Second) go func() { for { - receiveResult,err:= workerA.NewReceiveQueueMessagesRequest(). + receiveResult, err := workerA.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(10). SetWaitTimeSeconds(2). @@ -69,11 +66,11 @@ func main() { if err != nil { return } - log.Printf("Worker A Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Worker A Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } - log.Printf("Worker A notified of %d Expired Messages:\n",receiveResult.MessagesExpired) + log.Printf("Worker A notified of %d Expired Messages:\n", receiveResult.MessagesExpired) } @@ -81,7 +78,7 @@ func main() { go func() { for { - receiveResult,err:= workerA.NewReceiveQueueMessagesRequest(). + receiveResult, err := workerA.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(10). SetWaitTimeSeconds(2). @@ -89,13 +86,13 @@ func main() { if err != nil { return } - log.Printf("Worker B Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Worker B Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } - log.Printf("Worker B notified of %d Expired Messages:\n",receiveResult.MessagesExpired) + log.Printf("Worker B notified of %d Expired Messages:\n", receiveResult.MessagesExpired) } }() - time.Sleep(3 *time.Second) + time.Sleep(3 * time.Second) } diff --git a/examples/queue/peak/main.go b/examples/queue/peak/main.go index dac5253..38046f7 100644 --- a/examples/queue/peak/main.go +++ b/examples/queue/peak/main.go @@ -13,7 +13,7 @@ func main() { defer cancel() sender, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -23,7 +23,7 @@ func main() { workerA, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -31,7 +31,7 @@ func main() { defer workerA.Close() workerB, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -40,29 +40,29 @@ func main() { peakClient, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) } defer peakClient.Close() - batch:=sender.NewQueueMessages() - for i:=0; i<10 ; i++ { + batch := sender.NewQueueMessages() + for i := 0; i < 10; i++ { batch.Add(sender.NewQueueMessage(). - SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d",i)))) + SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d", i)))) } - batchResult,err:= batch.Send(ctx) + batchResult, err := batch.Send(ctx) if err != nil { log.Fatal(err) } for _, sendResult := range batchResult { - log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID,time.Unix(0,sendResult.SentAt).String()) + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) } // Peaking the messaging time.Sleep(time.Second) - receiveResult,err:= peakClient.NewReceiveQueueMessagesRequest(). + receiveResult, err := peakClient.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(10). SetWaitTimeSeconds(2). @@ -71,9 +71,9 @@ func main() { if err != nil { return } - log.Printf("Peak Client Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Peak Client Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("Peaking MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("Peaking MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } // Consuming the messages @@ -81,7 +81,7 @@ func main() { go func() { for { - receiveResult,err:= workerA.NewReceiveQueueMessagesRequest(). + receiveResult, err := workerA.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(1). SetWaitTimeSeconds(2). @@ -89,9 +89,9 @@ func main() { if err != nil { return } - log.Printf("Worker A Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Worker A Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } } @@ -99,7 +99,7 @@ func main() { go func() { for { - receiveResult,err:= workerA.NewReceiveQueueMessagesRequest(). + receiveResult, err := workerA.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(1). SetWaitTimeSeconds(2). @@ -107,9 +107,9 @@ func main() { if err != nil { return } - log.Printf("Worker B Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Worker B Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } } diff --git a/examples/queue/simple/main.go b/examples/queue/simple_queue/main.go similarity index 67% rename from examples/queue/simple/main.go rename to examples/queue/simple_queue/main.go index f163068..d4f22ef 100644 --- a/examples/queue/simple/main.go +++ b/examples/queue/simple_queue/main.go @@ -20,17 +20,16 @@ func main() { defer client.Close() channel := "testing_queue_channel" - sendResult,err:= client.NewQueueMessage(). + sendResult, err := client.NewQueueMessage(). SetChannel(channel). - SetBody([]byte("some-simple-queue-message")). + SetBody([]byte("some-simple_queue-queue-message")). Send(ctx) if err != nil { log.Fatal(err) } - log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID,time.Unix(0,sendResult.SentAt).String()) + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) - - receiveResult,err:= client.NewReceiveQueueMessagesRequest(). + receiveResult, err := client.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(1). SetWaitTimeSeconds(1). @@ -38,11 +37,9 @@ func main() { if err != nil { log.Fatal(err) } - log.Printf("Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } - - } diff --git a/examples/queue/stream_dead_letter/main.go b/examples/queue/stream_dead_letter/main.go new file mode 100644 index 0000000..8b6679f --- /dev/null +++ b/examples/queue/stream_dead_letter/main.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "github.com/kubemq-io/kubemq-go" + "log" + "time" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sender, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-stream-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer sender.Close() + channel := "testing_queue_channel_dead-letter-queue" + deadLetterQueue := "dead-letter-queue" + + sendResult, err := sender.NewQueueMessage(). + SetChannel(channel). + SetBody([]byte("queue-message-with-dead-letter-queue")). + SetPolicyMaxReceiveCount(3). + SetPolicyMaxReceiveQueue(deadLetterQueue). + Send(ctx) + if err != nil { + log.Fatal(err) + } + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) + + receiverA, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-stream-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + //kubemq.WithUri("http://localhost:9090"), + //kubemq.WithClientId("test-client-sender-id"), + //kubemq.WithTransportType(kubemq.TransportTypeRest)) + if err != nil { + log.Fatal(err) + + } + defer receiverA.Close() + + go func() { + stream := receiverA.NewStreamQueueMessage().SetChannel(channel) + + for i := 0; i < 3; i++ { + // get message from the queue + msg, err := stream.Next(ctx, 2, 10) + if err != nil { + log.Fatal(err) + } + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) + log.Println("no ack for 2 sec ") + time.Sleep(2000 * time.Millisecond) + } + stream.Close() + }() + + time.Sleep(3 * time.Second) + + stream := sender.NewStreamQueueMessage().SetChannel(deadLetterQueue) + + // get message from the queue + msg, err := stream.Next(ctx, 10, 60) + if err != nil { + log.Fatal(err) + } + log.Printf("Dead-Letter Queue MessageID: %s, Body: %s", msg.Id, string(msg.Body)) + err = msg.Ack() + if err != nil { + log.Fatal(err) + } + stream.Close() +} diff --git a/examples/queue/stream_extend_visibility/main.go b/examples/queue/stream_extend_visibility/main.go new file mode 100644 index 0000000..c2d16b5 --- /dev/null +++ b/examples/queue/stream_extend_visibility/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "github.com/kubemq-io/kubemq-go" + "log" + "time" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sender, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-stream-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer sender.Close() + channel := "testing_queue_channel_visibility" + + sendResult, err := sender.NewQueueMessage(). + SetChannel(channel). + SetBody([]byte("queue-message-with-for-extend-visibility")). + Send(ctx) + if err != nil { + log.Fatal(err) + } + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) + + receiverA, err := kubemq.NewClient(ctx, + kubemq.WithUri("http://localhost:9090"), + kubemq.WithClientId("test-client-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeRest)) + if err != nil { + log.Fatal(err) + + } + defer receiverA.Close() + + stream := receiverA.NewStreamQueueMessage().SetChannel(channel) + + // get message from the queue + msg, err := stream.Next(ctx, 5, 10) + if err != nil { + log.Fatal(err) + } + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) + log.Println("work for 1 seconds") + time.Sleep(1000 * time.Millisecond) + log.Println("need more time to process, extend visibility for more 3 seconds") + err = msg.ExtendVisibility(3) + if err != nil { + log.Fatal(err) + } + log.Println("approved. work for 2.5 seconds") + time.Sleep(2500 * time.Millisecond) + log.Println("work done.... ack the message") + err = msg.Ack() + if err != nil { + log.Fatal(err) + } + log.Println("ack done") + + stream.Close() + +} diff --git a/examples/queue/stream_resend/main.go b/examples/queue/stream_resend/main.go new file mode 100644 index 0000000..65363f8 --- /dev/null +++ b/examples/queue/stream_resend/main.go @@ -0,0 +1,90 @@ +package main + +import ( + "context" + "github.com/kubemq-io/kubemq-go" + "log" + "time" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sender, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-stream-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer sender.Close() + channel := "testing_queue_channel_resend" + resendToChannel := "testing_queue_channel_destination" + + sendResult, err := sender.NewQueueMessage(). + SetChannel(channel). + SetBody([]byte("queue-message-resend")). + Send(ctx) + if err != nil { + log.Fatal(err) + } + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) + + receiverA, err := kubemq.NewClient(ctx, + kubemq.WithUri("http://localhost:9090"), + kubemq.WithClientId("test-client-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeRest)) + if err != nil { + log.Fatal(err) + + } + defer receiverA.Close() + + stream := receiverA.NewStreamQueueMessage().SetChannel(channel) + // get message from the queue + msg, err := stream.Next(ctx, 5, 10) + if err != nil { + log.Fatal(err) + } + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) + log.Println("resend to new queue") + + err = msg.Resend(resendToChannel) + if err != nil { + log.Fatal(err) + } + log.Println("done") + stream.Close() + // checking the new channel + stream = receiverA.NewStreamQueueMessage().SetChannel(resendToChannel) + // get message from the queue + msg, err = stream.Next(ctx, 5, 10) + if err != nil { + log.Fatal(err) + } + + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) + log.Println("resend with new message") + newMsg := receiverA.NewQueueMessage().SetChannel(channel).SetBody([]byte("new message")) + err = stream.ResendWithNewMessage(newMsg) + + if err != nil { + log.Fatal(err) + } + stream.Close() + log.Println("checking again the old queue") + stream = receiverA.NewStreamQueueMessage().SetChannel(channel) + // get message from the queue + msg, err = stream.Next(ctx, 5, 10) + if err != nil { + log.Fatal(err) + } + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) + + err = msg.Ack() + if err != nil { + log.Fatal(err) + } + log.Println("ack and done") +} diff --git a/examples/queue/stream_simple/main.go b/examples/queue/stream_simple/main.go new file mode 100644 index 0000000..ed38ec2 --- /dev/null +++ b/examples/queue/stream_simple/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "github.com/kubemq-io/kubemq-go" + "log" + "time" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sender, err := kubemq.NewClient(ctx, + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId("test-stream-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + + } + defer sender.Close() + + receiver, err := kubemq.NewClient(ctx, + kubemq.WithUri("http://localhost:9090"), + kubemq.WithClientId("test-stream-sender-id"), + kubemq.WithTransportType(kubemq.TransportTypeRest)) + if err != nil { + log.Fatal(err) + + } + defer receiver.Close() + channel := "testing_queue_channel" + sendResult, err := sender.NewQueueMessage(). + SetChannel(channel). + SetBody([]byte("some-stream_simple-queue-message")). + Send(ctx) + if err != nil { + log.Fatal(err) + } + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) + + stream := receiver.NewStreamQueueMessage().SetChannel(channel) + + // get message from the queue + msg, err := stream.Next(ctx, 10, 10) + if err != nil { + log.Fatal(err) + } + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) + log.Println("doing some work.....") + time.Sleep(time.Second) + // ack the current message + log.Println("done, ack the message") + err = msg.Ack() + if err != nil { + log.Fatal(err) + } + + log.Println("checking for next message") + msg, err = stream.Next(ctx, 10, 1) + if err != nil { + log.Println(err.Error()) + } + if msg == nil { + log.Println("no new message in the queue") + } + stream.Close() +} diff --git a/examples/queue/work_queue/main.go b/examples/queue/work_queue/main.go index fe0706f..3a51520 100644 --- a/examples/queue/work_queue/main.go +++ b/examples/queue/work_queue/main.go @@ -13,7 +13,7 @@ func main() { defer cancel() sender, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -23,7 +23,7 @@ func main() { workerA, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) @@ -31,17 +31,16 @@ func main() { defer workerA.Close() workerB, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId("test-command-sender-id"), + kubemq.WithClientId("test-client-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { log.Fatal(err) } defer workerB.Close() - go func() { for { - receiveResult,err:= workerA.NewReceiveQueueMessagesRequest(). + receiveResult, err := workerA.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(1). SetWaitTimeSeconds(2). @@ -49,9 +48,9 @@ func main() { if err != nil { return } - log.Printf("Worker A Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Worker A Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } } @@ -59,7 +58,7 @@ func main() { go func() { for { - receiveResult,err:= workerA.NewReceiveQueueMessagesRequest(). + receiveResult, err := workerA.NewReceiveQueueMessagesRequest(). SetChannel(channel). SetMaxNumberOfMessages(1). SetWaitTimeSeconds(2). @@ -67,26 +66,26 @@ func main() { if err != nil { return } - log.Printf("Worker B Received %d Messages:\n",receiveResult.MessagesReceived) + log.Printf("Worker B Received %d Messages:\n", receiveResult.MessagesReceived) for _, msg := range receiveResult.Messages { - log.Printf("MessageID: %s, Body: %s",msg.Id,string(msg.Body)) + log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body)) } } }() time.Sleep(time.Second) - batch:=sender.NewQueueMessages() - for i:=0; i<10 ; i++ { + batch := sender.NewQueueMessages() + for i := 0; i < 10; i++ { batch.Add(sender.NewQueueMessage(). - SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d",i)))) + SetChannel(channel).SetBody([]byte(fmt.Sprintf("Batch Message %d", i)))) } - batchResult,err:= batch.Send(ctx) + batchResult, err := batch.Send(ctx) if err != nil { log.Fatal(err) } for _, sendResult := range batchResult { - log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID,time.Unix(0,sendResult.SentAt).String()) + log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String()) } time.Sleep(time.Second) diff --git a/grpc.go b/grpc.go index 2819c29..b79c41d 100644 --- a/grpc.go +++ b/grpc.go @@ -56,7 +56,7 @@ func newGRPCTransport(ctx context.Context, opts *Options) (Transport, *ServerInf si, err := g.Ping(ctx) if err != nil { - return nil, nil, err + return nil, &ServerInfo{}, nil } return g, si, nil diff --git a/options.go b/options.go index dd8f075..f911a4a 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,7 @@ package kubemq import ( + "errors" "strings" "time" ) @@ -119,8 +120,8 @@ func WithTransportType(transportType TransportType) Option { func GetDefaultOptions() *Options { return &Options{ - host: "localhost", - port: 50000, + host: "", + port: 0, isSecured: false, certFile: "", serverOverrideDomain: "", @@ -129,5 +130,27 @@ func GetDefaultOptions() *Options { receiveBufferSize: 10, defaultChannel: "", defaultCacheTTL: time.Minute * 15, + transportType: 0, + restUri: "", + webSocketUri: "", } } + +func (o *Options) Validate() error { + switch o.transportType { + case TransportTypeGRPC: + if o.host== "" { + return errors.New("invalid host") + } + if o.port<=0 { + return errors.New("invalid port") + } + case TransportTypeRest: + if o.restUri== "" { + return errors.New("invalid address uri") + } + default: + return errors.New("no transport type was set") + } + return nil +} \ No newline at end of file diff --git a/queue.go b/queue.go index 190261c..0bdc2e4 100644 --- a/queue.go +++ b/queue.go @@ -20,6 +20,7 @@ type QueueMessage struct { Policy *QueueMessagePolicy transport Transport trace *Trace + stream *StreamQueueMessage } // SetId - set queue message id, otherwise new random uuid will be set @@ -54,12 +55,11 @@ func (qm *QueueMessage) SetBody(body []byte) *QueueMessage { } // AddTag - add key value tags to query message -func (qm *QueueMessage) AddTag(key,value string) *QueueMessage { +func (qm *QueueMessage) AddTag(key, value string) *QueueMessage { qm.Tags[key] = value return qm } - // SetPolicyExpirationSeconds - set queue message expiration seconds, 0 never expires func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage { qm.Policy.ExpirationSeconds = int32(sec) @@ -86,6 +86,9 @@ func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage { // Send - sending queue message request , waiting for response or timeout func (qm *QueueMessage) Send(ctx context.Context) (*SendQueueMessageResult, error) { + if qm.transport == nil { + return nil, ErrNoTransportDefined + } return qm.transport.SendQueueMessage(ctx, qm) } @@ -95,6 +98,40 @@ func (qm *QueueMessage) AddTrace(name string) *Trace { return qm.trace } +// ack - sending ack queue message in stream queue message mode +func (qm *QueueMessage) Ack() error { + if qm.stream != nil { + return qm.stream.ack() + } + return errors.New("non-stream mode queue message ") +} + +// reject - sending reject queue message in stream queue message mode +func (qm *QueueMessage) Reject() error { + if qm.stream != nil { + return qm.stream.reject() + } + return errors.New("non-stream mode queue message ") +} + +// ExtendVisibility - extend the visibility time for the current receive message +func (qm *QueueMessage) ExtendVisibility(value int32) error { + + if qm.stream != nil { + return qm.stream.extendVisibility(value) + } + return errors.New("non-stream mode queue message ") +} + +// Resend - sending resend +func (qm *QueueMessage) Resend(channel string) error { + if qm.stream != nil { + return qm.stream.resend(channel) + } + + return errors.New("non-stream mode queue message ") +} + type QueueMessages struct { Messages []*QueueMessage transport Transport @@ -286,7 +323,7 @@ type StreamQueueMessage struct { mu sync.Mutex } -// SetId - set stream queue message request id, otherwise new random uuid will be set +// SetId - set streamqueue message request id, otherwise new random uuid will be set func (req *StreamQueueMessage) SetId(id string) *StreamQueueMessage { req.RequestID = id return req @@ -310,10 +347,17 @@ func (req *StreamQueueMessage) AddTrace(name string) *Trace { return req.trace } -// Receive - receive queue messages request , waiting for response or timeout -func (req *StreamQueueMessage) Receive(ctx context.Context, visibility, wait int32) (*QueueMessage, error) { +// Close - end stream of queue messages +func (req *StreamQueueMessage) Close() { + req.isCompleted = true + req.cancel() + return +} + +// Next - receive queue messages request , waiting for response or timeout +func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error) { req.mu.Lock() - defer req.mu.Unlock() + if req.transport == nil { return nil, ErrNoTransportDefined } @@ -334,8 +378,10 @@ func (req *StreamQueueMessage) Receive(ctx context.Context, visibility, wait int req.isCompleted = true req.msg = nil req.cancel() + req.mu.Unlock() return case <-req.ctx.Done(): + req.mu.Unlock() return } } @@ -386,6 +432,7 @@ func (req *StreamQueueMessage) Receive(ctx context.Context, visibility, wait int MaxReceiveCount: resMsg.Policy.MaxReceiveCount, MaxReceiveQueue: resMsg.Policy.MaxReceiveQueue, }, + stream: req, } return req.msg, nil @@ -397,10 +444,11 @@ func (req *StreamQueueMessage) Receive(ctx context.Context, visibility, wait int } -// Ack - ack the received queue messages waiting for response or timeout -func (req *StreamQueueMessage) Ack() error { +// ack - ack the received queue messages waiting for response or timeout +func (req *StreamQueueMessage) ack() error { + if req.msg == nil { - return errors.New("no active message to ack, call Receive first") + return errors.New("no active message to ack, call Next first") } ackRequest := &pb.StreamQueueMessagesRequest{ @@ -427,10 +475,11 @@ func (req *StreamQueueMessage) Ack() error { return nil } -// Reject - reject the received queue messages waiting for response or timeout -func (req *StreamQueueMessage) Reject() error { +// reject - reject the received queue messages waiting for response or timeout +func (req *StreamQueueMessage) reject() error { + if req.msg == nil { - return errors.New("no active message to reject, call Receive first") + return errors.New("no active message to reject, call Next first") } rejRequest := &pb.StreamQueueMessagesRequest{ @@ -448,6 +497,8 @@ func (req *StreamQueueMessage) Reject() error { case getResponse := <-req.resCh: if getResponse.IsError { return errors.New(getResponse.Error) + } else { + } case err := <-req.errCh: return err @@ -457,10 +508,11 @@ func (req *StreamQueueMessage) Reject() error { return nil } -// ExtendVisibility - extend the visibility time for the current receive message -func (req *StreamQueueMessage) ExtendVisibility(value int32) error { +// extendVisibility - extend the visibility time for the current receive message +func (req *StreamQueueMessage) extendVisibility(value int32) error { + if req.msg == nil { - return errors.New("no active message to extend visibility, call Receive first") + return errors.New("no active message to extend visibility, call Next first") } extRequest := &pb.StreamQueueMessagesRequest{ @@ -487,10 +539,11 @@ func (req *StreamQueueMessage) ExtendVisibility(value int32) error { return nil } -// Resend - resend the current received message to a new channel and ack the current message -func (req *StreamQueueMessage) Resend(channel string) error { +// resend - resend the current received message to a new channel and ack the current message +func (req *StreamQueueMessage) resend(channel string) error { + if req.msg == nil { - return errors.New("no active message to resend, call Receive first") + return errors.New("no active message to resend, call Next first") } extRequest := &pb.StreamQueueMessagesRequest{ @@ -519,8 +572,9 @@ func (req *StreamQueueMessage) Resend(channel string) error { // ResendWithNewMessage - resend the current received message to a new channel func (req *StreamQueueMessage) ResendWithNewMessage(msg *QueueMessage) error { + if req.msg == nil { - return errors.New("no active message to resend, call Receive first") + return errors.New("no active message to resend, call Next first") } extRequest := &pb.StreamQueueMessagesRequest{ RequestID: req.RequestID, diff --git a/rest.go b/rest.go index 4b17b48..4a055ac 100644 --- a/rest.go +++ b/rest.go @@ -14,9 +14,9 @@ import ( ) type restResponse struct { - IsError bool `json:"is_error"` + IsError bool `json:"is_error"` Message string `json:"message"` - Data json.RawMessage `json:"data"` + Data json.RawMessage `json:"data"` } func (res *restResponse) unmarshal(v interface{}) error { @@ -131,7 +131,7 @@ func newRestTransport(ctx context.Context, opts *Options) (Transport, *ServerInf } si, err := rt.Ping(ctx) if err != nil { - return nil, nil, err + return nil, &ServerInfo{}, nil } return rt, si, nil } @@ -191,7 +191,7 @@ func (rt *restTransport) StreamEvents(ctx context.Context, eventsCh chan *Event, go func() { for { select { - case <-readCh: + case <-readCh: case err := <-wsErrCh: errCh <- err @@ -300,8 +300,8 @@ func (rt *restTransport) StreamEventsStore(ctx context.Context, eventsCh chan *E go func() { for { select { - case pbMsg:=<-readCh: - result:=&EventStoreResult{} + case pbMsg := <-readCh: + result := &EventStoreResult{} err := json.Unmarshal([]byte(pbMsg), result) if err != nil { errCh <- err @@ -334,7 +334,7 @@ func (rt *restTransport) SubscribeToEventsStore(ctx context.Context, channel, gr eventsCh := make(chan *EventStoreReceive, rt.opts.receiveBufferSize) subOption := subscriptionOption{} opt.apply(&subOption) - uri := fmt.Sprintf("%s/subscribe/events?&client_id=%s&channel=%s&group=%s&subscribe_type=%s&events_store_type_data=%d&events_store_type_value=%d", rt.wsAddress, rt.id, channel, group, "persistence", subOption.kind , subOption.value) + uri := fmt.Sprintf("%s/subscribe/events?&client_id=%s&channel=%s&group=%s&subscribe_type=%s&events_store_type_data=%d&events_store_type_value=%d", rt.wsAddress, rt.id, channel, group, "persistence", subOption.kind, subOption.value) rxChan := make(chan string) ready := make(chan struct{}, 1) wsErrCh := make(chan error, 1)