Skip to content

Commit

Permalink
Made add-sample-bucket rest-endpoint, and cleaned up load sample time…
Browse files Browse the repository at this point in the history
…out logic.
  • Loading branch information
willbroadbelt authored and chvck committed May 27, 2020
1 parent 7cd62fd commit 7c5b16e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 19 deletions.
26 changes: 7 additions & 19 deletions cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,6 @@ func (n *Node) ChangeBucketCompression(bucket, mode string) error {
}

func (n *Node) CreateBucket(conf *Bucket) error {
if helper.SampleBucketsCount[conf.Name] != 0 {
glog.Info("Loading sample bucket %s", conf.Name)
return n.LoadSample(conf.Name)
}

body := fmt.Sprintf("bucketType=%s&name=%s&ramQuotaMB=%s&replicaNumber=%d",
conf.Type, conf.Name, conf.RamQuotaMB,
conf.ReplicaCount)
Expand Down Expand Up @@ -708,8 +703,7 @@ func (n *Node) PollSampleBucket(s string) error {
Cred: n.RestLogin,
}

info := make(chan map[string]interface{})
loadTimeout := time.NewTimer(5 * time.Minute)
deadline := time.Now().Add(4 * time.Minute)

for {
resp, err := helper.RestRetryer(helper.RestRetry, params, helper.GetResponse)
Expand All @@ -722,18 +716,12 @@ func (n *Node) PollSampleBucket(s string) error {
return err
}

go func() {
glog.Infof("parsed=%v", parsed)
info <- parsed
}()
select {
case status := <-info:
basicStats := status["basicStats"].(map[string]interface{})
if basicStats["itemCount"].(float64) == helper.SampleBucketsCount[s] {
glog.Infof("Sample bucket %s is loaded", s)
return nil
}
case <-loadTimeout.C:
basicStats := parsed["basicStats"].(map[string]interface{})
if basicStats["itemCount"].(float64) == helper.SampleBucketsCount[s] {
glog.Infof("Sample bucket %s is loaded", s)
return nil
}
if time.Now().After(deadline) {
return errors.New("Timeout while loading sample bucket.")
}
}
Expand Down
37 changes: 37 additions & 0 deletions daemon/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type AddBucketOptions struct {
Conf AddBucketJSON
}

type AddSampleOptions struct {
Conf AddSampleBucketJSON
}

func addBucket(ctx context.Context, clusterID string, opts AddBucketOptions) error {
log.Printf("Adding bucket %s to cluster %s (requested by: %s)", opts.Conf.Name, clusterID, ContextUser(ctx))

Expand Down Expand Up @@ -49,3 +53,36 @@ func addBucket(ctx context.Context, clusterID string, opts AddBucketOptions) err
RamQuotaMB: strconv.Itoa(opts.Conf.RamQuota),
})
}

func addSampleBucket(ctx context.Context, clusterID string, opts AddSampleOptions) error {
log.Printf("Loading sample bucket %s to cluster %s (requested by: %s)", opts.Conf.SampleBucket, clusterID, ContextUser(ctx))

if helper.SampleBucketsCount[opts.Conf.SampleBucket] == 0 {
return errors.New("Unknown sample bucket")
}

c, err := getCluster(ctx, clusterID)
if err != nil {
return err
}

if len(c.Nodes) == 0 {
return errors.New("no nodes available")
}

n := c.Nodes[0]
ipv4 := n.IPv4Address
hostname := ipv4
if opts.Conf.UseHostname {
hostname = n.ContainerName[1:] + helper.DomainPostfix
}

node := &cluster.Node{
HostName: hostname,
Port: strconv.Itoa(helper.RestPort),
SshLogin: &helper.Cred{Username: helper.SshUser, Password: helper.SshPass, Hostname: ipv4, Port: helper.SshPort},
RestLogin: &helper.Cred{Username: helper.RestUser, Password: helper.RestPass, Hostname: ipv4, Port: helper.RestPort},
}

return node.LoadSample(opts.Conf.SampleBucket)
}
33 changes: 33 additions & 0 deletions daemon/restserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,38 @@ func HttpAddBucket(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}

type AddSampleBucketJSON struct {
SampleBucket string `json:"sample_bucket"`
UseHostname bool `json:"use_hostname"`
}

func HttpAddSampleBucket(w http.ResponseWriter, r *http.Request) {
reqCtx, err := getHttpContext(r)
if err != nil {
writeJSONError(w, err)
return
}

clusterID := mux.Vars(r)["cluster_id"]

var reqData AddSampleBucketJSON
err = readJsonRequest(r, &reqData)
if err != nil {
writeJSONError(w, err)
return
}

err = addSampleBucket(reqCtx, clusterID, AddSampleOptions{
Conf: reqData,
})
if err != nil {
writeJSONError(w, err)
return
}

w.WriteHeader(200)
}

type AddCollectionJSON struct {
Name string `json:"name"`
ScopeName string `json:"scope_name"`
Expand Down Expand Up @@ -619,6 +651,7 @@ func createRESTRouter() *mux.Router {
r.HandleFunc("/cluster/{cluster_id}/setup", HttpSetupCluster).Methods("POST")
r.HandleFunc("/cluster/{cluster_id}", HttpDeleteCluster).Methods("DELETE")
r.HandleFunc("/cluster/{cluster_id}/add-bucket", HttpAddBucket).Methods("POST")
r.HandleFunc("/cluster/{cluster_id}/add-sample-bucket", HttpAddSampleBucket).Methods("POST")
r.HandleFunc("/cluster/{cluster_id}/add-collection", HttpAddCollection).Methods("POST")
r.HandleFunc("/cluster/{cluster_id}/setup-cert-auth", HttpSetupClientCertAuth).Methods("POST")
r.HandleFunc("/images", HttpBuildImage).Methods("POST")
Expand Down

0 comments on commit 7c5b16e

Please sign in to comment.