Skip to content

Commit

Permalink
Added status ping and metrics (#16)
Browse files Browse the repository at this point in the history
* Added status ping and metrics

Signed-off-by: Aneesh Puttur <aneeshputtur@gmail.com>

* Updated readme for subscription ping

Signed-off-by: Aneesh Puttur <aneeshputtur@gmail.com>

* Removed logs that was used for debug

Signed-off-by: Aneesh Puttur <aneeshputtur@gmail.com>
  • Loading branch information
aneeshkp authored Jun 28, 2021
1 parent 2581222 commit c8f37f0
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 14 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,29 @@ If publisher is present for the event, then event creation is success and be ret
| 202 | Accepted | object |
| 400 | Error Bad Request | object |

### /subscriptions/status/{subscriptionid}

#### PUT
##### Summary

Creates a new status ping request.

##### Description

If a subscription is present for the request, then status request is success and be returned with Accepted (202).

##### Parameters

| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ---- |
| subscriptionid | request | subscription id | Yes | |

##### Responses

| Code | Description | Schema |
| ---- | ----------- | ------ |
| 202 | Accepted | object |
| 400 | Error Bad Request | object |

### Models

Expand Down
21 changes: 15 additions & 6 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,20 @@ All these metrics are prefixed with `cne_`

| Name | Description | Type |
|-------------------------------------------------------|----------------------------------------------------------|---------|
| cne_events_api_published | Metric to get number of events published by the rest api. | Gauge |
| cne_api_events_published | Metric to get number of events published by the rest api. | Gauge |
| cne_api_subscriptions | Metric to get number of subscriptions. | Gauge |
| cne_api_publishers | Metric to get number of publishers. | Gauge |
| cne_api_status_ping | Metric to get number of status pings. | Gauge |


`cne_events_api_published` - The number of events published via rest-api, and their status by address.
`cne_api_events_published` - The number of events published via rest-api, and their status by address.

Example
```json
# HELP cne_events_api_published Metric to get number of events published by the rest api
# TYPE cne_events_api_published gauge
cne_events_api_published{address="/news-service/finance",status="success"} 9
cne_events_api_published{address="/news-service/sports",status="success"} 9
# HELP cne_api_events_published Metric to get number of events published by the rest api
# TYPE cne_api_events_published gauge
cne_api_events_published{address="/news-service/finance",status="success"} 9
cne_api_events_published{address="/news-service/sports",status="success"} 9
```

`cne_api_subscriptions` - This metrics indicates number of subscriptions that are active.
Expand All @@ -63,5 +64,13 @@ Example
cne_api_publishers{status="active"} 2
```

`cne_api_status_ping` - This metrics indicates number of status pings were made

Example
```json
# HELP cne_status_api_ping Metric to get number of status pings
# TYPE cne_api_status_ping gauge
cne_api_status_ping{status="active"} 2
```


2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0
github.com/prometheus/client_golang v1.11.0
github.com/redhat-cne/sdk-go v0.0.0-20210624034130-7e978efb9f8d
github.com/redhat-cne/sdk-go v0.0.0-20210626184356-7e16fddf03af
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redhat-cne/sdk-go v0.0.0-20210624034130-7e978efb9f8d h1:5tnYOHZsRwOnhsDKsldHq4ehlwNzQOfRMhCqDuoKSm8=
github.com/redhat-cne/sdk-go v0.0.0-20210624034130-7e978efb9f8d/go.mod h1:FqtKJTyjOtOS5YPs3M0CZ9Vc/RYgEv9ecp+Zq4Lbeyk=
github.com/redhat-cne/sdk-go v0.0.0-20210626184356-7e16fddf03af h1:vEnYmkYpiXewhG8z26lAPjE6GMNZ6R1NYAWfgQc/eo4=
github.com/redhat-cne/sdk-go v0.0.0-20210626184356-7e16fddf03af/go.mod h1:FqtKJTyjOtOS5YPs3M0CZ9Vc/RYgEv9ecp+Zq4Lbeyk=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
16 changes: 15 additions & 1 deletion pkg/localmetrics/localmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
//eventPublishedCount ... Total no of events published by the api
eventPublishedCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cne_events_api_published",
Name: "cne_api_events_published",
Help: "Metric to get number of events published by the rest api",
}, []string{"address", "status"})

Expand All @@ -55,13 +55,21 @@ var (
Name: "cne_api_publishers",
Help: "Metric to get number of publishers",
}, []string{"status"})

//statusCallCount ... Total no of status check was made
statusCallCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cne_api_status_ping",
Help: "Metric to get number of status call",
}, []string{"status"})
)

// RegisterMetrics ... register metrics
func RegisterMetrics() {
prometheus.MustRegister(eventPublishedCount)
prometheus.MustRegister(subscriptionCount)
prometheus.MustRegister(publisherCount)
prometheus.MustRegister(statusCallCount)
}

// UpdateEventPublishedCount ...
Expand All @@ -81,3 +89,9 @@ func UpdatePublisherCount(status MetricStatus, val int) {
publisherCount.With(
prometheus.Labels{"status": string(status)}).Add(float64(val))
}

// UpdateStatusCount ...
func UpdateStatusCount(address string,status MetricStatus, val int) {
statusCallCount.With(
prometheus.Labels{"status": string(status)}).Add(float64(val))
}
22 changes: 18 additions & 4 deletions routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ package restapi
import (
"encoding/json"
"fmt"
"time"

"github.com/redhat-cne/sdk-go/pkg/types"

"github.com/redhat-cne/rest-api/pkg/localmetrics"

cloudevents "github.com/cloudevents/sdk-go/v2"
ce "github.com/cloudevents/sdk-go/v2/event"

cne "github.com/redhat-cne/sdk-go/pkg/event"
"github.com/redhat-cne/sdk-go/pkg/pubsub"

"github.com/redhat-cne/sdk-go/v1/event"
Expand Down Expand Up @@ -131,9 +135,9 @@ func (s *Server) createPublisher(w http.ResponseWriter, r *http.Request) {
}
}

// check sub.EndpointURI by get
// check pub.EndpointURI by get
pub.SetID(uuid.New().String())
_ = pub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", pub.ID)) //nolint:errcheck
_ = pub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "publishers", pub.ID)) //nolint:errcheck
newPub, err := s.pubSubAPI.CreatePublisher(pub)
if err != nil {
log.Printf("error creating publisher %v", err)
Expand Down Expand Up @@ -352,13 +356,23 @@ func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Req
respondWithError(w, "subscription not found")
return
}
cneEvent := event.CloudNativeEvent()
cneEvent.SetID(sub.ID)
cneEvent.Type = "status_check"
cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
cneEvent.SetDataContentType(cloudevents.ApplicationJSON)
cneEvent.SetData(cne.Data{
Version: "v1",
})
ceEvent, err := cneEvent.NewCloudEvent(&sub)

if err != nil {
respondWithError(w, err.Error())
} else {
s.dataOut <- &channel.DataChan{
Type: channel.EVENT,
Address: sub.GetResource(),
Type: channel.STATUS,
Data: ceEvent,
Address: fmt.Sprintf("%s/%s", sub.GetResource(), "status"),
}
respondWithMessage(w, http.StatusAccepted, "ping sent")
}
Expand Down
138 changes: 138 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"testing"
"time"

event3 "github.com/redhat-cne/sdk-go/pkg/event"
event2 "github.com/redhat-cne/sdk-go/v1/event"

log "github.com/sirupsen/logrus"

"github.com/redhat-cne/rest-api"
Expand Down Expand Up @@ -321,7 +324,142 @@ func TestServer_KillAndRecover(t *testing.T) {

}

// New get new rest client
func NewRestClient() *Rest {
return &Rest{
client: http.Client{
Timeout: 1 * time.Second,
},
}
}

func publishEvent(e event3.Event) {
//create publisher
url := &types.URI{URL: url.URL{Scheme: "http",
Host: fmt.Sprintf("localhost:%d", port),
Path: fmt.Sprintf("%s%s", apPath, "create/event")}}
rc := NewRestClient()
err := rc.PostEvent(url, e)
if err != nil {
log.Errorf("error publishing events %v to url %s", err, url.String())
} else {
log.Debugf("published event %s", e.ID)
}
}

func Test_MultiplePost(t *testing.T) {
pub := pubsub.PubSub{
ID: "",
EndPointURI: &types.URI{URL: url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port), Path: fmt.Sprintf("%s%s", apPath, "dummy")}},
Resource: resource,
}
pubData, err := json.Marshal(&pub)
assert.Nil(t, err)
assert.NotNil(t, pubData)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d%s%s", port, apPath, "publishers"), bytes.NewBuffer(pubData))
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer func() {
if resp != nil {
resp.Body.Close()
}

}()
pubBodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
err = json.Unmarshal(pubBodyBytes, &ObjPub)
assert.Nil(t, err)
assert.Equal(t, http.StatusCreated, resp.StatusCode)
assert.NotEmpty(t, ObjPub.ID)
assert.NotEmpty(t, ObjPub.URILocation)
assert.NotEmpty(t, ObjPub.EndPointURI)
assert.NotEmpty(t, ObjPub.Resource)
assert.Equal(t, pub.Resource, ObjPub.Resource)
log.Infof("publisher \n%s", ObjPub.String())

cneEvent := event2.CloudNativeEvent()
cneEvent.SetID(ObjPub.ID)
cneEvent.Type = "ptp_status_type"
cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
cneEvent.SetDataContentType(event3.ApplicationJSON)
data := event3.Data{
Version: "event3",
Values: []event3.DataValue{{
Resource: "test",
DataType: event3.NOTIFICATION,
ValueType: event3.ENUMERATION,
Value: event3.ACQUIRING_SYNC,
},
},
}
data.SetVersion("v1") //nolint:errcheck
cneEvent.SetData(data)
for i := 0; i < 5; i++ {
go publishEvent(cneEvent)
}
time.Sleep(2 * time.Second)
}

func TestServer_End(t *testing.T) {
close(eventOutCh)
close(closeCh)
}

// Rest client to make http request
type Rest struct {
client http.Client
}

// Post post with data
func (r *Rest) Post(url *types.URI, data []byte) int {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
request, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewBuffer(data))
if err != nil {
log.Errorf("error creating post request %v", err)
return http.StatusBadRequest
}
request.Header.Set("content-type", "application/json")
response, err := r.client.Do(request)
if err != nil {
log.Errorf("error in post response %v", err)
return http.StatusBadRequest
}
if response.Body != nil {
defer response.Body.Close()
// read any content and print
body, readErr := ioutil.ReadAll(response.Body)
if readErr == nil && len(body) > 0 {
log.Debugf("%s return response %s\n", url.String(), string(body))
}
}
return response.StatusCode
}

// New get new rest client
func New() *Rest {
return &Rest{
client: http.Client{
Timeout: 1 * time.Second,
},
}
}

// PostEvent post an event to the give url and check for error
func (r *Rest) PostEvent(url *types.URI, e event3.Event) error {
b, err := json.Marshal(e)
if err != nil {
log.Errorf("error marshalling event %v", e)
return err
}
if status := r.Post(url, b); status == http.StatusBadRequest {
return fmt.Errorf("post returned status %d", status)
}
return nil
}

0 comments on commit c8f37f0

Please sign in to comment.