From 9756af3b1af1b4e80dbca20315bf4bc8741ba431 Mon Sep 17 00:00:00 2001 From: kubemq Date: Sun, 4 Aug 2019 16:16:49 +0300 Subject: [PATCH] - update examples - update rest interface web socket error - update readme --- README.md | 323 +-------------------- client.go | 5 +- examples/queue/stream_chain_queues/main.go | 27 +- queue.go | 3 +- rest.go | 3 +- 5 files changed, 32 insertions(+), 329 deletions(-) diff --git a/README.md b/README.md index ed52f79..f23ccd3 100644 --- a/README.md +++ b/README.md @@ -1,317 +1,18 @@ # Go -KubeMQ is an enterprise-grade message broker for containers, designed for any workload and architecture running in Kubernetes. +KubeMQ is an enterprise-grade message queue and broker for containers, designed for any workload and architecture running in Kubernetes. This library is Go implementation of KubeMQ client connection. -### Installation +## Installation `$ go get -u github.com/kubemq-io/kubemq-go ` -### Examples -Please find usage examples on the examples folders. - -### KubeMQ server -Please visit https://kubemq.io, create an account, get KubeMQ token, and follow the instructions to run the KubeMQ docker container in your environment. - -## Core Concepts -KubeMQ messaging broker has 4 messaging patterns: - -- Events - real-time pub/sub pattern -- Events Store - pub/sub with persistence pattern -- Commands - the Command part of CQRS pattern, which sends commands with the response for executed or not (with proper error messaging) -- Queries - the Query part of CQRS pattern, which sends a query and gets a response with the relevant query result back - -For each one of the patterns, we can distinguish between the senders and the receivers. - -For events and events store, the KubeMQ supports both RPC and upstream calls. - -the data model is almost identical between all the pattern with some data added related to the specific patter. - -The common part of all the patterns are: - -- Id - the sender can set the Id for each type of message, or the Id is automatically generated a UUID Id for him. -- Metadata - a string field that can hold any metadata related to the message -- Body - a Bytes array which holds the actual payload to be sent from the sender to the receiver - -The KubeMQ core transport is based on gRPC, and the library is a wrapper around the client-side of gRPC complied protobuf hence leveraging the gRPC benefits and advantages. - -Before any transactions to be performed with KubeMQ server, the Client should connect and dial KubeMQ server and obtain Client connection. - -With the Client connection object, the user can perform all transactions to and from KubeMQ server. - -A Client connection object is thread-safe and can be shared between all process needed to communicate with KubeMQ. - -**IMPORTANT** - it's the user responsibility to close the Client connection when no further communication with KubeMQ is needed. - -## Connection - -Connecting to KubeMQ server can be done like that: -``` -ctx, cancel := context.WithCancel(context.Background()) -defer cancel() -client, err := kubemq.NewClient(ctx, -kubemq.WithAddress("localhost", 50000), -kubemq.WithClientId("test-event-client-id")) -if err != nil { - log.Fatal(err) -} -defer Client.Close() -``` -List of connection options: +## Documentation +Please visit our [docs](https://docs.kubemq.io/reference/go.html). -- WithAddress - set host and port address of KubeMQ server -- WithCredentials - set secured TLS credentials from the input certificate file for Client. -- WithToken - set KubeMQ token to be used for KubeMQ connection - not mandatory, only if enforced by the KubeMQ server -- WithClientId - set client id to be used in all functions call with this Client - mandatory -- WithReceiveBufferSize - set length of the buffered channel to be set in all subscriptions -- WithDefaultChannel - set default channel for any outbound requests -- WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value -- WithTransportType - set client transport type, currently gRPC or Rest - -## Events -### Sending Events -#### Single Event -``` -err := client.E(). - SetId("some-id"). - SetChannel(channel). - SetMetadata("some-metadata"). - SetBody([]byte("hello kubemq - sending single event")). - Send(ctx) -if err != nil { - log.Fatal(err) -} -``` -#### Stream Events -``` -eventStreamCh := make(chan *kubemq.Event, 1) -errStreamCh := make(chan error, 1) -go client.StreamEvents(ctx, eventStreamCh, errStreamCh) -event := client.E().SetId("some-event-id"). - SetChannel("some_channel"). - SetMetadata("some-metadata"). - SetBody([]byte("hello kubemq - sending stream event")) -for { - select { - case err := <-errStreamCh: - log.Println(err) - return - case eventStreamCh <- event: - return - } -} -``` -### Receiving Events -First you should subscribe to Events and get a channel: -``` -channelName := "testing_event_channel" -errCh := make(chan error) -eventsCh, err := client.SubscribeToEvents(ctx, channelName, "", errCh) -if err != nil { - log.Fatal(err) -} -``` -Then you can loop over the channel of events: -``` -for { - select { - case err := <-errCh: - log.Fatal(err) - case event := <-eventsCh: - log.Printf("Event Received:\nEventID: %s\nChannel: %s\nMetadata: %s\nBody: %s\n", event.Id, event.Channel, event.Metadata, event.Body) - } -} -``` - -## Events Store -### Sending Events Store -#### Single Event to Store -``` -//sending 10 single events to store -for i := 0; i < 10; i++ { - result, err := client.ES(). - SetId(fmt.Sprintf("event-store-%d", i)). - SetChannel(channelName). - SetMetadata("some-metadata"). - SetBody([]byte("hello kubemq - sending single event to store")). - Send(ctx) - if err != nil { - log.Fatal(err) - } - log.Printf("Sending event #%d: Result: %t", i, result.Sent) -} -``` -#### Stream Events Store -``` -// sending addtional events to store -eventsStoreStreamCh := make(chan *kubemq.EventStore, 1) -eventsStoreSResultCh := make(chan *kubemq.EventStoreResult, 1) -errStreamCh := make(chan error, 1) -go client.StreamEventsStore(ctx, eventsStoreStreamCh, eventsStoreSResultCh, errStreamCh) -for i := 0; i < 10; i++ { - event := client.ES(). - SetId(fmt.Sprintf("event-store-%d", i)). - SetChannel(channelName). - SetMetadata("some-metadata"). - SetBody([]byte("hello kubemq - sending stream event to store")) - eventsStoreStreamCh <- event - select { - case err := <-errStreamCh: - log.Println(err) - return - case result := <-eventsStoreSResultCh: - log.Printf("Sending event #%d: Result: %t", i, result.Sent) - } -} -``` -### Receiving Events Store -First you should subscribe to Events Store and get a channel: -``` -eventsCh, err := client.SubscribeToEventsStore(ctx, channelName, "", errCh, kubemq.StartFromFirstEvent()) -if err != nil { - log.Fatal(err) - } - -``` -#### Subscription Options -KubeMQ supports 6 types of subscriptions: -- StartFromNewEvents - start event store subscription with only new events -- StartFromFirstEvent - replay all the stored events from the first available sequence and continue stream new events from this point -- StartFromLastEvent - replay the last event and continue stream new events from this point -- StartFromSequence - replay events from specific event sequence number and continue stream new events from this point -- StartFromTime - replay events from specific time continue stream new events from this point -- StartFromTimeDelta - replay events from specific current time - delta duration in seconds, continue stream new events from this point - -Then you can loop over the channel of events: -``` -for { - select { - case err := <-errCh: - log.Fatal(err) - case event := <-eventsCh: - log.Printf("Receive EventStore\nSequence: %d\nTime: %s\nBody: %s\n", event.Sequence, event.Timestamp, event.Body) - } -} -``` - -## Commands -### Concept -Commands implement synchronous messaging pattern which the sender send a request and wait for a specific amount of time to get a response. - -The response can be successful or not. This is the responsibility of the responder to return with the result of the command within the time the sender set in the request. - -### Sending Command Requests -In this example, the responder should send his response withing one second. Otherwise, an error will be return as timeout. -``` -response, err := client.C(). - SetId("some-command-id"). - SetChannel(channelName). - SetMetadata("some-metadata"). - SetBody([]byte("hello kubemq - sending command, please reply")). - SetTimeout(time.Second). - Send(ctx) -if err != nil { - log.Fatal(err) -} -``` - -### Receiving Commands Requests -First get a channel of commands: -``` -errCh := make(chan error) -commandsCh, err := client.SubscribeToCommands(ctx, channelName, "", errCh) -if err != nil { - log.Fatal(err) - } -``` -Then a loop over the channel will get the requests from the senders. -``` -for { - select { - case err := <-errCh: - log.Fatal(err) - return - case command := <-commandsCh: - log.Printf("Command Received:\nId %s\nChannel: %s\nMetadata: %s\nBody: %s\n", command.Id, command.Channel, command.Metadata, command.Body) - case <-ctx.Done(): - return - } -} -``` - -### Sending a Command Response -When sending a response, there are two essential things to remember: -- Set the relevant requestId which you respond to -- Set the ResponseTo string to the value of the request ResponseTo field - -``` -err := client.R(). - SetRequestId(command.Id). - SetResponseTo(command.ResponseTo). - SetExecutedAt(time.Now()). - Send(ctx) -if err != nil { - log.Fatal(err) -} -``` - -## Queries -### Concept -Queries implement synchronous messaging pattern which the sender send a request and wait for a specific amount of time to get a response. - -The response must include metadata or body together with an indication of successful or not operation. This is the responsibility of the responder to return with the result of the query within the time the sender set in the request. - -### Sending Query Requests -In this example, the responder should send his response withing one second. Otherwise, an error will be return as timeout. -``` -response, err := client.Q(). - SetId("some-query-id"). - SetChannel(channel). - SetMetadata("some-metadata"). - SetBody([]byte("hello kubemq - sending a query, please reply")). - SetTimeout(time.Second). - Send(ctx) -if err != nil { - log.Fatal(err) -} -``` - -### Receiving Query Requests -First get a channel of queries: -``` -errCh := make(chan error) -queriesCh, err := client.SubscribeToQueries(ctx, channelName, "", errCh) -if err != nil { - log.Fatal(err) -} -``` -Then a loop over the channel will get the requests from the senders. -``` -for { - select { - case err := <-errCh: - log.Fatal(err) - return - case query := <-queriesCh: - log.Printf("Query Received:\nId %s\nChannel: %s\nMetadata: %s\nBody: %s\n", query.Id, query.Channel, query.Metadata, query.Body) - case <-ctx.Done(): - return - } -} -``` - -### Sending a Query Response -When sending a response, there are two essential things to remember: -- Set the relevant requestId which you respond to -- Set the ResponseTo string to the value of the request ResponseTo field +## Examples +Please visit our extensive [examples](https://github.com/kubemq-io/kubemq-go/tree/master/examples) folder +Please find usage examples on the examples folders. -``` -err := client.R(). - SetRequestId(query.Id). - SetResponseTo(query.ResponseTo). - SetExecutedAt(time.Now()). - SetMetadata("this is a response"). - SetBody([]byte("got your query, you are good to go")). - Send(ctx) - -if err != nil { - log.Fatal(err) -} -``` +## Support +if you encounter any issues, please open an issue here, +In addition, you can reach us for support by: +- [**Email**](mailto://support@kubemq.io) +- [**Slack**](https://kubmq.slack.com) diff --git a/client.go b/client.go index c9973f5..2aa2456 100644 --- a/client.go +++ b/client.go @@ -303,12 +303,9 @@ func (c *Client) NewStreamQueueMessage() *StreamQueueMessage { return c.SQM() } -func (c *Client) releaseQueueStream() { - -} - // SQM - create an empty stream receive queue message object func (c *Client) SQM() *StreamQueueMessage { + c.singleStreamQueueMutex <- true sqm := &StreamQueueMessage{ RequestID: "", diff --git a/examples/queue/stream_chain_queues/main.go b/examples/queue/stream_chain_queues/main.go index a973d3c..067ca74 100644 --- a/examples/queue/stream_chain_queues/main.go +++ b/examples/queue/stream_chain_queues/main.go @@ -13,11 +13,14 @@ import ( func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + host := "localhost" + port := 50000 + uri := "http://localhost:9090" doneCh := "done" deadCh := "dead" - sendCount := 100 + sendCount := 10 sender, err := kubemq.NewClient(ctx, - kubemq.WithAddress("localhost", 50000), + kubemq.WithAddress(host, port), kubemq.WithClientId("test-stream-sender-id"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { @@ -27,9 +30,9 @@ func main() { defer sender.Close() receiver1, err := kubemq.NewClient(ctx, - kubemq.WithAddress("localhost", 50000), + kubemq.WithUri(uri), kubemq.WithClientId("test-client-sender-id_receiver_a"), - kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + kubemq.WithTransportType(kubemq.TransportTypeRest)) if err != nil { log.Fatal(err) @@ -37,7 +40,7 @@ func main() { defer receiver1.Close() receiver2, err := kubemq.NewClient(ctx, - kubemq.WithAddress("localhost", 50000), + kubemq.WithAddress(host, port), kubemq.WithClientId("test-client-sender-id_receiver_b"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { @@ -47,7 +50,7 @@ func main() { defer receiver2.Close() receiver3, err := kubemq.NewClient(ctx, - kubemq.WithAddress("localhost", 50000), + kubemq.WithAddress(host, port), kubemq.WithClientId("test-client-sender-id_receiver_c"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { @@ -57,9 +60,9 @@ func main() { defer receiver3.Close() receiver4, err := kubemq.NewClient(ctx, - kubemq.WithAddress("localhost", 50000), + kubemq.WithUri(uri), kubemq.WithClientId("test-client-sender-id_receiver_a"), - kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + kubemq.WithTransportType(kubemq.TransportTypeRest)) if err != nil { log.Fatal(err) @@ -67,7 +70,7 @@ func main() { defer receiver4.Close() receiver5, err := kubemq.NewClient(ctx, - kubemq.WithAddress("localhost", 50000), + kubemq.WithAddress(host, port), kubemq.WithClientId("test-client-sender-id_receiver_b"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { @@ -77,7 +80,7 @@ func main() { defer receiver5.Close() receiver6, err := kubemq.NewClient(ctx, - kubemq.WithAddress("localhost", 50000), + kubemq.WithAddress(host, port), kubemq.WithClientId("test-client-sender-id_receiver_c"), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { @@ -110,7 +113,7 @@ func main() { for { stream := receiver1.NewStreamQueueMessage().SetChannel("receiverA") // get message from the queue - msg, err := stream.Next(ctx, 10000, 5) + msg, err := stream.Next(ctx, 3, 5) if err != nil { log.Println("No new messages for ReceiverA") return @@ -129,6 +132,7 @@ func main() { go func() { defer wg.Done() for { + stream := receiver2.NewStreamQueueMessage().SetChannel("receiverB") // get message from the queue msg, err := stream.Next(ctx, 3, 5) @@ -143,6 +147,7 @@ func main() { if err != nil { log.Fatal(err) } + time.Sleep(100 * time.Millisecond) } }() diff --git a/queue.go b/queue.go index 8e43080..34c71d8 100644 --- a/queue.go +++ b/queue.go @@ -366,7 +366,6 @@ func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) req.doneCh = make(chan bool, 1) req.ctx, req.cancel = context.WithCancel(ctx) go req.transport.StreamQueueMessage(req.ctx, req.reqCh, req.resCh, req.errCh, req.doneCh) - go func() { defer func() { req.msg = nil @@ -375,6 +374,7 @@ func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) case <-req.releaseCh: case <-time.After(1 * time.Second): } + }() for { select { @@ -396,7 +396,6 @@ func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) RefSequence: 0, ModifiedMessage: nil, } - req.reqCh <- getRequest select { case getResponse := <-req.resCh: diff --git a/rest.go b/rest.go index 2a98955..9ff14f6 100644 --- a/rest.go +++ b/rest.go @@ -84,6 +84,7 @@ func newBiDirectionalWebsocketConn(ctx context.Context, uri string, readCh chan for { _, message, err := c.ReadMessage() if err != nil { + errCh <- err return } select { @@ -335,7 +336,6 @@ func (rt *restTransport) SubscribeToEventsStore(ctx context.Context, channel, gr subOption := subscriptionOption{} opt.apply(&subOption) uri := fmt.Sprintf("%s/subscribe/events?&client_id=%s&channel=%s&group=%s&subscribe_type=%s&events_store_type_data=%d&events_store_type_value=%d", rt.wsAddress, rt.id, channel, group, "events_store", subOption.kind, subOption.value) - fmt.Println(uri) rxChan := make(chan string) ready := make(chan struct{}, 1) wsErrCh := make(chan error, 1) @@ -712,6 +712,7 @@ func (rt *restTransport) StreamQueueMessage(ctx context.Context, reqCh chan *pb. case req := <-reqCh: data, _ := json.Marshal(req) writeCh <- data + case err := <-wsErrCh: errCh <- err return