Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnSharpe committed Apr 12, 2024
1 parent f1e114c commit e805593
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 177 deletions.
3 changes: 3 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/RedisLabs/rediscloud-go-api/service/account"
"github.com/RedisLabs/rediscloud-go-api/service/cloud_accounts"
"github.com/RedisLabs/rediscloud-go-api/service/databases"
"github.com/RedisLabs/rediscloud-go-api/service/latest_backups"
"github.com/RedisLabs/rediscloud-go-api/service/regions"
"github.com/RedisLabs/rediscloud-go-api/service/subscriptions"
)
Expand All @@ -28,6 +29,7 @@ type Client struct {
Database *databases.API
Subscription *subscriptions.API
Regions *regions.API
LatestBackup *latest_backups.API
// acl
RedisRules *redis_rules.API
Roles *roles.API
Expand Down Expand Up @@ -65,6 +67,7 @@ func NewClient(configs ...Option) (*Client, error) {
Database: databases.NewAPI(client, t, config.logger),
Subscription: subscriptions.NewAPI(client, t, config.logger),
Regions: regions.NewAPI(client, t, config.logger),
LatestBackup: latest_backups.NewAPI(client, t, config.logger),
// acl
RedisRules: redis_rules.NewAPI(client, t, config.logger),
Roles: roles.NewAPI(client, t, config.logger),
Expand Down
65 changes: 65 additions & 0 deletions internal/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package internal

import (
"encoding/json"
"fmt"
"regexp"

"github.com/RedisLabs/rediscloud-go-api/redis"
)

type Task struct {
CommandType *string `json:"commandType,omitempty"`
Description *string `json:"description,omitempty"`
Status *string `json:"status,omitempty"`
ID *string `json:"taskId,omitempty"`
Response *response `json:"response,omitempty"`
}

func (o Task) String() string {
return ToString(o)
}

type response struct {
ID *int `json:"resourceId,omitempty"`
Resource *json.RawMessage `json:"resource,omitempty"`
Error *Error `json:"error,omitempty"`
}

func (o response) String() string {
return ToString(o)
}

type Error struct {
Type *string `json:"type,omitempty"`
Description *string `json:"description,omitempty"`
Status *string `json:"status,omitempty"`
}

func (o Error) String() string {
return ToString(o)
}

func (e *Error) StatusCode() string {
matches := errorStatusCode.FindStringSubmatch(redis.StringValue(e.Status))
if len(matches) == 2 {
return matches[1]
}
return ""
}

func (e *Error) Error() string {
return fmt.Sprintf("%s - %s: %s", redis.StringValue(e.Status), redis.StringValue(e.Type), redis.StringValue(e.Description))
}

var errorStatusCode = regexp.MustCompile("^(\\d*).*$")
var _ error = &Error{}

// TaskResponse is the high-level response when a Create/Update/Delete operation is in progress.
type TaskResponse struct {
ID *string `json:"taskId,omitempty"`
}

func (o TaskResponse) String() string {
return ToString(o)
}
44 changes: 44 additions & 0 deletions internal/model_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package internal

import (
"testing"

"github.com/RedisLabs/rediscloud-go-api/redis"
)

func TestError_StatusCode(t *testing.T) {
tests := []struct {
name string
subject *Error
want string
}{
{
name: "no status code",
subject: &Error{
Status: redis.String("doesn't start with a number"),
},
want: "",
},
{
name: "starts with a status code",
subject: &Error{
Status: redis.String("418 I'm a teapot"),
},
want: "418",
},
{
name: "includes a number but doesn't start with it",
subject: &Error{
Status: redis.String("The number 42 should not be found"),
},
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.subject.StatusCode(); got != tt.want {
t.Errorf("StatusCode() = %v, want %v", got, tt.want)
}
})
}
}
132 changes: 89 additions & 43 deletions internal/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,30 @@ type Log interface {
}

type Api interface {
// WaitForResourceId will poll the task, waiting for the task to finish processing, where it will then return.
// An error will be returned if the task couldn't be retrieved or the task was not processed successfully.
// WaitForResourceId will poll the Task, waiting for the Task to finish processing, where it will then return.
// An error will be returned if the Task couldn't be retrieved or the Task was not processed successfully.
//
// The task will be continuously polled until the task either fails or succeeds - cancellation can be achieved
// The Task will be continuously polled until the Task either fails or succeeds - cancellation can be achieved
// by cancelling the context.
WaitForResourceId(ctx context.Context, id string) (int, error)

// Wait will poll the task, waiting for the task to finish processing, where it will then return.
// An error will be returned if the task couldn't be retrieved or the task was not processed successfully.
// Wait will poll the Task, waiting for the Task to finish processing, where it will then return.
// An error will be returned if the Task couldn't be retrieved or the Task was not processed successfully.
//
// The task will be continuously polled until the task either fails or succeeds - cancellation can be achieved
// The Task will be continuously polled until the Task either fails or succeeds - cancellation can be achieved
// by cancelling the context.
Wait(ctx context.Context, id string) error

// WaitForResource will poll the task, waiting for the task to finish processing, where it will then marshal the
// WaitForResource will poll the Task, waiting for the Task to finish processing, where it will then marshal the
// returned resource into the value pointed to be `resource`.
//
// The task will be continuously polled until the task either fails or succeeds - cancellation can be achieved
// The Task will be continuously polled until the Task either fails or succeeds - cancellation can be achieved
// by cancelling the context.
WaitForResource(ctx context.Context, id string, resource interface{}) error

// WaitForTask will poll the Task, waiting for it to enter a terminal state (i.e Done or Error). This task
// will then be returned, or an error in case it cannot be retrieved.
WaitForTask(ctx context.Context, id string) (*Task, error)
}

type api struct {
Expand All @@ -59,11 +63,7 @@ func (a *api) WaitForResourceId(ctx context.Context, id string) (int, error) {

func (a *api) Wait(ctx context.Context, id string) error {
_, err := a.waitForTaskToComplete(ctx, id)
if err != nil {
return err
}

return nil
return err
}

func (a *api) WaitForResource(ctx context.Context, id string, resource interface{}) error {
Expand All @@ -72,39 +72,85 @@ func (a *api) WaitForResource(ctx context.Context, id string, resource interface
return err
}

err = json.Unmarshal(*task.Response.Resource, resource)
return json.Unmarshal(*task.Response.Resource, resource)
}

func (a *api) waitForTaskToComplete(ctx context.Context, id string) (*Task, error) {
var task *Task
notFoundCount := 0
err := retry.Do(
func() error {
var err error
task, err = a.get(ctx, id)
if err != nil {
if status, ok := err.(*HTTPError); ok && status.StatusCode == 404 {
return &taskNotFoundError{err}
}
return retry.Unrecoverable(err)
}

status := redis.StringValue(task.Status)
if status == processedState {
return nil
}

if _, ok := processingStates[status]; !ok {
return retry.Unrecoverable(fmt.Errorf("task %s failed %s - %s", id, status, redis.StringValue(task.Description)))
}

return fmt.Errorf("task %s not processed yet: %s", id, status)
},
retry.Attempts(math.MaxUint16),
retry.Delay(1*time.Second),
retry.MaxDelay(30*time.Second),
retry.RetryIf(func(err error) bool {
if !retry.IsRecoverable(err) {
return false
}
if _, ok := err.(*taskNotFoundError); ok {
notFoundCount++
if notFoundCount > max404Errors {
return false
}
}
return true
}),
retry.LastErrorOnly(true), retry.Context(ctx), retry.OnRetry(func(_ uint, err error) {
a.logger.Println(err)
}))
if err != nil {
return err
return nil, err
}

return nil
return task, nil
}

func (a *api) waitForTaskToComplete(ctx context.Context, id string) (*task, error) {
var task *task
func (a *api) WaitForTask(ctx context.Context, id string) (*Task, error) {
var task *Task
notFoundCount := 0
err := retry.Do(func() error {
var err error
task, err = a.get(ctx, id)
if err != nil {
if status, ok := err.(*HTTPError); ok && status.StatusCode == 404 {
return &taskNotFoundError{err}
err := retry.Do(
func() error {
var err error
task, err = a.get(ctx, id)
if err != nil {
if status, ok := err.(*HTTPError); ok && status.StatusCode == 404 {
return &taskNotFoundError{err}
}
return retry.Unrecoverable(err)
}
return retry.Unrecoverable(err)
}

status := redis.StringValue(task.Status)
if status == processedState {
return nil
}
status := redis.StringValue(task.Status)

if _, ok := processingStates[status]; !ok {
return retry.Unrecoverable(fmt.Errorf("task %s failed %s - %s", id, status, redis.StringValue(task.Description)))
}
if _, ok := processingStates[status]; !ok {
// The task is no longer processing for whatever reason
return nil
}

return fmt.Errorf("task %s not processed yet: %s", id, status)
},
retry.Attempts(math.MaxUint16), retry.Delay(1*time.Second), retry.MaxDelay(30*time.Second),
return fmt.Errorf("task %s not processed yet: %s", id, status)
},
retry.Attempts(math.MaxUint16),
retry.Delay(1*time.Second),
retry.MaxDelay(30*time.Second),
retry.RetryIf(func(err error) bool {
if !retry.IsRecoverable(err) {
return false
Expand All @@ -127,9 +173,9 @@ func (a *api) waitForTaskToComplete(ctx context.Context, id string) (*task, erro
return task, nil
}

func (a *api) get(ctx context.Context, id string) (*task, error) {
var task task
if err := a.client.Get(ctx, fmt.Sprintf("retrieve task %s", id), "/tasks/"+url.PathEscape(id), &task); err != nil {
func (a *api) get(ctx context.Context, id string) (*Task, error) {
var task Task
if err := a.client.Get(ctx, fmt.Sprintf("retrieve Task %s", id), "/tasks/"+url.PathEscape(id), &task); err != nil {
return nil, err
}

Expand All @@ -140,11 +186,11 @@ func (a *api) get(ctx context.Context, id string) (*task, error) {
return &task, nil
}

// Number of 404 errors to swallow before returning an error while waiting for a task to finish.
// Number of 404 errors to swallow before returning an error while waiting for a Task to finish.
//
// There's a short window between the API returning a task ID and the task being known by the
// Task service, so by ignoring _a number_ of 404 errors we give the task service enough time to
// learn about the task but also handle the situation where there really is no task.
// There's a short window between the API returning a Task ID and the Task being known by the
// Task service, so by ignoring _a number_ of 404 errors we give the Task service enough time to
// learn about the Task but also handle the situation where there really is no Task.
const max404Errors = 5

var processingStates = map[string]bool{
Expand Down
18 changes: 9 additions & 9 deletions service/access_control_lists/redis_rules/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ type HttpClient interface {
Delete(ctx context.Context, name, path string, responseBody interface{}) error
}

type Task interface {
type TaskWaiter interface {
WaitForResourceId(ctx context.Context, id string) (int, error)
Wait(ctx context.Context, id string) error
}

type API struct {
client HttpClient
task Task
logger Log
client HttpClient
taskWaiter TaskWaiter
logger Log
}

func NewAPI(client HttpClient, task Task, logger Log) *API {
return &API{client: client, task: task, logger: logger}
func NewAPI(client HttpClient, taskWaiter TaskWaiter, logger Log) *API {
return &API{client: client, taskWaiter: taskWaiter, logger: logger}
}

// List will list all of the current account's redisRules.
Expand Down Expand Up @@ -71,7 +71,7 @@ func (a *API) Create(ctx context.Context, redisRule CreateRedisRuleRequest) (int

a.logger.Printf("Waiting for task %s to finish creating the redisRule", task)

id, err := a.task.WaitForResourceId(ctx, *task.ID)
id, err := a.taskWaiter.WaitForResourceId(ctx, *task.ID)
if err != nil {
return 0, fmt.Errorf("failed when creating redisRule %d: %w", id, err)
}
Expand All @@ -89,7 +89,7 @@ func (a *API) Update(ctx context.Context, id int, redisRule CreateRedisRuleReque

a.logger.Printf("Waiting for task %s to finish updating the redisRule", task)

err = a.task.Wait(ctx, *task.ID)
err = a.taskWaiter.Wait(ctx, *task.ID)
if err != nil {
return fmt.Errorf("failed when updating redisRule %d: %w", id, err)
}
Expand All @@ -107,7 +107,7 @@ func (a *API) Delete(ctx context.Context, id int) error {

a.logger.Printf("Waiting for redisRule %d to finish being deleted", id)

err = a.task.Wait(ctx, *task.ID)
err = a.taskWaiter.Wait(ctx, *task.ID)
if err != nil {
return fmt.Errorf("failed when deleting redisRule %d: %w", id, err)
}
Expand Down
Loading

0 comments on commit e805593

Please sign in to comment.