diff --git a/engine.go b/engine.go index 4321979..1960d9b 100644 --- a/engine.go +++ b/engine.go @@ -22,17 +22,13 @@ import ( "net/http" "strings" "sync" - "sync/atomic" ) var ( - ErrEmptyParam = errors.New("error empty param") - ErrPickNote = errors.New("error pick node") - ErrReachedRetryTimes = errors.New("error reached retry times") + ErrEmptyParam = errors.New("error empty param") + ErrPickNote = errors.New("error pick node") ) -const _internalFlag = "?type=internal" - type Engine struct { mu sync.RWMutex options *Options @@ -81,38 +77,30 @@ func (e *Engine) Set(w http.ResponseWriter, r *http.Request) { return } - // set locally - e.cache.Set(key, ByteView{ - B: []byte(value), - }) + e.mu.RLock() + pickedNodeAddr := e.nodes.Get(key) + e.mu.RUnlock() + slog.Info("EZCache: pick node", "addr", pickedNodeAddr) - // drop if from internal node - if r.URL.Query().Get("type") == "internal" { + if pickedNodeAddr == "" { + http.Error(w, ErrPickNote.Error(), http.StatusInternalServerError) return } - // set distantly - e.mu.RLock() - defer e.mu.RUnlock() - for _, addr := range e.addrs { - // skip node itself - if strings.Contains(addr, e.options.Addr) { - continue - } + if strings.Contains(pickedNodeAddr, e.options.Addr) { + e.cache.Set(key, ByteView{ + B: []byte(value), + }) + slog.Info("EZCache: set", "key", key, "value", value) + return + } - url := fmt.Sprintf("%v%v/%v/%v%v", addr, e.options.BasePath, key, value, _internalFlag) - req, err := http.NewRequest(http.MethodPost, url, nil) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - continue - } - _, err = http.DefaultClient.Do(req) - slog.Info("EZCache: node set distantly", "from", e.options.Addr, "to", addr) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - continue - } + url := fmt.Sprintf("%v%v/%v/%v", pickedNodeAddr, e.options.BasePath, key, value) + if err := DoHTTPRequest(http.MethodPost, url, nil); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } + slog.Info("EZCache: node redirect set request", "from", e.options.Addr, "to", pickedNodeAddr) } func (e *Engine) Get(w http.ResponseWriter, r *http.Request) { @@ -122,70 +110,29 @@ func (e *Engine) Get(w http.ResponseWriter, r *http.Request) { return } - // get locally - if value, ok := e.cache.Get(key); ok { - _, _ = w.Write(value.ByteSlice()) - return - } + e.mu.RLock() + pickedNodeAddr := e.nodes.Get(key) + e.mu.RUnlock() + slog.Info("EZCache: pick node", "addr", pickedNodeAddr) - // drop if from internal node - if r.URL.Query().Get("type") == "internal" { + if pickedNodeAddr == "" { + http.Error(w, ErrPickNote.Error(), http.StatusInternalServerError) return } - // get distantly - var times uint32 = 0 - for { - atomic.AddUint32(×, 1) - // reached max retry times - if int(atomic.LoadUint32(×)) == e.options.RetryTimes { - http.Error(w, ErrReachedRetryTimes.Error(), http.StatusInternalServerError) - return - } - - e.mu.RLock() - pickedNodeAddr := e.nodes.Get(key) - e.mu.RUnlock() - - if pickedNodeAddr == "" { - http.Error(w, ErrPickNote.Error(), http.StatusInternalServerError) + if strings.Contains(pickedNodeAddr, e.options.Addr) { + if v, ok := e.cache.Get(key); ok { + _, _ = w.Write(v.ByteSlice()) + slog.Info("EZCache: get", "key", key, "value", v.String()) return } - if strings.Contains(pickedNodeAddr, e.options.Addr) { - return - } - - url := fmt.Sprintf("%v%v/%v%v", pickedNodeAddr, e.options.BasePath, key, _internalFlag) - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - continue - } - resp, err := http.DefaultClient.Do(req) - slog.Info("EZCache: node get distantly", "from", e.options.Addr, "to", pickedNodeAddr) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - continue - } - - if resp.StatusCode != http.StatusOK { - _ = resp.Body.Close() - continue - } - body, err := io.ReadAll(resp.Body) - _ = resp.Body.Close() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - continue - } - // populate cache - e.cache.Set(key, ByteView{ - B: CopyBytes(body), - }) - - _, _ = w.Write(body) + slog.Info("EZCache: key does not exist", "key", key) return } + + url := fmt.Sprintf("%v%v/%v", pickedNodeAddr, e.options.BasePath, key) + slog.Info("EZCache: node redirect get request", "from", e.options.Addr, "to", pickedNodeAddr) + http.Redirect(w, r, url, http.StatusFound) } func (e *Engine) Delete(w http.ResponseWriter, r *http.Request) { @@ -195,34 +142,38 @@ func (e *Engine) Delete(w http.ResponseWriter, r *http.Request) { return } - // delete locally - e.cache.Delete(key) + e.mu.RLock() + pickedNodeAddr := e.nodes.Get(key) + e.mu.RUnlock() + slog.Info("EZCache: pick node", "addr", pickedNodeAddr) - // drop if from internal node - if r.URL.Query().Get("type") == "internal" { + if pickedNodeAddr == "" { + http.Error(w, ErrPickNote.Error(), http.StatusInternalServerError) return } - // delete distantly - e.mu.RLock() - defer e.mu.RUnlock() - for _, addr := range e.addrs { - // skip node itself - if strings.Contains(addr, e.options.Addr) { - continue - } - url := fmt.Sprintf("%v%v/%v%v", addr, e.options.BasePath, key, _internalFlag) - req, err := http.NewRequest(http.MethodDelete, url, nil) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - continue - } - _, err = http.DefaultClient.Do(req) - slog.Info("EZCache: node delete distantly", "from", e.options.Addr, "to", addr) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - continue - } + if strings.Contains(pickedNodeAddr, e.options.Addr) { + e.cache.Delete(key) + slog.Info("EZCache: delete", "key", key) + return + } + + url := fmt.Sprintf("%v%v/%v", pickedNodeAddr, e.options.BasePath, key) + if err := DoHTTPRequest(http.MethodDelete, url, nil); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } + slog.Info("EZCache: node redirect delete request", "from", e.options.Addr, "to", pickedNodeAddr) +} +func DoHTTPRequest(method, url string, body io.Reader) error { + req, err := http.NewRequest(method, url, body) + if err != nil { + return err + } + _, err = http.DefaultClient.Do(req) + if err != nil { + return err + } + return nil } diff --git a/engine_test.go b/engine_test.go deleted file mode 100644 index 4e3cdaa..0000000 --- a/engine_test.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2024 justlorain -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ezcache - -import ( - "fmt" - "testing" -) - -func TestRun(t *testing.T) { - addrs := []string{"http://localhost:7246", "http://localhost:7247", "http://localhost:7248"} - e := NewEngine(WithAddr(addrs[0])) - e.RegisterNodes(addrs...) - if err := e.Run(); err != nil { - fmt.Println(err) - } -} diff --git a/example/main.go b/example/main.go index 749e4dc..137f868 100644 --- a/example/main.go +++ b/example/main.go @@ -1,3 +1,17 @@ +// Copyright 2024 justlorain +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package main import (